On Sun, 19 Aug 2012 11:53:38 -0700
Richard Sharpe <[email protected]> wrote:

Looks like a great start! Comments inline below:

> diff --git a/fs/cifs/cifsglob.h b/fs/cifs/cifsglob.h
> index 54ec716..9530faf 100644
> --- a/fs/cifs/cifsglob.h
> +++ b/fs/cifs/cifsglob.h
> @@ -471,6 +471,16 @@ inc_rfc1001_len(void *buf, int count)
>       be32_add_cpu((__be32 *)buf, count);
>  }
>  
> +/*
> + * States for the read socket work function
> + */
> +enum cifs_sock_read_states {
> +     CIFS_READING_PDU_HEADER = 0,
> +     CIFS_READING_SMB_HEADER,
> +     CIFS_READING_SMB_BODY,
> +     CIFS_READING_JUNK
> +};
> +
>  struct TCP_Server_Info {
>       struct list_head tcp_ses_list;
>       struct list_head smb_ses_list;
> @@ -555,6 +565,32 @@ struct TCP_Server_Info {
>       unsigned int    max_read;
>       unsigned int    max_write;
>  #endif /* CONFIG_CIFS_SMB2 */
> +
> +     /* The following should be split out probably */
> +     int use_demux_thread;

This should probbaly be a bool. BTW: what determines whether you'll use
the demux thread or not? Module option? That might be a reasonable way
to add this code in parallel to the existing code for testing.

> +     struct work_struct sock_read_work; /* How reads are processed */
> +     struct delayed_work sock_read_delayed_work; /* When we need to delay */
> +     struct delayed_work reconnect_work;  /* When we need to reconnect */
> +     /*
> +      * Saved socket callbacks
> +      */ 
> +     void (*initial_cifs_data_ready)(struct sock *, int);
> +     void (*initial_cifs_state_change)(struct sock *);
> +     void (*initial_cifs_write_space)(struct sock *);
> +     void (*initial_cifs_error_report)(struct sock *);
> +
> +     /*
> +      * It is not clear that we need this ...
> +      */
> +     spinlock_t conn_read_lock; /* Protects the next few vars */
> +
> +     /*
> +      * These handle the processing of received SMBs and keeping track of
> +      * how much data we have read ...
> +      */
> +     unsigned int tcp_offset;   /* How far into an SMB we are */
> +     unsigned int smb_reclen;
> +     enum cifs_sock_read_states sock_read_state;
>  };
>  

I would think that you'd want to put these fields into a separate
struct that hangs off of a list on the TCP_Server_Info? Or, are you
planning to somehow have multiple TCP_Server_Info structs acting as
parents to the smb_ses objects?

>  static inline unsigned int
> diff --git a/fs/cifs/connect.c b/fs/cifs/connect.c
> index 549409b..06a70be 100644
> --- a/fs/cifs/connect.c
> +++ b/fs/cifs/connect.c
> @@ -62,6 +62,9 @@ extern mempool_t *cifs_req_poolp;
>  #define TLINK_ERROR_EXPIRE   (1 * HZ)
>  #define TLINK_IDLE_EXPIRE    (600 * HZ)
>  
> +/* How long we delay when there is no memory */
> +#define READ_NO_MEMORY_DELAY    (1 * HZ)
> +
>  enum {
>  
>       /* Mount options that take no arguments */
> @@ -422,6 +425,46 @@ requeue_echo:
>       queue_delayed_work(cifsiod_wq, &server->echo, SMB_ECHO_INTERVAL);
>  }
>  
> +/*
> + * An allocate buffers routine for work queues. We cannot call sleep in a 
> + * work queue, so we return false immediately, and the work item must 
> + * reschedule itself, delayed.
> + */

You absolutely _can_ sleep on a workqueue. One of the main reasons for
deferring work to a workqueue from an interrupt handler is to do things
that require you to sleep.

> +static bool
> +work_allocate_buffers(struct TCP_Server_Info *server)
> +{
> +     if (!server->bigbuf) {
> +             server->bigbuf = (char *)cifs_buf_get();


...and you're possibly sleeping here in any case. cifs_buf_get can
sleep since it does a slab allocation with GFP_NOFS. That includes the
__GFP_WAIT flag which allows the page allocator to sleep while trying
to free memory.

> +             if (!server->bigbuf) {
> +                     printk(KERN_INFO "%s:no memory for large buf for %p", 
> +                             __func__, server);
> +                     cERROR(1, "No memory for large SMB response");
> +                     /* retry will check if exiting */
> +                     return false;
> +             }
> +     } else if (server->large_buf) {
> +             /* we are reusing a dirty large buf, clear its start */
> +             memset(server->bigbuf, 0, HEADER_SIZE(server));
> +     }
> +
> +     if (!server->smallbuf) {
> +             server->smallbuf = (char *)cifs_small_buf_get();
> +             if (!server->smallbuf) {
> +                     printk(KERN_INFO "%s:no memory for small buf for %p\n",
> +                             __func__, server);
> +                     cERROR(1, "No memory for small SMB response");
> +                     /* retry will check if exiting */
> +                     return false;
> +             }
> +             /* beginning of smb buffer is cleared in our buf_get */
> +     } else {
> +             /* if existing small buf clear beginning */
> +             memset(server->smallbuf, 0, HEADER_SIZE(server));
> +     }
> +
> +     return true;
> +}
> +
>  static bool
>  allocate_buffers(struct TCP_Server_Info *server)
>  {
> @@ -2166,25 +2209,27 @@ cifs_get_tcp_session(struct smb_vol *volume_info)
>               goto out_err_crypto_release;
>       }
>  
> +     if (tcp_ses->use_demux_thread) {
> +             tcp_ses->tsk = kthread_run(cifs_demultiplex_thread,
> +                                       tcp_ses, "cifsd");
> +             if (IS_ERR(tcp_ses->tsk)) {
> +                     rc = PTR_ERR(tcp_ses->tsk);
> +                     cERROR(1, "error %d create cifsd thread", rc);
> +                     goto out_err_crypto_release;
> +             }
> +             tcp_ses->tcpStatus = CifsNeedNegotiate;
> +
> +             /* thread spawned, put it on the list */
> +             spin_lock(&cifs_tcp_ses_lock);
> +             list_add(&tcp_ses->tcp_ses_list, &cifs_tcp_ses_list);
> +             spin_unlock(&cifs_tcp_ses_lock);
> +     }
> +
>       /*
>        * since we're in a cifs function already, we know that
>        * this will succeed. No need for try_module_get().
>        */
>       __module_get(THIS_MODULE);
> -     tcp_ses->tsk = kthread_run(cifs_demultiplex_thread,
> -                               tcp_ses, "cifsd");
> -     if (IS_ERR(tcp_ses->tsk)) {
> -             rc = PTR_ERR(tcp_ses->tsk);
> -             cERROR(1, "error %d create cifsd thread", rc);
> -             module_put(THIS_MODULE);
> -             goto out_err_crypto_release;
> -     }
> -     tcp_ses->tcpStatus = CifsNeedNegotiate;
> -
> -     /* thread spawned, put it on the list */
> -     spin_lock(&cifs_tcp_ses_lock);
> -     list_add(&tcp_ses->tcp_ses_list, &cifs_tcp_ses_list);
> -     spin_unlock(&cifs_tcp_ses_lock);
>  
>       cifs_fscache_get_client_cookie(tcp_ses);
>  
> @@ -2959,6 +3004,332 @@ ip_rfc1001_connect(struct TCP_Server_Info *server)
>       return rc;
>  }
>  
> +/*
> + * Must be holding the lock
> + */
> +static void
> +save_old_callbacks(struct TCP_Server_Info *server, struct sock *sk)
> +{
> +     server->initial_cifs_state_change = sk->sk_state_change;
> +     server->initial_cifs_data_ready   = sk->sk_data_ready;
> +     server->initial_cifs_write_space  = sk->sk_write_space;
> +     server->initial_cifs_error_report = sk->sk_error_report;
> +}
> +
> +static void
> +restore_old_callbacks(struct TCP_Server_Info *server, struct sock *sk)
> +{
> +     sk->sk_state_change = server->initial_cifs_state_change;
> +     sk->sk_data_ready   = server->initial_cifs_data_ready;
> +     sk->sk_write_space  = server->initial_cifs_write_space;
> +     sk->sk_error_report = server->initial_cifs_error_report;
> +}
> +
> +/*
> + * Handle the arrival of data ... we defer it all to the workqueue
> + */
> +static void
> +cifs_data_ready(struct sock *sk, int bytes)
> +{
> +     struct TCP_Server_Info *server;
> +
> +     read_lock(&sk->sk_callback_lock);
> +     
> +     server = sk->sk_user_data;
> +     if (!server || !queue_work(cifsiod_wq, &server->sock_read_work)) {
> +             printk(KERN_INFO "%s: Unable to queue incoming data work on 
> socket\n", __func__);
> +             printk(KERN_INFO "server: %p\n", server);
> +     }
> +
> +     read_unlock(&sk->sk_callback_lock);

You probably need to think about the object lifetime here. What
guarantees that the "server" struct still exists when the work actually
gets around to running?

> +}
> +
> +/*
> + * Handle the state change calls. Drive the state machine and handle 
> disconnect
> + */
> +static void
> +cifs_state_change(struct sock *sk)
> +{
> +     struct TCP_Server_Info *server;
> +
> +     read_lock(&sk->sk_callback_lock);
> +
> +     server = sk->sk_user_data;
> +     if (!server) {
> +             printk(KERN_INFO "%s: sk_user_data bad (%p)!", __func__, 
> server);
> +             goto out;
> +     }
> +
> +     printk(KERN_INFO "%s: Processing socket states for %p", __func__, 
> server);
> +
> +     switch (sk->sk_state) {
> +     case TCP_ESTABLISHED:  /* We are connected */
> +             server->tcp_offset = 0;
> +             server->smb_reclen = 0;
> +             server->sock_read_state = CIFS_READING_PDU_HEADER;
> +
> +             break;
> +     case TCP_FIN_WAIT1:
> +             /* Client shutdown ... reconnect? How many times */
> +             printk(KERN_INFO "%s: We sent shutdown, cleaning up\n",
> +                     __func__);
> +             break;
> +     case TCP_CLOSE_WAIT:
> +             /* Server shutdown ... clean up? */
> +             printk(KERN_INFO "%s: Server shutdown, reconnecting\n",
> +                     __func__);
> +             break;
> +     case TCP_SYN_SENT:
> +
> +             break;
> +     case TCP_CLOSING:
> +
> +             break;
> +
> +     case TCP_LAST_ACK:
> +
> +             break;
> +
> +     case TCP_CLOSE:
> +
> +             break;
> +     }
> +out:
> +     read_unlock(&sk->sk_callback_lock);     
> +}
> +
> +/*
> + * The socket READ worker work main function. 
> + *
> + * We will get called when there is data on the socket. We expect that there
> + * will be a full PATH_MTU segment worth of data although there might not be
> + * if the server is nasty. We must keep state about where we are up to 
> because
> + * we might have to leave off after reading all the data off the socket 
> without
> + * having processed a complete SMB. This is because we are executing in the
> + * context of a work queue and we cannot block because other work queue items
> + * need to be processed.
> + *
> + * States are: CIFS_READING_PDU_HEADER,  we are reading the 4-byte-header
> + *          CIFS_READING_SMB_HEADER,  we are reading the SMB1/2 header 
> + *          CIFS_READING_SMB_BODY,    we are reading the SMB body
> + *          CIFS_READING_JUNK,        we are reading junk to be ditched.
> + */
> +static void
> +cifs_read_worker_main(struct TCP_Server_Info *server)
> +{
> +     struct msghdr msg;
> +     struct kvec iov;
> +     int result;
> +
> +     if (!work_allocate_buffers(server)) {
> +             printk(KERN_INFO "%s: Delaying because of buffer issues\n",
> +                     __func__);
> +
> +             /*
> +              * Is there an issue here that it could be queued twice?
> +              */
> +             queue_delayed_work(cifsiod_wq, 
> +                             &server->sock_read_delayed_work,
> +                             READ_NO_MEMORY_DELAY);
> +             return;  /* Nothing more to do here ... */
> +     }
> +
> +     /*
> +      * We have memory and data on the socket, deal with it. This might
> +      * require multiple calls to kernel_recvmsg ... there should be data
> +      * there, though.
> +      */
> +     while (1) {
> +             switch (server->sock_read_state) {
> +             case CIFS_READING_PDU_HEADER:
> +                     iov.iov_base = server->smallbuf + server->tcp_offset;
> +                     iov.iov_len = 4 - server->tcp_offset;
> +                     msg.msg_name = NULL;
> +                     msg.msg_namelen = 0;
> +                     msg.msg_control = NULL;
> +                     msg.msg_controllen = 0;
> +                     msg.msg_flags = MSG_DONTWAIT;
> +
> +                     result = kernel_recvmsg(server->ssocket, 
> +                                             &msg, 
> +                                             &iov, 
> +                                             1, 
> +                                             iov.iov_len,
> +                                             msg.msg_flags);
> +
> +                     /*
> +                      * Deal with errors ...
> +                      */
> +                     if (result < 0) {
> +                             if (result == -EAGAIN) {
> +                                     printk(KERN_INFO "%s: We got EAGAIN 
> ...\n",
> +                                             __func__);
> +                                     return;
> +                             }
> +                             /*
> +                             * Other errors probably require reconnect.
> +                             * Should we handle the response?
> +                             */
> +                             kernel_sock_shutdown(server->ssocket,
> +                                             SHUT_WR);
> +                             queue_delayed_work(cifsiod_wq,
> +                                             &server->reconnect_work,
> +                                             (1 * HZ));
> +                             return;
> +                     }
> +
> +                     server->tcp_offset += result;
> +
> +                     /*
> +                      * Do we have the PDU header? If so, check it out and
> +                      * update our state etc. FIXME: Extract this into
> +                      * its own function.
> +                      */
> +                     if (server->tcp_offset == 4) {
> +                             server->smb_reclen = 
> +                                     get_rfc1002_length(server->smallbuf);
> +
> +                             switch((unsigned char)server->smallbuf[0]) {
> +                             case RFC1002_SESSION_MESSAGE:
> +                                     server->sock_read_state = 
> +                                             CIFS_READING_SMB_HEADER;
> +                                     break;
> +                             case RFC1002_SESSION_KEEP_ALIVE:
> +                                     printk(KERN_INFO 
> +                                             "%s: Rcvd session keep alive\n",
> +                                             __func__);
> +                                     /* We simply stay in this state */
> +                                     server->tcp_offset = 0;
> +                                     break;
> +                             case RFC1002_POSITIVE_SESSION_RESPONSE:
> +                                     printk(KERN_INFO
> +                                             "%s: Rcvd positive response\n",
> +                                             __func__);
> +                                     /* We simply stay in this state */
> +                                     server->tcp_offset = 0;
> +                                     break;
> +                             case RFC1002_NEGATIVE_SESSION_RESPONSE:
> +                                     printk(KERN_INFO
> +                                             "%s: Rcvd negative response\n",
> +                                             __func__);
> +                                     kernel_sock_shutdown(server->ssocket,
> +                                                     SHUT_WR);
> +                                     /* Reconnect in one second */
> +                                     queue_delayed_work(cifsiod_wq,
> +                                                     &server->reconnect_work,
> +                                                     (1 * HZ));
> +                                     return;
> +                             default:
> +                                     printk(KERN_INFO 
> +                                             "%s: Unknown RFC1002 response"
> +                                             " type 0x%x\n",
> +                                             __func__,
> +                                             server->smallbuf[0]);
> +                                     kernel_sock_shutdown(server->ssocket,
> +                                                     SHUT_WR);
> +                                     queue_delayed_work(cifsiod_wq,
> +                                                     &server->reconnect_work,
> +                                                     (1 * HZ));
> +                                     return;
> +                             }
> +
> +                     }
> +
> +                     break;
> +
> +             case CIFS_READING_SMB_HEADER:
> +                     /* Process the header. Figure out how long it is */
> +                     /* We take up to and including the mid for SMB1 
> +                      * and the whole header for SMB2 ... */
> +
> +                     /* We know how long this SMB is, is it long enough? */
> +                     if (server->smb_reclen < HEADER_SIZE(server)) {
> +                             kernel_sock_shutdown(server->ssocket, SHUT_WR);
> +                             queue_delayed_work(cifsiod_wq,
> +                                             &server->reconnect_work,
> +                                             (1 *HZ));
> +                             return;  /* Nothing more to do ... */
> +                     }
> +
> +                     iov.iov_base = server->smallbuf + server->tcp_offset;
> +                     /*
> +                      * tcp_offset includes the PDU header. We could add it
> +                      * to the length or subtract it from tcp_offset.
> +                      */
> +                     iov.iov_len = (HEADER_SIZE(server) + 4) - 
> +                                     server->tcp_offset;
> +                     msg.msg_name = NULL;
> +                     msg.msg_namelen = 0;
> +                     msg.msg_control = NULL;
> +                     msg.msg_controllen = 0;
> +                     msg.msg_flags = MSG_DONTWAIT;
> +
> +                     result = kernel_recvmsg(server->ssocket, 
> +                                             &msg, 
> +                                             &iov, 
> +                                             1, 
> +                                             iov.iov_len,
> +                                             msg.msg_flags);
> +                     if (result < 0) {
> +
> +
> +                     }
> +
> +                     server->tcp_offset += result;
> +
> +                     /*
> +                      * Have we got the header? If so, figure out what to
> +                      * do next
> +                      */
> +                     if (server->tcp_offset == (HEADER_SIZE(server) + 4)) {
> +
> +                     }
> +
> +                     break;
> +
> +             case CIFS_READING_SMB_BODY:
> +
> +                     break;
> +
> +             case CIFS_READING_JUNK:
> +
> +                     break;
> +
> +             default:
> +
> +                     break;
> +             }
> +     }
> +}
> +

Big function, might be nice to break that up a bit.

> +/*
> + * Call cifs_read_worker_main ... 
> + */
> +static void
> +cifs_read_worker(struct work_struct *work)
> +{
> +     struct TCP_Server_Info *server =
> +             container_of(work, struct TCP_Server_Info, sock_read_work);
> +
> +     cifs_read_worker_main(server);
> +}
> +
> +/*
> + * The reconnect worker function.
> + */
> +static void
> +cifs_reconnect_worker(struct work_struct *work)
> +{
> +     struct TCP_Server_Info *server =
> +             container_of(work, struct TCP_Server_Info, reconnect_work.work);
> +
> +     /*
> +      * Initiate reconnect processing ... do we need to shutdown the
> +      * existing socket and the reconnect?
> +      */
> +
> +}
> +
>  static int
>  generic_ip_connect(struct TCP_Server_Info *server)
>  {
> @@ -2967,6 +3338,7 @@ generic_ip_connect(struct TCP_Server_Info *server)
>       int slen, sfamily;
>       struct socket *socket = server->ssocket;
>       struct sockaddr *saddr;
> +     struct sock *sk;
>  
>       saddr = (struct sockaddr *) &server->dstaddr;
>  
> @@ -3031,6 +3403,32 @@ generic_ip_connect(struct TCP_Server_Info *server)
>                socket->sk->sk_sndbuf,
>                socket->sk->sk_rcvbuf, socket->sk->sk_rcvtimeo);
>  
> +     if (!server->use_demux_thread) {
> +             /*
> +              * Save the old socket callbacks and establish our own.
> +              */
> +             sk = socket->sk;
> +             write_lock_bh(&sk->sk_callback_lock);
> +             save_old_callbacks(server, sk);
> +             sk->sk_user_data = server;
> +
> +             /* We only change two callbacks for the moment */
> +             sk->sk_data_ready = cifs_data_ready;
> +             sk->sk_state_change = cifs_state_change;
> +
> +             write_unlock_bh(&sk->sk_callback_lock);
> +
> +             INIT_WORK(&server->sock_read_work, cifs_read_worker);

What happens if you reinit the work while it's already queued on the
workqueue? It seems like that could happen here, no?

> +             /*
> +              * Can this be deferrable as well? Anyway, this is used when 
> +              * we cannot get memory, and it calls cifs_read_worker as well.
> +              */ 
> +             INIT_DELAYED_WORK(&server->sock_read_delayed_work, 
> +                             cifs_read_worker);
> +             INIT_DELAYED_WORK(&server->reconnect_work, 
> +                             cifs_reconnect_worker);
> +     }
> +
>       rc = socket->ops->connect(socket, saddr, slen, 0);
>       if (rc < 0) {
>               cFYI(1, "Error %d connecting to server", rc);


-- 
Jeff Layton <[email protected]>
--
To unsubscribe from this list: send the line "unsubscribe linux-cifs" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to