Hi Fabian & all,

Patch works great, thanks! I have rebased it a bit and added support
for client-connect plugin call.

I would like to offer a related feature (and implementation) I call async-push.

Use case: authentication / authorization takes time. I have auth/az
code in auth-user-pass-verify and client-connect calls, and sometimes
it takes more that second to execute those. The problem is that after
auth-user-pass-verify is done, OpenVPN server won’t proceed with
client-connect unless some timeout/io event happens for that client.
Also, server will not notify client that client-connect returned
success unless client sends PULL_REQUEST. Client, in turn, sends
PULL_REQUEST one second after connection initiation and after that
once per 5 seconds. So, for example, if at the moment when first pull
request has arrived, client-connect has not finished yet, we will have
to wait another 5 seconds for the next PULL_REQUEST.

Solution: Inotify. Since OpenVPN creates itself files (auth-contro and
client-connect-deferred) which names it passes to the plugin, we
create one inotify descriptor for event loop and right after creating
those files, we add inotify watch on those. Before calling epoll (or
whatever we use) we add inotify descriptor to the list of watched
descriptors. We also keep watch descriptor and multi_instance in a
hashtable.

When epoll informs us that an event has happened on inotify
descriptor, we get multi_instance by watch descriptor (fetched from
poll event) from our new hashtable and call multi_process_post for
given multi_instance. This will check result from the file and
eventually call multi_connection_established, from where we call
send_push_reply.

Since implementation uses Inotify, it will work on Linux only. Code is
under #define, which is set at compile-time (--enable-async-push=yes).

I have attached an implementation. So far has been working nicely in
my test environment. I would love to hear a feedback from the
community. Is the whole thing done more or less right? Any bugs got
introduced that someone could spot?

-Lev

2014-08-01 0:21 GMT+03:00 Fabian Knittel <fabian.knit...@lettink.de>:
> Hi Lev,
>
> 2014-07-29 12:56 GMT+02:00 Lev Stipakov <lstipa...@gmail.com>:
>>
>> I am pondering about asynchronous OPENVPN_PLUGIN_CLIENT_CONNECT
>> callback. Basically, I want _not_ to establish connection until
>> response is received and ofcI  don't want to block rest of traffic.
>
>
> [ Details of approach snipped. ]
>
>> What do you think about that? Does that approach sound reasonable?
>
>
> Some time ago I implemented something quite similar, but never quite managed
> to officially submit it. You can find my old git branch here [0].
> Unfortunately, to be of any use it would need to be ported to a current
> OpenVPN release / master first.
>
> The code has been in use for several years now [1], so the approach and the
> code basically work quite well. (I think my use case involved calling a
> Python script, but I might have implemented the plugin part too.)
>
> If the OpenVPN commiters see a certain chance, that such a change could be
> included upstream, I might even try to rebase the branch to master myself...
>
> Cheers
> Fabian
>
> 0:
> http://opensource.fsmi.uni-karlsruhe.de/gitweb/?p=openvpn.git;a=shortlog;h=refs/heads/feat_deferred_client-connect
> 1: ... in a production environment with several hundred users (together with
> the equally unofficial VLAN-tagging feature [2]). The feature is needed by a
> daemon that does asynchronous IP-configuration via a central DHCP server
> [3].
> 2:
> http://opensource.fsmi.uni-karlsruhe.de/gitweb/?p=openvpn.git;a=shortlog;h=refs/heads/feat_vlan
> 3: https://gitorious.org/odr



-- 
-Lev
diff --git a/configure.ac b/configure.ac
index ffba374..2c5c65d 100644
--- a/configure.ac
+++ b/configure.ac
@@ -264,6 +264,13 @@ AC_ARG_ENABLE(
 	[enable_systemd="no"]
 )
 
+AC_ARG_ENABLE(
+	[async-push],
+	[AS_HELP_STRING([--enable-async-push], [enable async-push support @<:@default=no@:>@])],
+	[enable_async_push="yes"],
+	[enable_async_push="no"]
+)
+
 AC_ARG_WITH(
 	[special-build],
 	[AS_HELP_STRING([--with-special-build=STRING], [specify special build string])],
@@ -1144,6 +1151,14 @@ if test "${enable_plugin_auth_pam}" = "yes"; then
 	fi
 fi
 
+if test "${enable_async_push}" = "yes"; then
+	AC_CHECK_HEADERS(
+		[sys/inotify.h],
+		AC_DEFINE([ENABLE_ASYNC_PUSH], [1], [Enable async push]),
+		AC_MSG_ERROR([inotify.h not found.])
+	)
+fi
+
 CONFIGURE_DEFINES="`set | grep '^enable_.*=' ; set | grep '^with_.*='`"
 AC_DEFINE_UNQUOTED([CONFIGURE_DEFINES], ["`echo ${CONFIGURE_DEFINES}`"], [Configuration settings])
 
diff --git a/src/openvpn/forward.c b/src/openvpn/forward.c
index 39f66e3..940d426 100644
--- a/src/openvpn/forward.c
+++ b/src/openvpn/forward.c
@@ -1353,6 +1353,9 @@ io_wait_dowork (struct context *c, const unsigned int flags)
 #ifdef ENABLE_MANAGEMENT
   static int management_shift = 6; /* depends on MANAGEMENT_READ and MANAGEMENT_WRITE */
 #endif
+#ifdef ENABLE_ASYNC_PUSH
+  static int file_shift = 8;
+#endif
 
   /*
    * Decide what kind of events we want to wait for.
@@ -1447,6 +1450,11 @@ io_wait_dowork (struct context *c, const unsigned int flags)
     management_socket_set (management, c->c2.event_set, (void*)&management_shift, NULL);
 #endif
 
+#ifdef ENABLE_ASYNC_PUSH
+  // configure file watcher
+  event_ctl (c->c2.event_set, c->c2.inotify_fd, EVENT_READ, (void*)&file_shift);
+#endif
+
   /*
    * Possible scenarios:
    *  (1) tcp/udp port has data available to read
diff --git a/src/openvpn/mtcp.c b/src/openvpn/mtcp.c
index dc15f09..cf2c5ff 100644
--- a/src/openvpn/mtcp.c
+++ b/src/openvpn/mtcp.c
@@ -62,6 +62,10 @@
 # define MTCP_MANAGEMENT ((void*)4)
 #endif
 
+#ifdef ENABLE_ASYNC_PUSH
+#define MTCP_FILE_CLOSE_WRITE ((void*)5)
+#endif
+
 #define MTCP_N           ((void*)16) /* upper bound on MTCP_x */
 
 struct ta_iow_flags
@@ -245,6 +249,12 @@ multi_tcp_wait (const struct context *c,
   if (management)
     management_socket_set (management, mtcp->es, MTCP_MANAGEMENT, &mtcp->management_persist_flags);
 #endif
+
+#ifdef ENABLE_ASYNC_PUSH
+  // configure file watcher
+  event_ctl (mtcp->es, c->c2.inotify_fd, EVENT_READ, MTCP_FILE_CLOSE_WRITE);
+#endif
+
   status = event_wait (mtcp->es, &c->c2.timeval, mtcp->esr, mtcp->maxevents);
   update_time ();
   mtcp->n_esr = 0;
@@ -581,6 +591,10 @@ multi_tcp_action (struct multi_context *m, struct multi_instance *mi, int action
   } while (action != TA_UNDEF);
 }
 
+#ifdef ENABLE_ASYNC_PUSH
+void multi_process_file_closed (struct multi_context *m, const unsigned int mpp_flags);
+#endif
+
 static void
 multi_tcp_process_io (struct multi_context *m)
 {
@@ -636,6 +650,12 @@ multi_tcp_process_io (struct multi_context *m)
 	    {
 	      get_signal (&m->top.sig->signal_received);
 	    }
+#ifdef ENABLE_ASYNC_PUSH
+	  else if (e->arg == MTCP_FILE_CLOSE_WRITE)
+	    {
+	      multi_process_file_closed (m, MPP_PRE_SELECT|MPP_RECORD_TOUCH);
+	    }
+#endif
 	}
       if (IS_SIG (&m->top))
 	break;
@@ -684,6 +704,13 @@ tunnel_server_tcp (struct context *top)
   /* finished with initialization */
   initialization_sequence_completed (top, ISC_SERVER); /* --mode server --proto tcp-server */
 
+#ifdef ENABLE_ASYNC_PUSH
+  multi.top.c2.inotify_fd = inotify_init();
+  if (multi.top.c2.inotify_fd < 0) {
+    msg (D_MULTI_ERRORS, "MULTI: cannot create inotify descriptor");
+  }
+#endif
+
   /* per-packet event loop */
   while (true)
     {
@@ -712,6 +739,10 @@ tunnel_server_tcp (struct context *top)
       perf_pop ();
     }
 
+#ifdef ENABLE_ASYNC_PUSH
+  close(top->c2.inotify_fd);
+#endif
+
   /* shut down management interface */
   uninit_management_callback_multi (&multi);
 
diff --git a/src/openvpn/mudp.c b/src/openvpn/mudp.c
index 3468dab..f0fcb7a 100644
--- a/src/openvpn/mudp.c
+++ b/src/openvpn/mudp.c
@@ -37,6 +37,11 @@
 
 #include "memdbg.h"
 
+#ifdef ENABLE_ASYNC_PUSH
+#include <sys/inotify.h>
+#define INOTIFY_EVENT_BUFFER_SIZE 16384
+#endif
+
 /*
  * Get a client instance based on real address.  If
  * the instance doesn't exist, create it while
@@ -121,6 +126,49 @@ multi_process_outgoing_link (struct multi_context *m, const unsigned int mpp_fla
     multi_process_outgoing_link_dowork (m, mi, mpp_flags);
 }
 
+#ifdef ENABLE_ASYNC_PUSH
+void
+multi_process_file_closed (struct multi_context *m, const unsigned int mpp_flags)
+{
+  char buffer[INOTIFY_EVENT_BUFFER_SIZE];
+  int r = read (m->top.c2.inotify_fd, buffer, INOTIFY_EVENT_BUFFER_SIZE);
+
+  size_t buffer_i = 0;
+  while (buffer_i < r)
+    {
+      // parse events
+      struct inotify_event *pevent = (struct inotify_event *) &buffer[buffer_i];
+      size_t event_size = sizeof (struct inotify_event) + pevent->len;
+      buffer_i += event_size;
+
+      msg(D_LOW, "modified fd %d, mask %d", pevent->wd, pevent->mask);
+
+      struct multi_instance* mi = hash_lookup(m->ccfiles, (void*) (unsigned long) pevent->wd);
+
+      if (pevent->mask & IN_CLOSE_WRITE)
+        {
+          if (mi)
+            {
+              // continue authentication and send push_reply
+              multi_process_post (m, mi, mpp_flags);
+            }
+          else
+            {
+              msg(D_MULTI_ERRORS, "multi instance not found!");
+            }
+        }
+      else if (pevent->mask & IN_IGNORED)
+        {
+          // this event is _always_ called when watch is removed / file deleted
+          if (mi)
+            hash_remove(m->ccfiles, (void*) (unsigned long) pevent->wd);
+        }
+      else
+        msg(D_MULTI_ERRORS, "unknown mask %d", pevent->mask);
+    }
+}
+#endif
+
 /*
  * Process an I/O event.
  */
@@ -143,6 +191,10 @@ multi_process_io_udp (struct multi_context *m)
     strcat (buf, "TR/");
   else if (status & TUN_WRITE)
     strcat (buf, "TW/");
+#ifdef ENABLE_ASYNC_PUSH
+  else if (status & FILE_CLOSED)
+    strcat (buf, "FC/");
+#endif
   printf ("IO %s\n", buf);
 #endif
 
@@ -180,6 +232,13 @@ multi_process_io_udp (struct multi_context *m)
       if (!IS_SIG (&m->top))
 	multi_process_incoming_tun (m, mpp_flags);
     }
+#ifdef ENABLE_ASYNC_PUSH
+  /* INOTIFY callback */
+  else if (status & FILE_CLOSED)
+    {
+      multi_process_file_closed(m, mpp_flags);
+    }
+#endif
 }
 
 /*
@@ -242,6 +301,13 @@ tunnel_server_udp_single_threaded (struct context *top)
   /* finished with initialization */
   initialization_sequence_completed (top, ISC_SERVER); /* --mode server --proto udp */
 
+#ifdef ENABLE_ASYNC_PUSH
+  multi.top.c2.inotify_fd = inotify_init();
+  if (multi.top.c2.inotify_fd < 0) {
+    msg (D_MULTI_ERRORS, "MULTI: cannot create inotify descriptor, errno %d", errno);
+  }
+#endif
+
   /* per-packet event loop */
   while (true)
     {
@@ -266,10 +332,14 @@ tunnel_server_udp_single_threaded (struct context *top)
 	  multi_process_io_udp (&multi);
 	  MULTI_CHECK_SIG (&multi);
 	}
-      
+
       perf_pop ();
     }
 
+#ifdef ENABLE_ASYNC_PUSH
+  close(top->c2.inotify_fd);
+#endif
+
   /* shut down management interface */
   uninit_management_callback_multi (&multi);
 
diff --git a/src/openvpn/multi.c b/src/openvpn/multi.c
index e2f11ef..a64900c 100644
--- a/src/openvpn/multi.c
+++ b/src/openvpn/multi.c
@@ -28,6 +28,10 @@
 #include "config-msvc.h"
 #endif
 
+#ifdef ENABLE_ASYNC_PUSH
+#include <sys/inotify.h>
+#endif
+
 #include "syshead.h"
 
 #if P2MP_SERVER
@@ -245,6 +249,20 @@ cid_compare_function (const void *key1, const void *key2)
 
 #endif
 
+#ifdef ENABLE_ASYNC_PUSH
+static uint32_t
+int_hash_function (const void *key, uint32_t iv)
+{
+  return (unsigned long)key;
+}
+
+static bool
+int_compare_function (const void *key1, const void *key2)
+{
+  return (unsigned long)key1 == (unsigned long)key2;
+}
+#endif
+
 /*
  * Main initialization function, init multi_context object.
  */
@@ -306,6 +324,17 @@ multi_init (struct multi_context *m, struct context *t, bool tcp_mode, int threa
 			   cid_compare_function);
 #endif
 
+#ifdef ENABLE_ASYNC_PUSH
+  /*
+   * Mapping between inotify watch descriptors and
+   * multi_instances.
+   */
+  m->ccfiles = hash_init (t->options.real_hash_size,
+                        get_random(),
+                        int_hash_function,
+                        int_compare_function);
+#endif
+
   /*
    * This is our scheduler, for time-based wakeup
    * events.
@@ -634,6 +663,11 @@ multi_uninit (struct multi_context *m)
 #endif
 	  m->hash = NULL;
 
+#ifdef ENABLE_ASYNC_PUSH
+	  hash_free (m->ccfiles);
+	  m->ccfiles = NULL;
+#endif
+
 	  schedule_free (m->schedule);
 	  mbuf_free (m->mbuf);
 	  ifconfig_pool_free (m->ifconfig_pool);
@@ -735,13 +769,15 @@ multi_create_instance (struct multi_context *m, const struct mroute_addr *real)
 void
 multi_print_status (struct multi_context *m, struct status_output *so, const int version)
 {
+  status_reset (so);
+
   if (m->hash)
     {
       struct gc_arena gc_top = gc_new ();
       struct hash_iterator hi;
       const struct hash_element *he;
 
-      status_reset (so);
+      
 
       if (version == 1) /* WAS: m->status_file_version */
 	{
@@ -910,9 +946,16 @@ multi_print_status (struct multi_context *m, struct status_output *so, const int
       }
 #endif
 
-      status_flush (so);
       gc_free (&gc_top);
     }
+
+#ifdef ENABLE_ASYNC_PUSH
+    if (m->ccfiles) {
+      status_printf(so, "ccfiles size: %d\n", hash_n_elements(m->ccfiles));
+    }
+#endif
+
+    status_flush (so);
 }
 
 /*
@@ -1629,6 +1672,7 @@ ccs_delete_deferred_ret_file (struct multi_instance *mi)
   if (ccs->deferred_ret_file)
     {
       setenv_del (mi->context.c2.es, "client_connect_deferred_file");
+      // this will trigger IN_NOTIFY, which will remove item from ccfiles
       delete_file (ccs->deferred_ret_file);
       free (ccs->deferred_ret_file);
       ccs->deferred_ret_file = NULL;
@@ -1636,7 +1680,7 @@ ccs_delete_deferred_ret_file (struct multi_instance *mi)
 }
 
 static bool
-ccs_gen_deferred_ret_file (struct multi_instance *mi)
+ccs_gen_deferred_ret_file (struct multi_context *m, struct multi_instance *mi)
 {
   struct client_connect_state *ccs = mi->client_connect_state;
   struct gc_arena gc = gc_new ();
@@ -1656,6 +1700,17 @@ ccs_gen_deferred_ret_file (struct multi_instance *mi)
   setenv_str (mi->context.c2.es, "client_connect_deferred_file",
 	      ccs->deferred_ret_file);
 
+#ifdef ENABLE_ASYNC_PUSH
+  // monitor 'close write' event
+  long wd = inotify_add_watch(m->top.c2.inotify_fd, ccs->deferred_ret_file, IN_CLOSE_WRITE | IN_ONESHOT);
+  if (wd < 0)
+    msg(M_NONFATAL, "cannot add watch, errno: %d", errno);
+  else
+    {
+      hash_add(m->ccfiles, (const uintptr_t*)wd, mi, true);
+    }
+#endif
+
   gc_free (&gc);
   return true;
 }
@@ -1821,13 +1876,13 @@ multi_client_connect_call_plugin_v1 (struct multi_context *m,
       struct argv argv = argv_new ();
 
       if (!ccs_gen_config_file (mi) ||
-	  !ccs_gen_deferred_ret_file (mi))
+	  !ccs_gen_deferred_ret_file (m, mi))
 	{
 	  ret = CC_RET_FAILED;
 	  goto script_depr_failed;
 	}
 
-      argv_printf (&argv, "%s", ccs->config_file);
+      argv_printf (&argv, "%s, %s", ccs->config_file, ccs->deferred_ret_file);
 
       plug_ret = plugin_call (mi->context.plugins,
 			      OPENVPN_PLUGIN_CLIENT_CONNECT,
@@ -1880,7 +1935,7 @@ multi_client_connect_call_plugin_v2 (struct multi_context *m,
       struct plugin_return pr;
 
       if (!ccs_gen_config_file (mi) ||
-	  !ccs_gen_deferred_ret_file (mi))
+	  !ccs_gen_deferred_ret_file (m, mi))
 	{
 	  ret = CC_RET_FAILED;
 	  goto script_depr_failed;
@@ -1935,7 +1990,7 @@ multi_client_connect_call_script (struct multi_context *m,
       setenv_str (mi->context.c2.es, "script_type", "client-connect");
 
       if (!ccs_gen_config_file (mi) ||
-	  !ccs_gen_deferred_ret_file (mi))
+	  !ccs_gen_deferred_ret_file (m, mi))
 	{
 	  ret = CC_RET_FAILED;
 	  goto script_failed;
@@ -2238,8 +2293,10 @@ multi_connection_established (struct multi_context *m, struct multi_instance *mi
 	  ret = multi_client_handle_single
 		  (client_connect_handlers[ccs->cur_handler_idx].deferred, m,
 		   mi);
-	  if (ret == CC_RET_DEFERRED)
+	  if (ret == CC_RET_DEFERRED) {
+             msg (D_LOW, "MULTI: still deferred");
 	    return;
+          }
 
 	  /* Proceed to next handler.  */
 	  ++ccs->cur_handler_idx;
@@ -2253,8 +2310,9 @@ multi_connection_established (struct multi_context *m, struct multi_instance *mi
 	{
 	  ret = multi_client_handle_single
 		  (client_connect_handlers[ccs->cur_handler_idx].main, m, mi);
-	  if (ret == CC_RET_DEFERRED)
+	  if (ret == CC_RET_DEFERRED) {
 	    return;
+          }
 
 	  /* Proceed to next handler.  */
 	  ++ccs->cur_handler_idx;
@@ -2270,6 +2328,11 @@ multi_connection_established (struct multi_context *m, struct multi_instance *mi
 
       /* set flag so we don't get called again */
       mi->connection_established_flag = true;
+
+#ifdef ENABLE_ASYNC_PUSH
+      /* authentication complete, send push reply*/
+      send_push_reply(&mi->context);
+#endif
     }
 
   /*
@@ -2438,19 +2501,44 @@ multi_process_post (struct multi_context *m, struct multi_instance *mi, const un
 
   if (!IS_SIG (&mi->context) && ((flags & MPP_PRE_SELECT) || ((flags & MPP_CONDITIONAL_PRE_SELECT) && !ANY_OUT (&mi->context))))
     {
+#ifdef ENABLE_ASYNC_PUSH
+      bool was_authenticated = false;
+      struct key_state *ks = NULL;
+      if (mi->context.c2.tls_multi)
+        {
+          ks = &mi->context.c2.tls_multi->session[TM_ACTIVE].key[KS_PRIMARY];
+          was_authenticated = ks->authenticated;
+        }
+#endif
+
       /* figure timeouts and fetch possible outgoing
 	 to_link packets (such as ping or TLS control) */
       pre_select (&mi->context);
 
-      if (!IS_SIG (&mi->context))
+#ifdef ENABLE_ASYNC_PUSH
+      // when client connects first time, TM_ACTIVE and KS_PRIMARY are used
+      if (ks && ks->auth_control_file && ks->authenticated && !was_authenticated)
 	{
-	  /* tell scheduler to wake us up at some point in the future */
-	  multi_schedule_context_wakeup(m, mi);
+	  // watch acf file
+	  long wd = inotify_add_watch(m->top.c2.inotify_fd, ks->auth_control_file, IN_CLOSE_WRITE | IN_ONESHOT);
+	  if (wd < 0)
+	    msg(M_NONFATAL, "cannot add watch, errno: %d", errno);
+	  else
+	    {
+	      hash_add(m->ccfiles, (const uintptr_t*)wd, mi, true);
+	    }
+	}
+#endif
 
+      if (!IS_SIG (&mi->context))
+	{
 	  /* connection is "established" when SSL/TLS key negotiation succeeds
 	     and (if specified) auth user/pass succeeds */
 	  if (!mi->connection_established_flag && CONNECTION_ESTABLISHED (&mi->context))
 	    multi_connection_established (m, mi);
+
+          /* tell scheduler to wake us up at some point in the future */
+          multi_schedule_context_wakeup(m, mi);
 	}
     }
 
diff --git a/src/openvpn/multi.h b/src/openvpn/multi.h
index 137228d..f55dc8e 100644
--- a/src/openvpn/multi.h
+++ b/src/openvpn/multi.h
@@ -186,6 +186,11 @@ struct multi_context {
    * Timer object for stale route check
    */
   struct event_timeout stale_routes_check_et;
+
+#ifdef ENABLE_ASYNC_PUSH
+  // client connect result descriptor -> multi_instance
+  struct hash *ccfiles;
+#endif
 };
 
 /**
diff --git a/src/openvpn/openvpn.h b/src/openvpn/openvpn.h
index 24df3bb..d546eae 100644
--- a/src/openvpn/openvpn.h
+++ b/src/openvpn/openvpn.h
@@ -247,6 +247,9 @@ struct context_2
 #  define MANAGEMENT_READ  (1<<6)
 #  define MANAGEMENT_WRITE (1<<7)
 # endif
+#ifdef ENABLE_ASYNC_PUSH
+# define FILE_CLOSED       (1<<8)
+#endif
 
   unsigned int event_set_status;
 
@@ -491,6 +494,11 @@ struct context_2
 #ifdef MANAGEMENT_DEF_AUTH
   struct man_def_auth_context mda_context;
 #endif
+
+#ifdef ENABLE_ASYNC_PUSH
+  // descriptor for monitoring file changes
+  int inotify_fd;
+#endif
 };
 
 

Reply via email to