Dear Pieter:

     attached is my first malamute project, it open 2 clients for produce
and consume message. producer can send message to stream or to one client,
consumer can listen to series of stream and subject.

     it is written in c++ use STL and readline.

     do you think it helpful for malamute?

Best
Miao

2015-07-16 19:18 GMT+08:00 Pieter Hintjens <[email protected]>:

> You can use CZMQ's zloop to poll on both a zsock_t and a POSIX socket.
> mlm_client_msgpipe returns the message socket for the client API.
>
> On Thu, Jul 16, 2015 at 9:35 AM, Miao Lin <[email protected]> wrote:
> > Hi:
> >     I am writing my first zeromq code also malamute code, it is a simple
> udp
> > message to malamute message bridge. I need to poll on a udp socket and
> also
> > call mlm_client_recv,
> > my question is , how to poll on these 2 kind of different things?
> >
> >     if zmq, seems can use zmq_poll on both legacy socket and zmq socket,
> but
> > how to do in malamute?
> >
> > Best
> > MiaoLin
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > [email protected]
> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
/*=========================================================================
mcli - interactive mlm cli tool

written by  Miao Lin <[email protected]>
 
MPL
===========================================================================*/

#include <malamute.h>

#include <stdlib.h>
#include <cjson/cJSON.h>
#include <readline/readline.h>	// for readline
#include <readline/history.h>
#include <string>
#include <sstream>
#include <vector>
#include <map>	// for pair
using namespace std;

#define PRODUCT         "Mmalamute Broker Interactive CLI"
#define COPYRIGHT       "Copyright (c) 2015 "
#define HELP			"Type ? for help, exit to quit"
#define MLM_CLI_NAME	"mcli"

#define CLI_NAME_LEN 128

bool verbose = false;						// verbose print when working?
bool null_auth = false;						// not use authenticate?
const char *username = MLM_CLI_NAME;		// user/pass when login malamute server
const char *password = MLM_CLI_NAME;
const char *endpoint = "tcp://127.0.0.1:9999";

static mlm_client_t *producer = NULL;				// malamute client handler as producer
static mlm_client_t *consumer = NULL;				// malamute client handler as consumer

char producer_stream[CLI_NAME_LEN] = MLM_CLI_NAME;	// stream of producer client
char producer_name[CLI_NAME_LEN];					// name of producer client

char consumer_name[CLI_NAME_LEN];					// name of consumer client 
class string_pair_t : public pair < string, string > {};
class string_pair_list_t : public vector < string_pair_t > {};
string_pair_list_t consumer_streams;

zmq_pollitem_t stdin_pollitem;				// for stdin

const char* mlm_cli_prompt = MLM_CLI_NAME"$";	// prompt for cli
bool running = true;							// if we need exit
zloop_t* client_loop;
const char* client_ip = "127.0.0.1";			// ip address of this client

int cmd_help(char *);				// help command handler
int cmd_exit(char *);				// exit command handler
int cmd_stream(char *);		// set stream of this client
int cmd_sub(char *);			// subscribe to a stream and subject
int cmd_sendto(char *);			// set message to mailbox
int cmd_pub(char *);				// broadcast
int cmd_info(char *);				// broadcast

typedef struct {
	const char* name;	// User printable name of the function
	rl_icpfunc_t *func;	// Function to call to do the job.
	const char* doc;			// Documentation for this function.
} COMMAND;

COMMAND commands[] = {
	{ "help", cmd_help, "Print help information(this screen)" },
	{ "?", cmd_help, "Print help information(this screen)" },
	{ "exit", cmd_exit, "Exit mlm_cli"	},
	{ "quit", cmd_exit, "Exit mlm_cli"  },
	{ "setstream", cmd_stream, "Set stream for produce msg" },
	{ "sub", cmd_sub, "subscribe to stream and subject" },
	{ "sendto", cmd_sendto, "send message to client name" },
	{ "pub", cmd_pub, "broad cast message to stream" },
	{ "info", cmd_info, "show information of this client" }, 
	{ NULL, NULL, NULL	},
};

COMMAND* find_command(const char* name)
{
	int i;

	for (i = 0; commands[i].name; i++)
		if (streq(name, commands[i].name))
			return (&commands[i]);

	return ((COMMAND *)NULL);
}

// trim left and right space of string.
char * stripwhite(char *string)
{
	char *s, *t;

	for (s = string; whitespace(*s); s++)
		;

	if (*s == 0)
		return (s);

	t = s + strlen(s) - 1;
	while (t > s && whitespace(*t))
		t--;
	*++t = '\0';

	return s;
}

void SplitString(const std::string& s, std::vector<std::string>& v, const std::string& c)
{
	std::string::size_type pos1, pos2;
	pos2 = s.find(c);
	pos1 = 0;
	while (std::string::npos != pos2)
	{
		v.push_back(s.substr(pos1, pos2 - pos1));

		pos1 = pos2 + c.size();
		pos2 = s.find(c, pos1);
	}
	if (pos1 != s.length())
		v.push_back(s.substr(pos1));
}

/* Execute a command line. */
int execute_line(char *line)
{
	register int i;
	COMMAND *command;
	char *word;

	/* Isolate the command word. */
	i = 0;
	while (line[i] && whitespace(line[i]))
		i++;
	word = line + i;

	while (line[i] && !whitespace(line[i]))
		i++;

	if (line[i])
		line[i++] = '\0';

	command = find_command(word);

	if (!command)
	{
		fprintf(stderr, "%s: No such command for %s.\n", word, MLM_CLI_NAME);
		return (-1);
	}

	/* Get argument to command, if any. */
	while (whitespace(line[i]))
		i++;

	word = line + i;

	/* Call the function. */
	return ((*(command->func)) (word));
}

/* **************************************************************** */
/*                                                                  */
/*                  Interface to Readline Completion                */
/*                                                                  */
/* **************************************************************** */

char *command_generator PARAMS((const char *, int));
char **fileman_completion PARAMS((const char *, int, int));
static void cb_linehandler(char* line);

// Tell the GNU Readline library how to complete.  We want to try to complete
// on command names if this is the first word in the line 
void initialize_readline(void)
{
	// Tell the completer that we want a crack first. 
	rl_attempted_completion_function = fileman_completion;

	// Install the line handler.
	rl_callback_handler_install(mlm_cli_prompt, cb_linehandler);
}

/* Attempt to complete on the contents of TEXT.  START and END bound the
region of rl_line_buffer that contains the word to complete.  TEXT is
the word to complete.  We can use the entire contents of rl_line_buffer
in case we want to do some simple parsing.  Return the array of matches,
or NULL if there aren't any. */
char **fileman_completion(const char *text, int start, int end)
{
	char **matches;

	matches = (char **)NULL;

	/* If this word is at the start of the line, then it is a command
	to complete.  Otherwise it is the name of a file in the current
	directory. */
	if (start == 0)
		matches = rl_completion_matches(text, command_generator);

	return (matches);
}

/* Generator function for command completion.  STATE lets us know whether
to start from scratch; without any state (i.e. STATE == 0), then we
start at the top of the list. */
char *command_generator(const char *text, int state)
{
	static int list_index, len;
	const char *name;

	/* If this is a new word to complete, initialize now.  This includes
	saving the length of TEXT for efficiency, and initializing the index
	variable to 0. */
	if (!state)
	{
		list_index = 0;
		len = strlen(text);
	}

	/* Return the next name which partially matches from the command list. */
	while (name = commands[list_index].name)
	{
		list_index++;

		if (strncmp(name, text, len) == 0)
			return (strdup(name));
	}

	/* If no names matched, then return NULL. */
	return ((char *)NULL);
}

/* **************************************************************** */
/*                                                                  */
/*                       Mlm_cli Commands                           */
/*                                                                  */
/* **************************************************************** */
int cmd_help(char *arg)
{
	if (!arg)
		arg = (char*)"";

	puts("command list:");
	for (int i = 0; NULL != commands[i].name; i++) {
		printf("%s \t %s\n", commands[i].name, commands[i].doc);
	}
	return 1;
}

int cmd_exit(char *arg)
{
	puts("exiting...");

	running = false;
	return 1;
}

// set stream of this client
int cmd_stream(char *arg)
{
	if (!arg) {
		printf("\nInvalid command, use setstream stream\n");
		return 1;
	}

	arg = stripwhite(arg);
	vector<string> list;
	SplitString(string(arg), list, string(" "));
	if (0 == list.size()) {
		printf("\nInvalid command, use setstream stream\n");
		return 1;
	}

	int rc = mlm_client_set_producer(producer, list[0].c_str());
	if (rc >= 0) {
		strncpy(producer_stream, list[0].c_str(), CLI_NAME_LEN);
		producer_stream[CLI_NAME_LEN - 1] = 0;
		printf("\nstream set to %s\n", producer_stream);
	} else
		printf("\nFailed set stream to %s\n", list[0].c_str());

	return 1;
}

// subscribe to a stream and subject
int cmd_sub(char * arg)
{
	if (!arg) {
		printf("Invalid subscribe cmd, use sub stream subject\n");
		return 1;
	}
		
	arg = stripwhite(arg);
	vector<string> list;
	SplitString(string(arg), list, string(" "));

	if (list.size() != 2) {
		printf("Invalid subscribe cmd, use sub stream subject\n");
		return 1;
	}
	
	int rc = mlm_client_set_consumer(consumer, list[0].c_str(), list[1].c_str());
	if (rc >= 0) {
		printf("Subscribed to STREAM %s SUBJECT %s\n", list[0].c_str(), list[1].c_str());
		string_pair_t p;
		p.first = list[0];
		p.second = list[1];
		consumer_streams.push_back(p);
	} else
		printf("Failed subscribe to STREAM %s SUBJECT %s\n", list[0].c_str(), list[1].c_str());

	return 1;

}

// set message to mailbox
int cmd_sendto(char *arg)
{
	int rc;
	if (!arg) {
		printf("0 message send, use sendto client subject content\n");
		return 1;
	}

	arg = stripwhite(arg);
	vector<string> list;
	SplitString(string(arg), list, string(" "));
	
	if (list.size() != 3) {
		printf("0 message send, use sendto client subject content\n");
		return 1;
	} else {
		zmsg_t* msg = zmsg_new();
		zmsg_pushstr(msg, list[2].c_str());
		rc = mlm_client_sendto(	producer,				// mlm client
								list[0].c_str(),		// client address
								list[1].c_str(),		// subject
								producer_name,		// tracker ?
								1000,				// time out, 1000ms;
								&msg);				// mesage body

		if (rc >= 0)
			printf("\nSUCCESS\n");
		else
			printf("\nFAILED!\n");
	}

	return 1;
}

// broadcast
int cmd_pub(char *arg)
{
	int rc;
	if (!arg) {
		printf("0 message send, use pub subject content\n");
		return 1;
	}

	arg = stripwhite(arg);
	vector<string> list;
	SplitString(string(arg), list, string(" "));

	if (list.size() != 2 ) {
		printf("0 message send, use pub subject content\n");
		return 1;
	} else {
		printf("Broadcast to \n\tSTREAM %s\n\tSUBJECT %s\n\tCONTENT %s\n", 
				producer_stream, list[0].c_str(), list[1].c_str());
		rc = mlm_client_sendx(producer, list[0].c_str(), list[1].c_str(), NULL);
		if (rc >= 0)
			printf("SUCCESS\n");
		else
			printf("FAILED!\n");
	}

	return 1;
}

// show information
int cmd_info(char *arg)
{
	int rc;
	printf("PRODUCER NAME: %s\n"
		"PRODUCER STREAM: %s\n"
		"CONSUMER NAME : %s\n",
		producer_name, producer_stream, consumer_name);
	
	if (consumer_streams.size()) {
		string_pair_list_t::iterator it = consumer_streams.begin();

		printf("LISTENING:\n");
		while (it != consumer_streams.end()) {
			printf("STREAM: %s\nSUBJECT: %s\n", it->first.c_str(), it->second.c_str());
			it++;
		}
	} else
		printf("NOT listening to any stream\n");
	return 1;
}

// handling message for producer
static int s_mlm_producer_event(zloop_t *loop, zsock_t *handle, void *arg)
{
	//  Now receive and print any messages we get
	zmsg_t *msg = mlm_client_recv(producer);
	assert(msg);
	char *content = zmsg_popstr(msg);
	
	printf("\nMSG=>\nCommand=%s\nSubject=%s\nContent=%s\n", 
			mlm_client_command(producer), mlm_client_subject(producer), content);
	zstr_free(&content);
	zmsg_destroy(&msg);

	return 0;
}

// handling message for consumer
static int s_mlm_consumer_event(zloop_t *loop, zsock_t *handle, void *arg)
{
	//  Now receive and print any messages we get
	zmsg_t *msg = mlm_client_recv(consumer);
	assert(msg);
	char *content = zmsg_popstr(msg);

	printf("\nMSG=>\nCommand=%s\nSubject=%s\nContent=%s\n", 
			mlm_client_command(consumer), mlm_client_subject(consumer), content);
	zstr_free(&content);
	zmsg_destroy(&msg);

	return 0;
}


static void cb_linehandler(char* line)
{
	if (line == NULL ) 
		return;
		
	line = stripwhite(line);

	if (line[0]) {
		add_history(line);
		execute_line(line);
	}
	free(line);
}


// simply forward console input to readline callback.
static int s_stdin_event(zloop_t *loop, zmq_pollitem_t *item, void *arg)
{
	if (!running)
		return -1;
	else {
		rl_callback_read_char();
		if (!running)
			return -1;
		else
			return 0;
	}
}

int main(int argc, char *argv[])
{
	int argn = 1;
	int rc;

	while (argn < argc) {
		if (streq(argv[argn], "-v")) {
			verbose = true;
		} else if (streq(argv[argn], "-n")) {
			null_auth = true;
		} else if (streq(argv[argn], "-e")) {
			endpoint = argv[++argn];
		} else if (streq(argv[argn], "-a")) {
			client_ip = argv[++argn];
		} else if (streq(argv[argn], "-p")) {
			username = argv[++argn];
			password = argv[++argn];
		} else {
			printf("syntax: "MLM_CLI_NAME" [-v][-n][-e endpoint] [-a ipaddr] [-p username password]\n");
			return 0;
		}
		argn++;
	}
	
	printf(MLM_CLI_NAME": endpoint %s\n", endpoint);

	mlm_client_verbose = verbose;
	producer = mlm_client_new();
	assert(producer);
	consumer = mlm_client_new();
	assert(consumer);

	// use plain text auth to mlm server
	if (!null_auth) {
		mlm_client_set_plain_auth(producer, username, password);
		mlm_client_set_plain_auth(consumer, username, password);
	}

	sprintf(producer_name, "%s://%s/%d#producer", MLM_CLI_NAME, client_ip, getpid());
	if (mlm_client_connect(producer, endpoint, 1000, producer_name)) {
		zsys_error(MLM_CLI_NAME": server not reachable at %s", endpoint);
		mlm_client_destroy(&producer);
		mlm_client_destroy(&consumer);
		return 0;
	}
	
	sprintf(consumer_name, "%s://%s/%d#consumer", MLM_CLI_NAME, client_ip, getpid());
	if (mlm_client_connect(consumer, endpoint, 1000, consumer_name)) {
		zsys_error(MLM_CLI_NAME": server not reachable at %s", endpoint);
		mlm_client_destroy(&producer);
		mlm_client_destroy(&consumer);
		return 0;
	}

	// the client with send all message to the stream
	mlm_client_set_producer(producer, producer_stream);

	// use zloop to do the main message loop
	client_loop = zloop_new();
	assert(client_loop);

	zloop_set_verbose(client_loop, verbose);

	// add producer
	rc = zloop_reader(client_loop, mlm_client_msgpipe(producer), s_mlm_producer_event, NULL);
	assert(rc == 0);

	// add consumer
	rc = zloop_reader(client_loop, mlm_client_msgpipe(consumer), s_mlm_consumer_event, NULL);
	assert(rc == 0);

	// add dahdi udp socket to zloop reactor through raw socket method 
	stdin_pollitem.socket = NULL;
	stdin_pollitem.fd = STDIN_FILENO;	/* for stdin */
	stdin_pollitem.events = ZMQ_POLLIN;

	rc = zloop_poller(client_loop, &stdin_pollitem, s_stdin_event, NULL);
	assert(rc == 0);

	puts(PRODUCT);
	puts(COPYRIGHT);
	puts(HELP);
	initialize_readline();

	// start message loop forever.
	zloop_start(client_loop);

	rl_callback_handler_remove();
	zloop_poller_end(client_loop, &stdin_pollitem);
	zloop_reader_end(client_loop, mlm_client_msgpipe(producer));
	zloop_reader_end(client_loop, mlm_client_msgpipe(consumer));

	printf(MLM_CLI_NAME": zloop stopped\n");
	zloop_destroy(&client_loop);
	mlm_client_destroy(&producer);
	mlm_client_destroy(&consumer);

	return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to