On 11.02.2018 00:36, Raman Shyshniou wrote:
Add autosuspend=<bool> option. Enabling this option makes the
pipe-source suspend when all writers have closed the fifo. The source
is automatically resumed as soon as any data is written to the pipe,
and is suspended again when the last writer closes the fifo.
Can you add a sentence about the motivation to the commit
message? Something like "Currently the pipe-source will remain
running even if no writer is connected and therefore no data is
produced. This patch adds the autosuspend=<bool> option to
prevent this. ..."

Otherwise only two small comments below.
---
  src/modules/module-pipe-source.c | 136 +++++++++++++++++++++++++++++++++++++--
  1 file changed, 129 insertions(+), 7 deletions(-)

diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c
index f8284c1..af7badc 100644
--- a/src/modules/module-pipe-source.c
+++ b/src/modules/module-pipe-source.c
@@ -57,11 +57,24 @@ PA_MODULE_USAGE(
          "format=<sample format> "
          "rate=<sample rate> "
          "channels=<number of channels> "
-        "channel_map=<channel map>");
+        "channel_map=<channel map> "
+        "autosuspend=<boolean>");
#define DEFAULT_FILE_NAME "/tmp/music.input"
  #define DEFAULT_SOURCE_NAME "fifo_input"
+struct pipe_source_msg {
+    pa_msgobject parent;
+};
+
+typedef struct pipe_source_msg pipe_source_msg;
+PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject);
+
+enum {
+    PIPE_SOURCE_SUSPEND,
+    PIPE_SOURCE_RESUME,
+};
+
  struct userdata {
      pa_core *core;
      pa_module *module;
@@ -71,7 +84,10 @@ struct userdata {
      pa_thread_mq thread_mq;
      pa_rtpoll *rtpoll;
+ pipe_source_msg *msg;
+
      char *filename;
+    int corkfd;
      int fd;
pa_memchunk memchunk;
@@ -87,9 +103,41 @@ static const char* const valid_modargs[] = {
      "rate",
      "channels",
      "channel_map",
+    "autosuspend",
      NULL
  };
+/* Called from main context */
+static int pipe_source_process_msg(
+        pa_msgobject *o,
+        int code,
+        void *data,
+        int64_t offset,
+        pa_memchunk *chunk) {
+
+    struct userdata *u = (struct userdata *) data;
+
+    pa_assert(u);
+
+    switch (code) {
+        case PIPE_SOURCE_SUSPEND:
+            pa_log_debug("Suspending source %s because no writers left", 
u->source->name);
+            pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION);
+            break;
+
+        case PIPE_SOURCE_RESUME:
+            pa_log_debug("Resuming source %s because writer connected", 
u->source->name);
+            pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION);
+            break;
+
+        default:
+            pa_assert_not_reached();
+    }
+
+    return 0;
+}
+
+/* Called from thread context */
  static int source_process_msg(
          pa_msgobject *o,
          int code,
@@ -135,8 +183,15 @@ static void thread_func(void *userdata) {
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + /* Writer connected */
+        if (u->corkfd >= 0 && pollfd->revents & POLLIN) {
+            pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), 
PIPE_SOURCE_RESUME, u, 0, NULL, NULL);
+            pa_assert_se(pa_close(u->corkfd) == 0);
+            u->corkfd = -1;
+        }
+
          /* Try to read some data and pass it on to the source driver */
-        if (u->source->thread_info.state == PA_SOURCE_RUNNING && 
pollfd->revents) {
+        if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents 
& POLLIN) {
              ssize_t l;
              void *p;
@@ -151,8 +206,6 @@ static void thread_func(void *userdata) {
              l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, 
pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type);
              pa_memblock_release(u->memchunk.memblock);
- pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */
-
              if (l < 0) {
if (errno == EINTR)
@@ -162,6 +215,11 @@ static void thread_func(void *userdata) {
                      goto fail;
                  }
+ } else if (l == 0) {
+
+                /* Nothing to read */
+                pollfd->revents = 0;
+
              } else {
u->memchunk.length = (size_t) l;
@@ -178,7 +236,7 @@ static void thread_func(void *userdata) {
          }
/* Hmm, nothing to do. Let's sleep */
-        pollfd->events = (short) (u->source->thread_info.state == 
PA_SOURCE_RUNNING ? POLLIN : 0);
+        pollfd->events = (short) ((u->source->thread_info.state == PA_SOURCE_RUNNING 
|| u->corkfd >= 0) ? POLLIN : 0);
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
              goto fail;
@@ -188,6 +246,29 @@ static void thread_func(void *userdata) {
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + /* Last writer disconnected but data may still be in the buffer */
+        if (pollfd->revents & POLLHUP) {
+            int l = 0;
+
+#ifdef FIONREAD
+            ioctl(u->fd, FIONREAD, &l);
+#endif
+
+            if (!l && u->corkfd < 0) {
+                /* Suspend source */
+                pa_asyncmsgq_post(pa_thread_mq_get()->outq, 
PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL);
+
+                /* Open fifo for writing to stop POLLHUP spam */
+                if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 
0) {
+                    pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
+                    goto fail;
+                }
+            }
+
+            /* Ignore POOLHUP anyway */

Typo in the comment above: "POOLHUP" should be "POLLHUP".

+            pollfd->revents &= ~POLLHUP;
+        }
+
          if (pollfd->revents & ~POLLIN) {
              pa_log("FIFO shutdown.");
              goto fail;
@@ -212,6 +293,7 @@ int pa__init(pa_module *m) {
      pa_modargs *ma;
      struct pollfd *pollfd;
      pa_source_new_data data;
+    bool autosuspend = false;
pa_assert(m); @@ -238,17 +320,51 @@ int pa__init(pa_module *m) {
          goto fail;
      }
+ u->corkfd = -1;
+
+    if (!(u->msg = pa_msgobject_new(pipe_source_msg)))
+        goto fail;
+
+    u->msg->parent.process_msg = pipe_source_process_msg;
+
      u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", 
DEFAULT_FILE_NAME));
if (mkfifo(u->filename, 0666) < 0) {
          pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno));
          goto fail;
      }
-    if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
-        pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
+
+    if (pa_modargs_get_value_boolean(ma, "autosuspend", &autosuspend) < 0) {
+        pa_log("Failed to parse autosuspend= argument.");
          goto fail;
      }
+ if (autosuspend) {
+        int rwfd;
+
+        /* Open fifo for read and write first, so the next open for read-only
+           will be succeded immediately */
+        if ((rwfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
+            pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
+            goto fail;
+        }
+
+        if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) {
+            pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
+            pa_assert_se(pa_close(rwfd) == 0);
+            goto fail;
+        }
+

You could #ifdef O_NONBLOCK and only open the FIFO for RDWR if the
option does not exist.

+        pa_assert_se(pa_close(rwfd) == 0);
+
+    } else {
+        /* Open fifo for read and write, so there will be at least one writer 
*/
+        if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
+            pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
+            goto fail;
+        }
+    }
+
      pa_make_fd_nonblock(u->fd);
if (fstat(u->fd, &st) < 0) {
@@ -360,6 +476,12 @@ void pa__done(pa_module *m) {
          pa_xfree(u->filename);
      }
+ if (u->msg)
+        pa_xfree(u->msg);
+
+    if (u->corkfd >= 0)
+        pa_assert_se(pa_close(u->corkfd) == 0);
+
      if (u->fd >= 0)
          pa_assert_se(pa_close(u->fd) == 0);


_______________________________________________
pulseaudio-discuss mailing list
pulseaudio-discuss@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/pulseaudio-discuss

Reply via email to