The branch, master has been updated
       via  d02909f s3: lib: messaging. Add function comments I needed to understand this code.
      from  eb75553 s3-printing: fix migrate printer code (bug 8618)

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit d02909f3e07bd78103367de8d74429af5e802020
Author: Jeremy Allison <[email protected]>
Date:   Wed Oct 5 10:46:13 2016 -0700

    s3: lib: messaging. Add function comments I needed to understand this code.
    
    Signed-off-by: Jeremy Allison <[email protected]>
    Reviewed-by: Volker Lendecke <[email protected]>
    
    Autobuild-User(master): Volker Lendecke <[email protected]>
    Autobuild-Date(master): Thu Oct  6 02:29:41 CEST 2016 on sn-devel-144

-----------------------------------------------------------------------

Summary of changes:
 source3/lib/messages_dgm.c | 125 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)


Changeset truncated at 500 lines:

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 39b779b..6bdd589 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -141,6 +141,11 @@ static void close_fd_array(int *fds, size_t num_fds)
        }
 }
 
+/*
+ * The idle handler can free the struct messaging_dgm_out *,
+ * if it's unused (qlen of zero) which closes the socket.
+ */
+
 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
                                           struct tevent_timer *te,
                                           struct timeval current_time,
@@ -158,6 +163,11 @@ static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
        }
 }
 
+/*
+ * Set up the idle handler to fire after 1 second if the
+ * queue length is zero.
+ */
+
 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
 {
        size_t qlen;
@@ -189,6 +199,11 @@ static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
                                           struct timeval current_time,
                                           void *private_data);
 
+/*
+ * Connect to an existing rendezvous point for another
+ * pid - wrapped inside a struct messaging_dgm_out *.
+ */
+
 static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
                                    struct messaging_dgm_context *ctx,
                                    pid_t pid, struct messaging_dgm_out **pout)
@@ -277,6 +292,12 @@ static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
        return 0;
 }
 
+/*
+ * Find the struct messaging_dgm_out * to talk to pid.
+ * If we don't have one, create it. Set the timer to
+ * delete after 1 sec.
+ */
+
 static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
                                 struct messaging_dgm_out **pout)
 {
@@ -302,6 +323,13 @@ static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
        return 0;
 }
 
+/*
+ * This function is called directly to send a message fragment
+ * when the outgoing queue is zero, and from a pthreadpool
+ * job thread when messages are being queued (qlen != 0).
+ * Make sure *ONLY* thread-safe functions are called within.
+ */
+
 static ssize_t messaging_dgm_sendmsg(int sock,
                                     const struct iovec *iov, int iovlen,
                                     const int *fds, size_t num_fds,
@@ -365,6 +393,13 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
 static void messaging_dgm_out_threaded_job(void *private_data);
 static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
 
+/*
+ * Push a message fragment onto a queue to be sent by a
+ * threadpool job. Makes copies of data/fd's to be sent.
+ * The running tevent_queue internally creates an immediate
+ * event to schedule the write.
+ */
+
 static struct tevent_req *messaging_dgm_out_queue_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct messaging_dgm_out *out,
@@ -467,6 +502,11 @@ static int messaging_dgm_out_queue_state_destructor(
        return 0;
 }
 
+/*
+ * tevent_queue callback that schedules the pthreadpool to actually
+ * send the queued message fragment.
+ */
+
 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
                                           void *private_data)
 {
@@ -485,6 +525,11 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
                                req);
 }
 
+/*
+ * Wrapper function run by the pthread that calls
+ * messaging_dgm_sendmsg() to actually do the sendmsg().
+ */
+
 static void messaging_dgm_out_threaded_job(void *private_data)
 {
        struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
@@ -498,6 +543,10 @@ static void messaging_dgm_out_threaded_job(void *private_data)
                                            state->fds, num_fds, &state->err);
 }
 
+/*
+ * Pick up the results of the pthread sendmsg().
+ */
+
 static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
 {
        struct tevent_req *req = tevent_req_callback_data(
@@ -532,6 +581,14 @@ static int messaging_dgm_out_queue_recv(struct tevent_req *req)
 
 static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
 
+/*
+ * Core function to send a message fragment given a
+ * connected struct messaging_dgm_out * destination.
+ * If there is no current queue, tries to send nonblocking
+ * directly. If not, queues the fragment (which makes
+ * a copy of it) and adds a 60-second timeout on the send.
+ */
+
 static int messaging_dgm_out_send_fragment(
        struct tevent_context *ev, struct messaging_dgm_out *out,
        const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
@@ -581,6 +638,11 @@ static int messaging_dgm_out_send_fragment(
        return 0;
 }
 
+/*
+ * Pick up the result of the fragment send. Reset idle timer
+ * if the queue is empty.
+ */
+
 static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
 {
        struct messaging_dgm_out *out = tevent_req_callback_data(
@@ -605,6 +667,33 @@ struct messaging_dgm_fragment_hdr {
        int sock;
 };
 
+/*
+ * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
+ * size chunks and send it.
+ *
+ * Message fragments are prefixed by a 64-bit cookie that
+ * stays the same for all fragments. This allows the receiver
+ * to recognise fragments of the same message and re-assemble
+ * them on the other end.
+ *
+ * Note that this allows other message fragments from other
+ * senders to be interleaved in the receive read processing,
+ * the combination of the cookie and header info allows unique
+ * identification of the message from a specific sender in
+ * re-assembly.
+ *
+ * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
+ * then send a single message with cookie set to zero.
+ *
+ * Otherwise the message is fragmented into chunks and added
+ * to the sending queue. Any file descriptors are passed only
+ * in the last fragment.
+ *
+ * Finally the cookie is incremented (wrap over zero) to
+ * prepare for the next message sent to this channel.
+ *
+ */
+
 static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
                                             struct messaging_dgm_out *out,
                                             const struct iovec *iov,
@@ -837,6 +926,12 @@ static void messaging_dgm_read_handler(struct tevent_context *ev,
                                       uint16_t flags,
                                       void *private_data);
 
+/*
+ * Create the rendezvous point in the file system
+ * that other processes can use to send messages to
+ * this pid.
+ */
+
 int messaging_dgm_init(struct tevent_context *ev,
                       uint64_t *punique,
                       const char *socket_dir,
@@ -948,6 +1043,11 @@ fail_nomem:
        return ENOMEM;
 }
 
+/*
+ * Remove the rendezvous point in the filesystem
+ * if we're the owner.
+ */
+
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 {
        while (c->outsocks != NULL) {
@@ -1004,6 +1104,11 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
                               uint8_t *msg, size_t msg_len,
                               int *fds, size_t num_fds);
 
+/*
+ * Raw read callback handler - passes to messaging_dgm_recv()
+ * for fragment reassembly processing.
+ */
+
 static void messaging_dgm_read_handler(struct tevent_context *ev,
                                       struct tevent_fd *fde,
                                       uint16_t flags,
@@ -1078,6 +1183,12 @@ static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
        return 0;
 }
 
+/*
+ * Deal with identification of fragmented messages and
+ * re-assembly into full messages sent, then calls the
+ * callback.
+ */
+
 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
                               struct tevent_context *ev,
                               uint8_t *buf, size_t buflen,
@@ -1387,6 +1498,20 @@ static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
        return 0;
 }
 
+/*
+ * Reference counter for a struct tevent_fd messaging read event
+ * (with callback function) on a struct tevent_context registered
+ * on a messaging context.
+ *
+ * If we've already registered this struct tevent_context before
+ * (so already have a read event), just increase the reference count.
+ *
+ * Otherwise create a new struct tevent_fd messaging read event on the
+ * previously unseen struct tevent_context - this is what drives
+ * the message receive processing.
+ *
+ */
+
 struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev)
 {


-- 
Samba Shared Repository

Reply via email to