RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  [email protected]
  Module: rpm                              Date:   29-Jun-2016 14:17:58
  Branch: rpm-5_4                          Handle: 2016062912175701

  Added files:              (Branch: rpm-5_4)
    rpm/macros              mqtt.in
  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES configure.ac
    rpm/macros              .cvsignore macros.in
    rpm/rpmio               rpmmqtt.c rpmmqtt.h rpmurl.h tmqtt.c url.c

  Log:
    - mqtt: add self-subscriptions and macro configgery.

  Summary:
    Revision    Changes     Path
    1.3501.2.509+1  -0      rpm/CHANGES
    2.472.2.152 +3  -2      rpm/configure.ac
    1.4.2.3     +1  -0      rpm/macros/.cvsignore
    1.39.2.50   +5  -1      rpm/macros/macros.in
    1.1.2.1     +13 -0      rpm/macros/mqtt.in
    1.1.2.5     +364 -124   rpm/rpmio/rpmmqtt.c
    1.1.2.5     +3  -2      rpm/rpmio/rpmmqtt.h
    1.41.4.10   +1  -0      rpm/rpmio/rpmurl.h
    1.1.2.3     +18 -3      rpm/rpmio/tmqtt.c
    1.73.4.16   +4  -0      rpm/rpmio/url.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.508 -r1.3501.2.509 CHANGES
  --- rpm/CHANGES       29 Jun 2016 09:58:39 -0000      1.3501.2.508
  +++ rpm/CHANGES       29 Jun 2016 12:17:57 -0000      1.3501.2.509
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: mqtt: add self-subscriptions and macro configgery.
       - jbj: macro: add primitives useful for log spewage.
       - jbj: mqtt: stub-in a paho-mqtt client.
       - jbj: build: add/use RPMIOPOOL_ macros where possible.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/configure.ac
  ============================================================================
  $ cvs diff -u -r2.472.2.151 -r2.472.2.152 configure.ac
  --- rpm/configure.ac  27 Jun 2016 18:27:10 -0000      2.472.2.151
  +++ rpm/configure.ac  29 Jun 2016 12:17:57 -0000      2.472.2.152
  @@ -2247,7 +2247,7 @@
   # MQTT
   RPM_CHECK_LIB(
       [MQTT], [mqtt],
  -    [paho-mqtt3c], [MQTTClient_create], [MQTTClient.h],
  +    [paho-mqtt3as], [MQTTAsync_create], [MQTTAsync.h],
       [no,external:none], [],
       [ AC_DEFINE(WITH_MQTT, 1, [Define if building with MQTT])
       ], [])
  @@ -3372,7 +3372,8 @@
       rpmdb/DB_CONFIG
       macros/macros macros/macros.rpmbuild
       macros/cmake macros/gstreamer macros/java macros/kernel macros/libtool
  -    macros/mandriva macros/suse macros/fedora macros/mono macros/perl macros/pkgconfig macros/php
  +    macros/mandriva macros/suse macros/fedora macros/mono macros/mqtt
  +    macros/perl macros/pkgconfig macros/php
       macros/python macros/ruby macros/selinux macros/tcl
       lua/tests/Makefile lua/tests/libs/Makefile
       doc/Makefile
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/macros/.cvsignore
  ============================================================================
  $ cvs diff -u -r1.4.2.2 -r1.4.2.3 .cvsignore
  --- rpm/macros/.cvsignore     15 Aug 2011 20:36:23 -0000      1.4.2.2
  +++ rpm/macros/.cvsignore     29 Jun 2016 12:17:58 -0000      1.4.2.3
  @@ -8,6 +8,7 @@
   macros.rpmbuild
   mandriva
   mono
  +mqtt
   perl
   php
   pkgconfig
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/macros/macros.in
  ============================================================================
  $ cvs diff -u -r1.39.2.49 -r1.39.2.50 macros.in
  --- rpm/macros/macros.in      8 May 2016 18:33:29 -0000       1.39.2.49
  +++ rpm/macros/macros.in      29 Jun 2016 12:17:58 -0000      1.39.2.50
  @@ -1,7 +1,7 @@
   #/*! \page config_macros Default configuration: @USRLIBRPM@/macros
   # \verbatim
   #
  -# $Id: macros.in,v 1.39.2.49 2016/05/08 18:33:29 jbj Exp $
  +# $Id: macros.in,v 1.39.2.50 2016/06/29 12:17:58 jbj Exp $
   #
   # This is a global RPM configuration file. All changes made here will
   # be lost when the rpm package is upgraded. Any per-system configuration
  @@ -1057,6 +1057,10 @@
   # ---- rpmbuild macros.
   %{load:%{_usrlibrpm}/macros.rpmbuild}
   
  +#==============================================================================
  +# ---- MQTT macros.
  +%{load:%{_usrlibrpm}/macros.d/mqtt}
  +
   #------------------------------------------------------------------------
   # cmake(...) configuration
   #%%{load:%{_usrlibrpm}/macros.d/cmake}
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/macros/mqtt.in
  ============================================================================
  $ cvs diff -u -r0 -r1.1.2.1 mqtt.in
  --- /dev/null 2016-06-29 14:15:40.000000000 +0200
  +++ mqtt.in   2016-06-29 14:17:58.615621809 +0200
  @@ -0,0 +1,13 @@
  +#==============================================================================
  +# ---- MQTT configuration.
  +#
  +%_mqtt_cachedir      /var/cache/mqtt
  +%_mqtt_user  luser
  +%_mqtt_pass  jasnl
  +%_mqtt_host  localhost
  +%_mqtt_port  1883
  +%_mqtt_clientid      rpm
  +%_mqtt_topic rpm/#
  +%_mqtt_qos   1
  +%_mqtt_timeout       10000
  +%_mqtt_prefix        %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} %{nil}
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.4 -r1.1.2.5 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       28 Jun 2016 07:20:22 -0000      1.1.2.4
  +++ rpm/rpmio/rpmmqtt.c       29 Jun 2016 12:17:58 -0000      1.1.2.5
  @@ -19,7 +19,7 @@
   
   #include "debug.h"
   
  -int _rpmmqtt_debug = 1;
  +int _rpmmqtt_debug = 0;
   
   /*==============================================================*/
   typedef struct key_s {
  @@ -27,7 +27,7 @@
       const char *n;
   } KEY;
   
  -#define _ENTRY(_v)      { MQTTCLIENT_##_v, #_v, }
  +#define _ENTRY(_v)      { MQTTASYNC_##_v, #_v, }
   static KEY rpmmqtt_errs[] = {
   #ifdef       WITH_MQTT
       _ENTRY(SUCCESS),
  @@ -40,6 +40,7 @@
       _ENTRY(TOPICNAME_TRUNCATED),
       _ENTRY(BAD_STRUCTURE),
       _ENTRY(BAD_QOS),
  +    _ENTRY(NO_MORE_MSGIDS),
   #else
       { 0, NULL },
   #endif
  @@ -72,7 +73,7 @@
   
       if (rc != 0) {   /* MQTTCLIENT_SUCCESS */
        int _lvl = RPMLOG_WARNING;
  -     rpmlog(_lvl, "%s:%s:%u: MQTTClient_%s: %s(%d)\n",
  +     rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n",
                func, fn, ln, msg, rpmmqttStrerror(rc), rc);
       }
       return rc;
  @@ -81,9 +82,12 @@
       Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__)
   
   /*==============================================================*/
  -struct MQTTClient_message;
  -static int rpmmqttMessageArrived(void * _mqtt, char * topic, int topicLen,
  -             MQTTClient_message *  message)
  +struct MQTTAsync_message;
  +struct MQTTAsync_successData;
  +struct MQTTAsync_failureData;
  +
  +static int onMessageArrived(void * _mqtt, char * topic, int topicLen,
  +             MQTTAsync_message *  message)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       int rc = 1;
  @@ -94,36 +98,161 @@
       const char * s = message->payload;
       size_t ns = message->payloadlen;
       if (_rpmmqtt_debug < 0)
  -     rpmlog(RPMLOG_DEBUG, "+++ MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s);
  -    MQTTClient_freeMessage(&message);
  -    MQTTClient_free(topic);
  +     rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s);
  +
  +    printf("%.*s:\t", topicLen, topic);
  +    for (size_t i = 0; i < ns; i++)
  +        putchar(s[i]); 
  +    putchar('\n');              
  +
  +    MQTTAsync_freeMessage(&message);
  +    MQTTAsync_free(topic);
   #endif
   
       return rc;
   }
   
  -static void rpmmqttDeliveryComplete(void * _mqtt, int token)
  +static void onDeliveryComplete(void * _mqtt, int token)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       if (_rpmmqtt_debug < 0)
        rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token);
       mqtt->token = token;
  -    mqtt->delivered = 1;
   }
   
  -static void rpmmqttConnlost(void * _mqtt, char *cause)
  +static void onConnectionLost(void * _mqtt, char *cause)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    rpmlog(RPMLOG_DEBUG,
  +             "--- MQTT disconnect(%s) version(%d) present(%d)\n",
  +             mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  +    if (cause)
  +     rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause);
  +
  +    rpmlog(RPMLOG_WARNING, "MQTT reconnecting(%s) ...\n", mqtt->serverURI);
  +
  +    mqtt->connected = 0;
  +    (void) rpmmqttConnect(mqtt);
  +}
  +
  +static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response);
  +    if (response) {
  +     const char *s = response->message;
  +     int token = response->token;
  +     int code = response->code;
  +     rpmlog(RPMLOG_WARNING,
  +             "MQTT disconnect failed: code(%d) msg %s\n",
  +             token, code, s);
  +    } else
  +     rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n");
  +    mqtt->connected = 0;
  +    mqtt->finished = 1;
  +}
  +
  +static void onDisconnect(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  -    if (mqtt->debug || _rpmmqtt_debug < 0) {
  +    if (mqtt->debug || _rpmmqtt_debug)
        rpmlog(RPMLOG_DEBUG,
  -             "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n",
  +             "MQTT disconnect(%s) version(%d) present(%d)\n",
                mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  -     if (cause)
  -         rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause);
  -     mqtt->serverURI = _free(mqtt->serverURI);
  -     mqtt->connected = 0;
  +    mqtt->serverURI = _free(mqtt->serverURI);
  +    mqtt->connected = 0;
  +    mqtt->finished = 1;
  +}
  +
  +static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response);
  +    if (response) {
  +     const char *s = response->message;
  +     int token = response->token;
  +     int code = response->code;
  +     rpmlog(RPMLOG_WARNING, "MQTT    connect failed: code(%d) msg %s\n",
  +                     token, code, s);
  +    } else
  +     rpmlog(RPMLOG_WARNING, "MQTT    connect failed\n");
  +    mqtt->connected = 0;
  +    mqtt->finished = 1;
  +}
  +
  +static void onConnect(void * _mqtt, MQTTAsync_successData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    if (response) {
  +     mqtt->serverURI = xstrdup(response->alt.connect.serverURI);
  +     mqtt->MQTTVersion = response->alt.connect.MQTTVersion;
  +     mqtt->sessionPresent = response->alt.connect.sessionPresent;
       }
  +
  +    if (mqtt->debug || _rpmmqtt_debug)
  +     rpmlog(RPMLOG_DEBUG,
  +             "MQTT    connect(%s) version(%d) present(%d)\n",
  +             mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  +    mqtt->connected = 1;
  +    mqtt->finished = 1;
  +}
  +
  +static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response);
  +    if (response) {
  +     const char *s = response->message;
  +     int token = response->token;
  +     int code = response->code;
  +     rpmlog(RPMLOG_WARNING, "MQTT  subscribe failed: code(%d) msg %s\n",
  +                     token, code, s);
  +    } else
  +     rpmlog(RPMLOG_WARNING, "MQTT  subscribe failed\n");
  +    mqtt->subscribed = 0;
  +    mqtt->finished = 1;
  +}
  +
  +static void onSubscribe(void * _mqtt, MQTTAsync_successData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    int qos = (response ? response->alt.qos : mqtt->qos);
  +
  +    if (mqtt->debug || _rpmmqtt_debug)
  +     rpmlog(RPMLOG_DEBUG, "MQTT  subscribe qos(%d)\n", qos);
  +    mqtt->subscribed = 1;
  +    mqtt->finished = 1;
  +}
  +
  +static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    {
  +     const char *s = response->message;
  +     int token = response->token;
  +     int code = response->code;
  +     rpmlog(RPMLOG_WARNING, "MQTT send(%d) failed: code(%d) msg %s\n",
  +                     token, code, s);
  +    }
  +    mqtt->finished = 1;
  +}
  +
  +static void onSend(void * _mqtt, MQTTAsync_successData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    if (mqtt->debug || _rpmmqtt_debug) {
  +     const char * s = response->alt.pub.message.payload;
  +     size_t ns = response->alt.pub.message.payloadlen;
  +     int token = response->token;
  +     rpmlog(RPMLOG_DEBUG, "MQTT sent(%d) topic(%s) \"%.*s\"\n",
  +                     token, mqtt->topic, ns, s);
  +    }
  +    mqtt->finished = 1;
   }
   
   /*==============================================================*/
  @@ -170,6 +299,8 @@
       if (Rmdir(mqtt->dn) && (errno != ENOENT && errno != ENOTEMPTY))
        goto exit;
   
  +    rpmlog(RPMLOG_DEBUG, D_("removed directory %s\n"), mqtt->dn);
  +
       rc = 0;
   
   exit:
  @@ -425,43 +556,63 @@
   {
       int rc = -1;
   #ifdef       WITH_MQTT
  -    if (MQTTClient_isConnected(mqtt->C)) {
  -     mqtt->connected = 1;
  +    if (MQTTAsync_isConnected(mqtt->C)) {
        rc = 0;
       } else {
  -     MQTTClient_connectOptions Copts = MQTTClient_connectOptions_initializer;
  +     urlinfo u = mqtt->u;
  +
  +     MQTTAsync_willOptions Wopts = MQTTAsync_willOptions_initializer;
  +#ifdef       REF
  +     memcpy(Wopts.struct_id, "MQTW", 4);
  +     Wopts.struct_version = 0;
  +#endif
  +     Wopts.topicName = mqtt->topic;
  +     Wopts.message = "will message";
  +#ifdef       REF
  +     Wopts.retained = 0;
  +     Wopts.qos = 0;
  +#endif
  +
  +     MQTTAsync_connectOptions Copts = MQTTAsync_connectOptions_initializer;
   #ifdef       REF
        memcpy(Copts.struct_id, "MQTC", 4);
  -     Copts.struct_version = 4;       /* 0-4 determines what follows */
  +     Copts.struct_version = 3;       /* 0-4 enables fields below */
   #endif
  -     Copts.keepAliveInterval = 20;   /* 60 */
  +     Copts.keepAliveInterval = 10;   /* 60 */
        Copts.cleansession = 0;         /* 1 discards session state */
  -     Copts.reliable = 0;             /* 1 forces sync */
  -#ifdef       REF
  +     Copts.maxInflight = 10;         /* 10 */
  +
  +#ifdef       NOTYET
  +     Copts.will = &Wopts;            /* last will */
  +#else
        Copts.will = NULL;              /* last will */
  -     Copts.username = NULL;
  -     Copts.password = NULL;
  -     Copts.connectTimeout = 30;
  -     Copts.retryInterval = 20;
  +#endif
  +     Copts.username = u->user;
  +     Copts.password = u->password;
  +     Copts.connectTimeout = 30;      /* secs */
  +     Copts.retryInterval = 0;        /* secs */
  +#ifdef       REF
  +     MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
  +     ssl_opts.enableServerCertAuth = 0;
  +     Copts.ssl = &ssl_opts;
  +#else
        Copts.ssl = NULL;
  +#endif
  +     Copts.onSuccess = onConnect;
  +     Copts.onFailure = onConnectFailure;
  +     Copts.context = mqtt;
        Copts.serverURIcount = 0;
        Copts.serverURIs = NULL;
  -     COpts.MQTTVersion = 0;
  -#endif
  +
  +     mqtt->finished = 0;
        rc = check(mqtt, "connect",
  -             MQTTClient_connect(mqtt->C, &Copts));
  -     if (rc == 0) {
  -         mqtt->serverURI = xstrdup(Copts.returned.serverURI);
  -         mqtt->MQTTVersion = Copts.returned.MQTTVersion;
  -         mqtt->sessionPresent = Copts.returned.sessionPresent;
  -         if (mqtt->debug || _rpmmqtt_debug)
  -             rpmlog(RPMLOG_DEBUG,
  -                     "+++ MQTT    connect(mqtt://%s) version(%d) present(%d)\n",
  -                     mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  -         mqtt->connected = 1;
  -     }
  +             MQTTAsync_connect(mqtt->C, &Copts));
  +
  +     while (!mqtt->finished)
  +         usleep(100);
  +
       }
  -#endif
  +#endif       /* WITH_MQTT */
       return rc;
   }
   
  @@ -469,51 +620,93 @@
   {
       int rc = -1;
   #ifdef       WITH_MQTT
  -    if (MQTTClient_isConnected(mqtt->C)) {
  +    if (MQTTAsync_isConnected(mqtt->C)) {
  +     MQTTAsync_disconnectOptions Dopts =
  +             MQTTAsync_disconnectOptions_initializer;
  +#ifdef       REF
  +     memcpy(Dopts.struct_id, "MQTD", 4);
  +     Dopts.struct_version = 0;
  +#endif
  +     Dopts.timeout = mqtt->msecs;
  +     Dopts.onSuccess = onDisconnect;
  +     Dopts.onFailure = onDisconnectFailure;
  +     Dopts.context = mqtt;
  +
  +     mqtt->finished = 0;
        rc = check(mqtt, "disconnect",
  -             MQTTClient_disconnect(mqtt->C, mqtt->msecs));
  -     if (mqtt->debug || _rpmmqtt_debug)
  -         rpmlog(RPMLOG_DEBUG,
  -             "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n",
  -             mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  -     mqtt->serverURI = _free(mqtt->serverURI);
  -     mqtt->connected = 0;
  +             MQTTAsync_disconnect(mqtt->C, &Dopts));
  +     while (!mqtt->finished)
  +         usleep(100);
       }
   #endif
       return rc;
   }
   
  +int rpmmqttSendmsg(rpmmqtt mqtt, const char * s, size_t ns)
  +{
  +    int rc = -1;
  +
  +    if (ns == 0) ns = strlen(s);
  +
  +#ifdef       WITH_MQTT
  +    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  +#ifdef       REF
  +    memcpy(pubmsg.struct_id, "MQTM", 4);
  +    pubmsg.struct_version = 0;
  +#endif
  +    pubmsg.payloadlen = ns;
  +    pubmsg.payload = (char *) s;
  +    pubmsg.qos = mqtt->qos;
  +    pubmsg.retained = 0;
  +#ifdef       REF
  +    pubmsg.dup = 0;
  +    pubmsg.msgid = 0;
  +#endif
  +
  +    MQTTAsync_responseOptions Ropts =
  +             MQTTAsync_responseOptions_initializer;
  +#ifdef       REF
  +    memcpy(pubmsg.struct_id, "MQTR", 4);
  +    pubmsg.struct_version = 0;
  +#endif
  +    Ropts.onSuccess = onSend;
  +    Ropts.onFailure = onSendFailure;
  +    Ropts.context = mqtt;
  +    Ropts.token = 0;
  +
  +    mqtt->finished = 0;
  +    rc = check(mqtt, "sendMessage",
  +             MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg,
  +                     &Ropts));
  +    while (!mqtt->finished)
  +     usleep(100);
  +#endif       /* WITH_MQTT */
  +
  +    return rc;
  +}
  +
   /*==============================================================*/
   ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns)
   {
       ssize_t ret = -1;        /* assume failure */
   
  -    if (ns == 0) ns = strlen(s);
  -
   #ifdef       WITH_MQTT
       if (rpmmqttConnect(mqtt) == 0) {
  -     MQTTClient_message pubmsg = MQTTClient_message_initializer;
  -     int rc;
  -     pubmsg.payload = (char *) s;
  -     pubmsg.payloadlen = ns;
  -     pubmsg.qos = mqtt->qos;
  -     pubmsg.retained = 0;
  -
  -     mqtt->delivered = 0;
  -     rc = check(mqtt, "publishMessage",
  -             MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg,
  -                     &mqtt->token));
  -     if (_rpmmqtt_debug)
  -         rpmlog(RPMLOG_DEBUG, "+++ MQTT sent(%d) topic(%s) \"%.*s\"\n",
  -                     mqtt->token, mqtt->topic, ns, s);
  -
  -     if (!mqtt->delivered)
  -         rc = check(mqtt, "waitForCompletion",
  -             MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs));
  -     if (rc == 0)
  -         ret = ns;
  -    }
  +#ifdef       DYING
  +     static char _mqtt_prefix[] =
  +             "%{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ";
  +#else
  +     static char _mqtt_prefix[] = "%{?_mqtt_prefix}";
   #endif
  +     char * t = rpmExpand(_mqtt_prefix, " ", s, NULL);
  +     size_t nt = strlen(t);
  +
  +     if (!rpmmqttSendmsg(mqtt, t, nt))
  +         ret = nt;
  +
  +     t = _free(t);
  +    }
  +#endif       /* WITH_MQTT */
   
       return ret;
   }
  @@ -524,26 +717,31 @@
   
   #ifdef       WITH_MQTT
       if (rpmmqttConnect(mqtt) == 0) {
  -#ifdef       NOTYET
  -     MQTTClient_message pubmsg = MQTTClient_message_initializer;
  -     int rc;
  -
  -     pubmsg.payload = (char *) s;
  -     pubmsg.payloadlen = ns;
  -     pubmsg.qos = mqtt->qos;
  -     pubmsg.retained = 0;
  -
  -     rc = check(mqtt, "publishMessage",
  -             MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg,
  -                     &mqtt->token));
  -
  -     rc = check(mqtt, "waitForCompletion",
  -             MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs));
  -     if (rc == 0)
  -         ret = ns;
  +char * topic = rpmExpand((s ? s : mqtt->topic), NULL);
  +int qos = 2;
  +int rc;
  +     MQTTAsync_responseOptions Ropts =
  +             MQTTAsync_responseOptions_initializer;
  +#ifdef       REF
  +     memcpy(pubmsg.struct_id, "MQTR", 4);
  +     pubmsg.struct_version = 0;
   #endif
  +     Ropts.onSuccess = onSubscribe;
  +     Ropts.onFailure = onSubscribeFailure;
  +     Ropts.context = mqtt;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "subscribe",
  +             MQTTAsync_subscribe(mqtt->C, topic, qos, &Ropts));
  +     while (!mqtt->finished)
  +         usleep(100);
  +
  +     ret = 0;        /* XXX */
  +
  +topic = _free(topic);
  +
       }
  -#endif
  +#endif       /* WITH_MQTT */
   
       return ret;
   }
  @@ -554,13 +752,10 @@
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
   #ifdef       WITH_MQTT
  -    {        MQTTClient C = (MQTTClient) mqtt->C;
  -     int xx;
  -     xx = rpmmqttDisconnect(mqtt);
  -     xx = check(mqtt, "destroy",
  -             (MQTTClient_destroy(&C), 0));
  -    }
  -#endif
  +    (void) rpmmqttDisconnect(mqtt);
  +    (void) check(mqtt, "destroy",
  +             (MQTTAsync_destroy(&mqtt->C), 0));
  +#endif       /* WITH_MQTT */
       mqtt->C = NULL;
       mqtt->uri = _free(mqtt->uri);
       mqtt->topic = _free(mqtt->topic);
  @@ -599,6 +794,13 @@
        fprintf(fp, "    host: %s\n", u->host);
        fprintf(fp, " portstr: %s\n", u->portstr);
        fprintf(fp, "   query: %s\n", u->query);
  +     if (u->query) {
  +         ARGV_t av = NULL;
  +         int xx;
  +         xx = argvSplit(&av, u->query, ",");
  +         argvPrint(u->query, av, fp);
  +         av = argvFree(av);
  +     }
        fprintf(fp, "fragment: %s\n", u->fragment);
        fprintf(fp, "    path: %s\n", path);
       }
  @@ -607,9 +809,7 @@
   
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
   {
  -static char _test_mqtt[] = "test/mqtt";
  -
  -    static const char *_av[] = { "tcp://localhost:1883/test/mqtt", NULL };
  +    static const char *_av[] = { "mqtt://localhost:1883/rpm/%{pid}/mqtt", NULL };
       rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
       urlinfo u = NULL;
       const char *s = NULL;
  @@ -620,17 +820,24 @@
        av = (char **)_av;
       if (av) {
        int ac = argvCount((ARGV_t)av);
  -     for (int i = 0; i < ac; i++)
  -         (void) argvAdd((ARGV_t *)&mqtt->av, av[i]);
  +     for (int i = 0; i < ac; i++) {
  +         char * t = rpmExpand(av[i], NULL);
  +         (void) argvAdd((ARGV_t *)&mqtt->av, t);
  +         t = _free(t);
  +     }
       }
   
       mqtt->ut = urlSplit(av[0], &u);
       mqtt->u = u;
  -    if (u->scheme == NULL || !strcmp(u->scheme, "mqtt")) {
  +    if (u->scheme == NULL
  +     || !strcmp(u->scheme, "mqtt")
  +     || !strcmp(u->scheme, "mqtts"))
  +    {
  +     if (u->portstr == NULL)
  +         u->portstr = !strcmp(u->scheme, "mqtts")
  +             ? xstrdup("8883") : xstrdup("1883");
        u->scheme = _free(u->scheme);
        u->scheme = xstrdup("tcp");
  -     if (u->portstr == NULL)
  -         u->portstr = xstrdup("1883");
       }
   #ifdef       DYING
   dumpURL(__FUNCTION__, u);
  @@ -639,25 +846,56 @@
       mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL);
   
       (void) urlPath(u->url, &s);
  +static const char _mqtt_topic[] =
  +     "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}";
       if (s && *s == '/')
        s++;
       if (s == NULL || *s == '\0')
  -     s = _test_mqtt;
  -    mqtt->topic = xstrdup(s);
  +     s = _mqtt_topic;
  +    mqtt->topic = rpmExpand(s, NULL);
   
  -    mqtt->clientid = xstrdup("rpm");
  +static const char _mqtt_clientid[] =
  +     "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}";
  +    mqtt->clientid = rpmExpand(_mqtt_clientid, NULL);
  +
  +static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:1}";
  +    mqtt->qos = rpmExpandNumeric(_mqtt_qos);
  +static const char _mqtt_timeout[] = "%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}";
  +    mqtt->msecs = rpmExpandNumeric(_mqtt_timeout);
  +
  +    if (u->query) {
  +     ARGV_t qav = NULL;
  +     int qac;
  +     const char *t;
  +     const char *te;
  +
  +     (void) argvSplit(&qav, u->query, ",");
  +     qac = argvCount(qav);
  +     for (int i = 0; i < qac; i++) {
  +         t = qav[i];
  +         if ((te = strchr(t, '=')) == NULL)
  +             continue;
  +         if (!strncmp(t, "qos", (te - t)) && xisdigit(te[1])) {
  +             mqtt->qos = strtol(te+1, NULL, 0);
  +             mqtt->qos %= 3;
  +             continue;
  +         }
  +         if (!strncmp(t, "timeout", (te - t)) && xisdigit(te[1])) {
  +             mqtt->msecs = strtol(te+1, NULL, 0);
  +             continue;
  +         }
   
  -    mqtt->qos = 1;
  -    mqtt->msecs = 10000;
  +     }
  +     qav = argvFree(qav);
  +    }
   
   #ifdef       WITH_MQTT
   
       {        static int oneshot;
  -     static char _var_cache_mqtt[] = "/var/cache/mqtt";
        int xx;
   
        if (!oneshot) {
  -         MQTTClient_nameValue *I = MQTTClient_getVersionInfo();
  +         MQTTAsync_nameValue *I = MQTTAsync_getVersionInfo();
            int _lvl = RPMLOG_DEBUG;
            rpmlog(_lvl, "==================== MQTT\n");
            while (I->name) {
  @@ -667,6 +905,9 @@
            oneshot++;
        }
   
  +static const char _mqtt_cachedir[] =
  +     "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
  +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
        mqtt->persist_type = MQTTCLIENT_PERSISTENCE_USER;
        switch (mqtt->persist_type) {
        default:
  @@ -674,36 +915,35 @@
            mqtt->persist_ctx = NULL;
            break;
        case MQTTCLIENT_PERSISTENCE_DEFAULT:
  -       {
  +       { mqtt->persist_path = xstrdup(persist_path);
            /* XXX rpmmqttFini double free */
            mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  -         mqtt->persist_path = xstrdup(_var_cache_mqtt);
          } break;
        case MQTTCLIENT_PERSISTENCE_USER:
  -       {
  +       { mqtt->persist_path = xstrdup(persist_path);
            MQTTClient_persistence * ctx = xmalloc(sizeof(*ctx));
            *ctx = _rpmmqtt_persistence;        /* structure assignment */
            ctx->context = mqtt;
            mqtt->persist_ctx = ctx;
  -         mqtt->persist_path = xstrdup(_var_cache_mqtt);
          } break;
        }
  +persist_path = _free(persist_path);
       
        xx = check(mqtt, "create",
  -             MQTTClient_create(&mqtt->C, mqtt->uri, mqtt->clientid,
  +             MQTTAsync_create(&mqtt->C, mqtt->uri, mqtt->clientid,
                mqtt->persist_type, mqtt->persist_ctx));
   
        xx = check(mqtt, "setCallbacks",
  -             MQTTClient_setCallbacks(mqtt->C, mqtt,
  -                     rpmmqttConnlost,
  -                     rpmmqttMessageArrived,
  -                     rpmmqttDeliveryComplete));
  +             MQTTAsync_setCallbacks(mqtt->C, mqtt,
  +                     onConnectionLost,
  +                     onMessageArrived,
  +                     onDeliveryComplete));
   
        xx = rpmmqttConnect(mqtt);
        /* XXX exit if cannot connect? */
   
       }
  -#endif
  +#endif       /* WITH_MQTT */
   
       return rpmmqttLink(mqtt);
   }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.4 -r1.1.2.5 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       28 Jun 2016 07:20:22 -0000      1.1.2.4
  +++ rpm/rpmio/rpmmqtt.h       29 Jun 2016 12:17:58 -0000      1.1.2.5
  @@ -10,7 +10,7 @@
   typedef struct rpmmqtt_s * rpmmqtt;
   
   #if defined(_RPMMQTT_INTERNAL)
  -#include <MQTTClient.h>
  +#include <MQTTAsync.h>
   struct rpmmqtt_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
       char ** av;
  @@ -35,8 +35,9 @@
       int msecs;
   
       int debug;
  +    int finished;
       int connected;
  -    int delivered;
  +    int subscribed;
   
       char * serverURI;
       int MQTTVersion;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmurl.h
  ============================================================================
  $ cvs diff -u -r1.41.4.9 -r1.41.4.10 rpmurl.h
  --- rpm/rpmio/rpmurl.h        28 Jun 2016 07:20:22 -0000      1.41.4.9
  +++ rpm/rpmio/rpmurl.h        29 Jun 2016 12:17:58 -0000      1.41.4.10
  @@ -32,6 +32,7 @@
   #define      URL_IS_POSTGRES (urltype)34
   #define      URL_IS_SQLSERVER (urltype)35
   #define      URL_IS_MQTT     (urltype)36
  +#define      URL_IS_MQTTS    (urltype)37
   
   #define      URLMAGIC        0xd00b1ed0U
   #define      URLSANE(u)      assert(u && u->magic == URLMAGIC)
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.2 -r1.1.2.3 tmqtt.c
  --- rpm/rpmio/tmqtt.c 28 Jun 2016 07:20:22 -0000      1.1.2.2
  +++ rpm/rpmio/tmqtt.c 29 Jun 2016 12:17:58 -0000      1.1.2.3
  @@ -24,6 +24,9 @@
       ssize_t nw;
       int rc = 0;
   
  +    (void) rpmmqttRead(mqtt, "rpm/#", 0);
  +    (void) rpmmqttRead(mqtt, "$SYS/#", 0);
  +
       nw = rpmmqttWrite(mqtt, "bzzt ...", 0);
       nw = rpmmqttWrite(mqtt, "bzzT ...", 0);
       nw = rpmmqttWrite(mqtt, "bzZT ...", 0);
  @@ -58,13 +61,25 @@
       int ac = argvCount(av);
   #endif
       (void)av;
  -    static char *_av[] = { "mqtt://localhost/test/mqtt", NULL, };
  -    rpmmqtt mqtt = rpmmqttNew(_av, 0);
  +    static char *_av[] = { "mqtts://luser:jasnl@localhost:1883/rpm/%{pid}/mqtt?qos=1,timeout=10000", NULL, };
  +    rpmmqtt mqtt;
       int rc = -1;
   
  -    rc = _DoMQTT(mqtt);
  +(void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_user luser", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_pass jasnl", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_host localhost", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_port 1883", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_clientid rpm", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0);
   
  +    mqtt = rpmmqttNew(_av, 0);
  +    rc = _DoMQTT(mqtt);
       mqtt = rpmmqttFree(mqtt);
  +
       optCon = rpmioFini(optCon);
       return rc;
   }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/url.c
  ============================================================================
  $ cvs diff -u -r1.73.4.15 -r1.73.4.16 url.c
  --- rpm/rpmio/url.c   28 Jun 2016 07:20:22 -0000      1.73.4.15
  +++ rpm/rpmio/url.c   29 Jun 2016 12:17:58 -0000      1.73.4.16
  @@ -34,6 +34,9 @@
   #ifndef      IPPORT_MQTT
   #define      IPPORT_MQTT     1883
   #endif
  +#ifndef      IPPORT_MQTTS
  +#define      IPPORT_MQTTS    8883
  +#endif
   #ifndef      IPPORT_PGPKEYSERVER
   #define      IPPORT_PGPKEYSERVER     11371
   #endif
  @@ -414,6 +417,7 @@
       { "sqlserver://",        sizeof("sqlserver://")-1, URL_IS_SQLSERVER },
   
       { "mqtt://",     sizeof("mqtt://")-1,    URL_IS_MQTT },
  +    { "mqtts://",    sizeof("mqtts://")-1,   URL_IS_MQTTS },
   
       { "-",           sizeof("-")-1,          URL_IS_DASH },
       { "ssh://",              sizeof("ssh://")-1,     URL_IS_UNKNOWN },
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                [email protected]

Reply via email to