RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: [email protected] Module: rpm Date: 29-Jun-2016 14:17:58 Branch: rpm-5_4 Handle: 2016062912175701 Added files: (Branch: rpm-5_4) rpm/macros mqtt.in Modified files: (Branch: rpm-5_4) rpm CHANGES configure.ac rpm/macros .cvsignore macros.in rpm/rpmio rpmmqtt.c rpmmqtt.h rpmurl.h tmqtt.c url.c Log: - mqtt: add self-subscriptions and macro configgery. Summary: Revision Changes Path 1.3501.2.509+1 -0 rpm/CHANGES 2.472.2.152 +3 -2 rpm/configure.ac 1.4.2.3 +1 -0 rpm/macros/.cvsignore 1.39.2.50 +5 -1 rpm/macros/macros.in 1.1.2.1 +13 -0 rpm/macros/mqtt.in 1.1.2.5 +364 -124 rpm/rpmio/rpmmqtt.c 1.1.2.5 +3 -2 rpm/rpmio/rpmmqtt.h 1.41.4.10 +1 -0 rpm/rpmio/rpmurl.h 1.1.2.3 +18 -3 rpm/rpmio/tmqtt.c 1.73.4.16 +4 -0 rpm/rpmio/url.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.508 -r1.3501.2.509 CHANGES --- rpm/CHANGES 29 Jun 2016 09:58:39 -0000 1.3501.2.508 +++ rpm/CHANGES 29 Jun 2016 12:17:57 -0000 1.3501.2.509 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: mqtt: add self-subscriptions and macro configgery. - jbj: macro: add primitives useful for log spewage. - jbj: mqtt: stub-in a paho-mqtt client. - jbj: build: add/use RPMIOPOOL_ macros where possible. @@ . patch -p0 <<'@@ .' Index: rpm/configure.ac ============================================================================ $ cvs diff -u -r2.472.2.151 -r2.472.2.152 configure.ac --- rpm/configure.ac 27 Jun 2016 18:27:10 -0000 2.472.2.151 +++ rpm/configure.ac 29 Jun 2016 12:17:57 -0000 2.472.2.152 @@ -2247,7 +2247,7 @@ # MQTT RPM_CHECK_LIB( [MQTT], [mqtt], - [paho-mqtt3c], [MQTTClient_create], [MQTTClient.h], + [paho-mqtt3as], [MQTTAsync_create], [MQTTAsync.h], [no,external:none], [], [ AC_DEFINE(WITH_MQTT, 1, [Define if building with MQTT]) ], []) @@ -3372,7 +3372,8 @@ rpmdb/DB_CONFIG macros/macros macros/macros.rpmbuild macros/cmake macros/gstreamer macros/java macros/kernel macros/libtool - macros/mandriva macros/suse macros/fedora macros/mono macros/perl macros/pkgconfig macros/php + macros/mandriva macros/suse macros/fedora macros/mono macros/mqtt + macros/perl macros/pkgconfig macros/php macros/python macros/ruby macros/selinux macros/tcl lua/tests/Makefile lua/tests/libs/Makefile doc/Makefile @@ . patch -p0 <<'@@ .' Index: rpm/macros/.cvsignore ============================================================================ $ cvs diff -u -r1.4.2.2 -r1.4.2.3 .cvsignore --- rpm/macros/.cvsignore 15 Aug 2011 20:36:23 -0000 1.4.2.2 +++ rpm/macros/.cvsignore 29 Jun 2016 12:17:58 -0000 1.4.2.3 @@ -8,6 +8,7 @@ macros.rpmbuild mandriva mono +mqtt perl php pkgconfig @@ . patch -p0 <<'@@ .' Index: rpm/macros/macros.in ============================================================================ $ cvs diff -u -r1.39.2.49 -r1.39.2.50 macros.in --- rpm/macros/macros.in 8 May 2016 18:33:29 -0000 1.39.2.49 +++ rpm/macros/macros.in 29 Jun 2016 12:17:58 -0000 1.39.2.50 @@ -1,7 +1,7 @@ #/*! \page config_macros Default configuration: @USRLIBRPM@/macros # \verbatim # -# $Id: macros.in,v 1.39.2.49 2016/05/08 18:33:29 jbj Exp $ +# $Id: macros.in,v 1.39.2.50 2016/06/29 12:17:58 jbj Exp $ # # This is a global RPM configuration file. All changes made here will # be lost when the rpm package is upgraded. Any per-system configuration @@ -1057,6 +1057,10 @@ # ---- rpmbuild macros. %{load:%{_usrlibrpm}/macros.rpmbuild} +#============================================================================== +# ---- MQTT macros. +%{load:%{_usrlibrpm}/macros.d/mqtt} + #------------------------------------------------------------------------ # cmake(...) configuration #%%{load:%{_usrlibrpm}/macros.d/cmake} @@ . patch -p0 <<'@@ .' Index: rpm/macros/mqtt.in ============================================================================ $ cvs diff -u -r0 -r1.1.2.1 mqtt.in --- /dev/null 2016-06-29 14:15:40.000000000 +0200 +++ mqtt.in 2016-06-29 14:17:58.615621809 +0200 @@ -0,0 +1,13 @@ +#============================================================================== +# ---- MQTT configuration. +# +%_mqtt_cachedir /var/cache/mqtt +%_mqtt_user luser +%_mqtt_pass jasnl +%_mqtt_host localhost +%_mqtt_port 1883 +%_mqtt_clientid rpm +%_mqtt_topic rpm/# +%_mqtt_qos 1 +%_mqtt_timeout 10000 +%_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} %{nil} @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.4 -r1.1.2.5 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 28 Jun 2016 07:20:22 -0000 1.1.2.4 +++ rpm/rpmio/rpmmqtt.c 29 Jun 2016 12:17:58 -0000 1.1.2.5 @@ -19,7 +19,7 @@ #include "debug.h" -int _rpmmqtt_debug = 1; +int _rpmmqtt_debug = 0; /*==============================================================*/ typedef struct key_s { @@ -27,7 +27,7 @@ const char *n; } KEY; -#define _ENTRY(_v) { MQTTCLIENT_##_v, #_v, } +#define _ENTRY(_v) { MQTTASYNC_##_v, #_v, } static KEY rpmmqtt_errs[] = { #ifdef WITH_MQTT _ENTRY(SUCCESS), @@ -40,6 +40,7 @@ _ENTRY(TOPICNAME_TRUNCATED), _ENTRY(BAD_STRUCTURE), _ENTRY(BAD_QOS), + _ENTRY(NO_MORE_MSGIDS), #else { 0, NULL }, #endif @@ -72,7 +73,7 @@ if (rc != 0) { /* MQTTCLIENT_SUCCESS */ int _lvl = RPMLOG_WARNING; - rpmlog(_lvl, "%s:%s:%u: MQTTClient_%s: %s(%d)\n", + rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n", func, fn, ln, msg, rpmmqttStrerror(rc), rc); } return rc; @@ -81,9 +82,12 @@ Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__) /*==============================================================*/ -struct MQTTClient_message; -static int rpmmqttMessageArrived(void * _mqtt, char * topic, int topicLen, - MQTTClient_message * message) +struct MQTTAsync_message; +struct MQTTAsync_successData; +struct MQTTAsync_failureData; + +static int onMessageArrived(void * _mqtt, char * topic, int topicLen, + MQTTAsync_message * message) { rpmmqtt mqtt = (rpmmqtt) _mqtt; int rc = 1; @@ -94,36 +98,161 @@ const char * s = message->payload; size_t ns = message->payloadlen; if (_rpmmqtt_debug < 0) - rpmlog(RPMLOG_DEBUG, "+++ MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s); - MQTTClient_freeMessage(&message); - MQTTClient_free(topic); + rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s); + + printf("%.*s:\t", topicLen, topic); + for (size_t i = 0; i < ns; i++) + putchar(s[i]); + putchar('\n'); + + MQTTAsync_freeMessage(&message); + MQTTAsync_free(topic); #endif return rc; } -static void rpmmqttDeliveryComplete(void * _mqtt, int token) +static void onDeliveryComplete(void * _mqtt, int token) { rpmmqtt mqtt = (rpmmqtt) _mqtt; if (_rpmmqtt_debug < 0) rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token); mqtt->token = token; - mqtt->delivered = 1; } -static void rpmmqttConnlost(void * _mqtt, char *cause) +static void onConnectionLost(void * _mqtt, char *cause) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + + rpmlog(RPMLOG_DEBUG, + "--- MQTT disconnect(%s) version(%d) present(%d)\n", + mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); + if (cause) + rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause); + + rpmlog(RPMLOG_WARNING, "MQTT reconnecting(%s) ...\n", mqtt->serverURI); + + mqtt->connected = 0; + (void) rpmmqttConnect(mqtt); +} + +static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; +fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response); + if (response) { + const char *s = response->message; + int token = response->token; + int code = response->code; + rpmlog(RPMLOG_WARNING, + "MQTT disconnect failed: code(%d) msg %s\n", + token, code, s); + } else + rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n"); + mqtt->connected = 0; + mqtt->finished = 1; +} + +static void onDisconnect(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - if (mqtt->debug || _rpmmqtt_debug < 0) { + if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, - "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n", + "MQTT disconnect(%s) version(%d) present(%d)\n", mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); - if (cause) - rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause); - mqtt->serverURI = _free(mqtt->serverURI); - mqtt->connected = 0; + mqtt->serverURI = _free(mqtt->serverURI); + mqtt->connected = 0; + mqtt->finished = 1; +} + +static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); + if (response) { + const char *s = response->message; + int token = response->token; + int code = response->code; + rpmlog(RPMLOG_WARNING, "MQTT connect failed: code(%d) msg %s\n", + token, code, s); + } else + rpmlog(RPMLOG_WARNING, "MQTT connect failed\n"); + mqtt->connected = 0; + mqtt->finished = 1; +} + +static void onConnect(void * _mqtt, MQTTAsync_successData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + + if (response) { + mqtt->serverURI = xstrdup(response->alt.connect.serverURI); + mqtt->MQTTVersion = response->alt.connect.MQTTVersion; + mqtt->sessionPresent = response->alt.connect.sessionPresent; } + + if (mqtt->debug || _rpmmqtt_debug) + rpmlog(RPMLOG_DEBUG, + "MQTT connect(%s) version(%d) present(%d)\n", + mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); + mqtt->connected = 1; + mqtt->finished = 1; +} + +static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); + if (response) { + const char *s = response->message; + int token = response->token; + int code = response->code; + rpmlog(RPMLOG_WARNING, "MQTT subscribe failed: code(%d) msg %s\n", + token, code, s); + } else + rpmlog(RPMLOG_WARNING, "MQTT subscribe failed\n"); + mqtt->subscribed = 0; + mqtt->finished = 1; +} + +static void onSubscribe(void * _mqtt, MQTTAsync_successData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + int qos = (response ? response->alt.qos : mqtt->qos); + + if (mqtt->debug || _rpmmqtt_debug) + rpmlog(RPMLOG_DEBUG, "MQTT subscribe qos(%d)\n", qos); + mqtt->subscribed = 1; + mqtt->finished = 1; +} + +static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + + { + const char *s = response->message; + int token = response->token; + int code = response->code; + rpmlog(RPMLOG_WARNING, "MQTT send(%d) failed: code(%d) msg %s\n", + token, code, s); + } + mqtt->finished = 1; +} + +static void onSend(void * _mqtt, MQTTAsync_successData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + + if (mqtt->debug || _rpmmqtt_debug) { + const char * s = response->alt.pub.message.payload; + size_t ns = response->alt.pub.message.payloadlen; + int token = response->token; + rpmlog(RPMLOG_DEBUG, "MQTT sent(%d) topic(%s) \"%.*s\"\n", + token, mqtt->topic, ns, s); + } + mqtt->finished = 1; } /*==============================================================*/ @@ -170,6 +299,8 @@ if (Rmdir(mqtt->dn) && (errno != ENOENT && errno != ENOTEMPTY)) goto exit; + rpmlog(RPMLOG_DEBUG, D_("removed directory %s\n"), mqtt->dn); + rc = 0; exit: @@ -425,43 +556,63 @@ { int rc = -1; #ifdef WITH_MQTT - if (MQTTClient_isConnected(mqtt->C)) { - mqtt->connected = 1; + if (MQTTAsync_isConnected(mqtt->C)) { rc = 0; } else { - MQTTClient_connectOptions Copts = MQTTClient_connectOptions_initializer; + urlinfo u = mqtt->u; + + MQTTAsync_willOptions Wopts = MQTTAsync_willOptions_initializer; +#ifdef REF + memcpy(Wopts.struct_id, "MQTW", 4); + Wopts.struct_version = 0; +#endif + Wopts.topicName = mqtt->topic; + Wopts.message = "will message"; +#ifdef REF + Wopts.retained = 0; + Wopts.qos = 0; +#endif + + MQTTAsync_connectOptions Copts = MQTTAsync_connectOptions_initializer; #ifdef REF memcpy(Copts.struct_id, "MQTC", 4); - Copts.struct_version = 4; /* 0-4 determines what follows */ + Copts.struct_version = 3; /* 0-4 enables fields below */ #endif - Copts.keepAliveInterval = 20; /* 60 */ + Copts.keepAliveInterval = 10; /* 60 */ Copts.cleansession = 0; /* 1 discards session state */ - Copts.reliable = 0; /* 1 forces sync */ -#ifdef REF + Copts.maxInflight = 10; /* 10 */ + +#ifdef NOTYET + Copts.will = &Wopts; /* last will */ +#else Copts.will = NULL; /* last will */ - Copts.username = NULL; - Copts.password = NULL; - Copts.connectTimeout = 30; - Copts.retryInterval = 20; +#endif + Copts.username = u->user; + Copts.password = u->password; + Copts.connectTimeout = 30; /* secs */ + Copts.retryInterval = 0; /* secs */ +#ifdef REF + MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; + ssl_opts.enableServerCertAuth = 0; + Copts.ssl = &ssl_opts; +#else Copts.ssl = NULL; +#endif + Copts.onSuccess = onConnect; + Copts.onFailure = onConnectFailure; + Copts.context = mqtt; Copts.serverURIcount = 0; Copts.serverURIs = NULL; - COpts.MQTTVersion = 0; -#endif + + mqtt->finished = 0; rc = check(mqtt, "connect", - MQTTClient_connect(mqtt->C, &Copts)); - if (rc == 0) { - mqtt->serverURI = xstrdup(Copts.returned.serverURI); - mqtt->MQTTVersion = Copts.returned.MQTTVersion; - mqtt->sessionPresent = Copts.returned.sessionPresent; - if (mqtt->debug || _rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, - "+++ MQTT connect(mqtt://%s) version(%d) present(%d)\n", - mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); - mqtt->connected = 1; - } + MQTTAsync_connect(mqtt->C, &Copts)); + + while (!mqtt->finished) + usleep(100); + } -#endif +#endif /* WITH_MQTT */ return rc; } @@ -469,51 +620,93 @@ { int rc = -1; #ifdef WITH_MQTT - if (MQTTClient_isConnected(mqtt->C)) { + if (MQTTAsync_isConnected(mqtt->C)) { + MQTTAsync_disconnectOptions Dopts = + MQTTAsync_disconnectOptions_initializer; +#ifdef REF + memcpy(Dopts.struct_id, "MQTD", 4); + Dopts.struct_version = 0; +#endif + Dopts.timeout = mqtt->msecs; + Dopts.onSuccess = onDisconnect; + Dopts.onFailure = onDisconnectFailure; + Dopts.context = mqtt; + + mqtt->finished = 0; rc = check(mqtt, "disconnect", - MQTTClient_disconnect(mqtt->C, mqtt->msecs)); - if (mqtt->debug || _rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, - "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n", - mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); - mqtt->serverURI = _free(mqtt->serverURI); - mqtt->connected = 0; + MQTTAsync_disconnect(mqtt->C, &Dopts)); + while (!mqtt->finished) + usleep(100); } #endif return rc; } +int rpmmqttSendmsg(rpmmqtt mqtt, const char * s, size_t ns) +{ + int rc = -1; + + if (ns == 0) ns = strlen(s); + +#ifdef WITH_MQTT + MQTTAsync_message pubmsg = MQTTAsync_message_initializer; +#ifdef REF + memcpy(pubmsg.struct_id, "MQTM", 4); + pubmsg.struct_version = 0; +#endif + pubmsg.payloadlen = ns; + pubmsg.payload = (char *) s; + pubmsg.qos = mqtt->qos; + pubmsg.retained = 0; +#ifdef REF + pubmsg.dup = 0; + pubmsg.msgid = 0; +#endif + + MQTTAsync_responseOptions Ropts = + MQTTAsync_responseOptions_initializer; +#ifdef REF + memcpy(pubmsg.struct_id, "MQTR", 4); + pubmsg.struct_version = 0; +#endif + Ropts.onSuccess = onSend; + Ropts.onFailure = onSendFailure; + Ropts.context = mqtt; + Ropts.token = 0; + + mqtt->finished = 0; + rc = check(mqtt, "sendMessage", + MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg, + &Ropts)); + while (!mqtt->finished) + usleep(100); +#endif /* WITH_MQTT */ + + return rc; +} + /*==============================================================*/ ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns) { ssize_t ret = -1; /* assume failure */ - if (ns == 0) ns = strlen(s); - #ifdef WITH_MQTT if (rpmmqttConnect(mqtt) == 0) { - MQTTClient_message pubmsg = MQTTClient_message_initializer; - int rc; - pubmsg.payload = (char *) s; - pubmsg.payloadlen = ns; - pubmsg.qos = mqtt->qos; - pubmsg.retained = 0; - - mqtt->delivered = 0; - rc = check(mqtt, "publishMessage", - MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg, - &mqtt->token)); - if (_rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, "+++ MQTT sent(%d) topic(%s) \"%.*s\"\n", - mqtt->token, mqtt->topic, ns, s); - - if (!mqtt->delivered) - rc = check(mqtt, "waitForCompletion", - MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs)); - if (rc == 0) - ret = ns; - } +#ifdef DYING + static char _mqtt_prefix[] = + "%{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} "; +#else + static char _mqtt_prefix[] = "%{?_mqtt_prefix}"; #endif + char * t = rpmExpand(_mqtt_prefix, " ", s, NULL); + size_t nt = strlen(t); + + if (!rpmmqttSendmsg(mqtt, t, nt)) + ret = nt; + + t = _free(t); + } +#endif /* WITH_MQTT */ return ret; } @@ -524,26 +717,31 @@ #ifdef WITH_MQTT if (rpmmqttConnect(mqtt) == 0) { -#ifdef NOTYET - MQTTClient_message pubmsg = MQTTClient_message_initializer; - int rc; - - pubmsg.payload = (char *) s; - pubmsg.payloadlen = ns; - pubmsg.qos = mqtt->qos; - pubmsg.retained = 0; - - rc = check(mqtt, "publishMessage", - MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg, - &mqtt->token)); - - rc = check(mqtt, "waitForCompletion", - MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs)); - if (rc == 0) - ret = ns; +char * topic = rpmExpand((s ? s : mqtt->topic), NULL); +int qos = 2; +int rc; + MQTTAsync_responseOptions Ropts = + MQTTAsync_responseOptions_initializer; +#ifdef REF + memcpy(pubmsg.struct_id, "MQTR", 4); + pubmsg.struct_version = 0; #endif + Ropts.onSuccess = onSubscribe; + Ropts.onFailure = onSubscribeFailure; + Ropts.context = mqtt; + + mqtt->finished = 0; + rc = check(mqtt, "subscribe", + MQTTAsync_subscribe(mqtt->C, topic, qos, &Ropts)); + while (!mqtt->finished) + usleep(100); + + ret = 0; /* XXX */ + +topic = _free(topic); + } -#endif +#endif /* WITH_MQTT */ return ret; } @@ -554,13 +752,10 @@ rpmmqtt mqtt = (rpmmqtt) _mqtt; #ifdef WITH_MQTT - { MQTTClient C = (MQTTClient) mqtt->C; - int xx; - xx = rpmmqttDisconnect(mqtt); - xx = check(mqtt, "destroy", - (MQTTClient_destroy(&C), 0)); - } -#endif + (void) rpmmqttDisconnect(mqtt); + (void) check(mqtt, "destroy", + (MQTTAsync_destroy(&mqtt->C), 0)); +#endif /* WITH_MQTT */ mqtt->C = NULL; mqtt->uri = _free(mqtt->uri); mqtt->topic = _free(mqtt->topic); @@ -599,6 +794,13 @@ fprintf(fp, " host: %s\n", u->host); fprintf(fp, " portstr: %s\n", u->portstr); fprintf(fp, " query: %s\n", u->query); + if (u->query) { + ARGV_t av = NULL; + int xx; + xx = argvSplit(&av, u->query, ","); + argvPrint(u->query, av, fp); + av = argvFree(av); + } fprintf(fp, "fragment: %s\n", u->fragment); fprintf(fp, " path: %s\n", path); } @@ -607,9 +809,7 @@ rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { -static char _test_mqtt[] = "test/mqtt"; - - static const char *_av[] = { "tcp://localhost:1883/test/mqtt", NULL }; + static const char *_av[] = { "mqtt://localhost:1883/rpm/%{pid}/mqtt", NULL }; rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); urlinfo u = NULL; const char *s = NULL; @@ -620,17 +820,24 @@ av = (char **)_av; if (av) { int ac = argvCount((ARGV_t)av); - for (int i = 0; i < ac; i++) - (void) argvAdd((ARGV_t *)&mqtt->av, av[i]); + for (int i = 0; i < ac; i++) { + char * t = rpmExpand(av[i], NULL); + (void) argvAdd((ARGV_t *)&mqtt->av, t); + t = _free(t); + } } mqtt->ut = urlSplit(av[0], &u); mqtt->u = u; - if (u->scheme == NULL || !strcmp(u->scheme, "mqtt")) { + if (u->scheme == NULL + || !strcmp(u->scheme, "mqtt") + || !strcmp(u->scheme, "mqtts")) + { + if (u->portstr == NULL) + u->portstr = !strcmp(u->scheme, "mqtts") + ? xstrdup("8883") : xstrdup("1883"); u->scheme = _free(u->scheme); u->scheme = xstrdup("tcp"); - if (u->portstr == NULL) - u->portstr = xstrdup("1883"); } #ifdef DYING dumpURL(__FUNCTION__, u); @@ -639,25 +846,56 @@ mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL); (void) urlPath(u->url, &s); +static const char _mqtt_topic[] = + "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}"; if (s && *s == '/') s++; if (s == NULL || *s == '\0') - s = _test_mqtt; - mqtt->topic = xstrdup(s); + s = _mqtt_topic; + mqtt->topic = rpmExpand(s, NULL); - mqtt->clientid = xstrdup("rpm"); +static const char _mqtt_clientid[] = + "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}"; + mqtt->clientid = rpmExpand(_mqtt_clientid, NULL); + +static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:1}"; + mqtt->qos = rpmExpandNumeric(_mqtt_qos); +static const char _mqtt_timeout[] = "%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}"; + mqtt->msecs = rpmExpandNumeric(_mqtt_timeout); + + if (u->query) { + ARGV_t qav = NULL; + int qac; + const char *t; + const char *te; + + (void) argvSplit(&qav, u->query, ","); + qac = argvCount(qav); + for (int i = 0; i < qac; i++) { + t = qav[i]; + if ((te = strchr(t, '=')) == NULL) + continue; + if (!strncmp(t, "qos", (te - t)) && xisdigit(te[1])) { + mqtt->qos = strtol(te+1, NULL, 0); + mqtt->qos %= 3; + continue; + } + if (!strncmp(t, "timeout", (te - t)) && xisdigit(te[1])) { + mqtt->msecs = strtol(te+1, NULL, 0); + continue; + } - mqtt->qos = 1; - mqtt->msecs = 10000; + } + qav = argvFree(qav); + } #ifdef WITH_MQTT { static int oneshot; - static char _var_cache_mqtt[] = "/var/cache/mqtt"; int xx; if (!oneshot) { - MQTTClient_nameValue *I = MQTTClient_getVersionInfo(); + MQTTAsync_nameValue *I = MQTTAsync_getVersionInfo(); int _lvl = RPMLOG_DEBUG; rpmlog(_lvl, "==================== MQTT\n"); while (I->name) { @@ -667,6 +905,9 @@ oneshot++; } +static const char _mqtt_cachedir[] = + "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); mqtt->persist_type = MQTTCLIENT_PERSISTENCE_USER; switch (mqtt->persist_type) { default: @@ -674,36 +915,35 @@ mqtt->persist_ctx = NULL; break; case MQTTCLIENT_PERSISTENCE_DEFAULT: - { + { mqtt->persist_path = xstrdup(persist_path); /* XXX rpmmqttFini double free */ mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); - mqtt->persist_path = xstrdup(_var_cache_mqtt); } break; case MQTTCLIENT_PERSISTENCE_USER: - { + { mqtt->persist_path = xstrdup(persist_path); MQTTClient_persistence * ctx = xmalloc(sizeof(*ctx)); *ctx = _rpmmqtt_persistence; /* structure assignment */ ctx->context = mqtt; mqtt->persist_ctx = ctx; - mqtt->persist_path = xstrdup(_var_cache_mqtt); } break; } +persist_path = _free(persist_path); xx = check(mqtt, "create", - MQTTClient_create(&mqtt->C, mqtt->uri, mqtt->clientid, + MQTTAsync_create(&mqtt->C, mqtt->uri, mqtt->clientid, mqtt->persist_type, mqtt->persist_ctx)); xx = check(mqtt, "setCallbacks", - MQTTClient_setCallbacks(mqtt->C, mqtt, - rpmmqttConnlost, - rpmmqttMessageArrived, - rpmmqttDeliveryComplete)); + MQTTAsync_setCallbacks(mqtt->C, mqtt, + onConnectionLost, + onMessageArrived, + onDeliveryComplete)); xx = rpmmqttConnect(mqtt); /* XXX exit if cannot connect? */ } -#endif +#endif /* WITH_MQTT */ return rpmmqttLink(mqtt); } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.4 -r1.1.2.5 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 28 Jun 2016 07:20:22 -0000 1.1.2.4 +++ rpm/rpmio/rpmmqtt.h 29 Jun 2016 12:17:58 -0000 1.1.2.5 @@ -10,7 +10,7 @@ typedef struct rpmmqtt_s * rpmmqtt; #if defined(_RPMMQTT_INTERNAL) -#include <MQTTClient.h> +#include <MQTTAsync.h> struct rpmmqtt_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ char ** av; @@ -35,8 +35,9 @@ int msecs; int debug; + int finished; int connected; - int delivered; + int subscribed; char * serverURI; int MQTTVersion; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmurl.h ============================================================================ $ cvs diff -u -r1.41.4.9 -r1.41.4.10 rpmurl.h --- rpm/rpmio/rpmurl.h 28 Jun 2016 07:20:22 -0000 1.41.4.9 +++ rpm/rpmio/rpmurl.h 29 Jun 2016 12:17:58 -0000 1.41.4.10 @@ -32,6 +32,7 @@ #define URL_IS_POSTGRES (urltype)34 #define URL_IS_SQLSERVER (urltype)35 #define URL_IS_MQTT (urltype)36 +#define URL_IS_MQTTS (urltype)37 #define URLMAGIC 0xd00b1ed0U #define URLSANE(u) assert(u && u->magic == URLMAGIC) @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.2 -r1.1.2.3 tmqtt.c --- rpm/rpmio/tmqtt.c 28 Jun 2016 07:20:22 -0000 1.1.2.2 +++ rpm/rpmio/tmqtt.c 29 Jun 2016 12:17:58 -0000 1.1.2.3 @@ -24,6 +24,9 @@ ssize_t nw; int rc = 0; + (void) rpmmqttRead(mqtt, "rpm/#", 0); + (void) rpmmqttRead(mqtt, "$SYS/#", 0); + nw = rpmmqttWrite(mqtt, "bzzt ...", 0); nw = rpmmqttWrite(mqtt, "bzzT ...", 0); nw = rpmmqttWrite(mqtt, "bzZT ...", 0); @@ -58,13 +61,25 @@ int ac = argvCount(av); #endif (void)av; - static char *_av[] = { "mqtt://localhost/test/mqtt", NULL, }; - rpmmqtt mqtt = rpmmqttNew(_av, 0); + static char *_av[] = { "mqtts://luser:jasnl@localhost:1883/rpm/%{pid}/mqtt?qos=1,timeout=10000", NULL, }; + rpmmqtt mqtt; int rc = -1; - rc = _DoMQTT(mqtt); +(void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0); +(void) rpmDefineMacro(NULL, "_mqtt_user luser", 0); +(void) rpmDefineMacro(NULL, "_mqtt_pass jasnl", 0); +(void) rpmDefineMacro(NULL, "_mqtt_host localhost", 0); +(void) rpmDefineMacro(NULL, "_mqtt_port 1883", 0); +(void) rpmDefineMacro(NULL, "_mqtt_clientid rpm", 0); +(void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0); +(void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0); +(void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0); +(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0); + mqtt = rpmmqttNew(_av, 0); + rc = _DoMQTT(mqtt); mqtt = rpmmqttFree(mqtt); + optCon = rpmioFini(optCon); return rc; } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/url.c ============================================================================ $ cvs diff -u -r1.73.4.15 -r1.73.4.16 url.c --- rpm/rpmio/url.c 28 Jun 2016 07:20:22 -0000 1.73.4.15 +++ rpm/rpmio/url.c 29 Jun 2016 12:17:58 -0000 1.73.4.16 @@ -34,6 +34,9 @@ #ifndef IPPORT_MQTT #define IPPORT_MQTT 1883 #endif +#ifndef IPPORT_MQTTS +#define IPPORT_MQTTS 8883 +#endif #ifndef IPPORT_PGPKEYSERVER #define IPPORT_PGPKEYSERVER 11371 #endif @@ -414,6 +417,7 @@ { "sqlserver://", sizeof("sqlserver://")-1, URL_IS_SQLSERVER }, { "mqtt://", sizeof("mqtt://")-1, URL_IS_MQTT }, + { "mqtts://", sizeof("mqtts://")-1, URL_IS_MQTTS }, { "-", sizeof("-")-1, URL_IS_DASH }, { "ssh://", sizeof("ssh://")-1, URL_IS_UNKNOWN }, @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository [email protected]
