Hi rrd developers,
This is a patch for rrdtool to provide for distributed rrd file
storage with graphing done centrally. This patch is currently in use
with rrdtool 1.2 on centos5 with snmp pollers in remote data centers
writing to their own local rrd storage. Graphs are produced centrally
including aggregates across all data centers with rrd data retrieved
from across the internet.
Transport between an rrd grapher and rrd store is layered on top of an
rpc protocol. A grapher fetches rrd data by invoking an rpc which is
serviced by a remote rpc server running on an rrd data
storage/collecting server.
This patch extends the rrd_fetch syntax to allow rpc dispatching to be
specified. The rrd filename argument to the fetch function can be
passed as rpc//server_alias/rrdfile.rrd, for example:
$> rrdtool xport \
> DEF:in1=rpc//server1/ifInUcastPkts.1.rrd:pps:MAX \
> DEF:in2=rpc//server2/ifInUcastPkts.1.rrd:pps:MAX \
> CDEF:sum=in1,in2,+ \
> XPORT:sum
Other than that file syntax change, the rrd graph and rrd xport functions
operate the same.
On the graphing host different server_aliases are mapped to rrd
storage servers in a configuration file (rrdrpc.conf)
Storage-Server.server1=RRDDataFetch:tcp -h remote1.com -p 12345
Storage-Server.server2=RRDDataFetch:tcp -h remote2.com -p 12345
On the remote rrd storage/collection servers a similar configuration
file is used to setup the rrd rpc server daemon with its endpoint and
rrd file location (rrdrpcd.conf)
RRD.Base=/var/rrds
RRDFetchAdapter.Endpoints=tcp -p 12345
This patch is against the current rrdtool 1.2 svn branch and requires
the zeroc ice rpc framework,
http://www.zeroc.com/download/Ice/3.2/Ice-3.2.1.tar.gz
It is known to build and run on centos5 x86_64 and i686. This patch
touches autotools files so prior to the first build autoreconf should
be run. With the ice framework installed, a configure/build sequence
would look similar to this:
$ autoreconf
$ ./configure --enable-rrdrpc --with-zeroc-icelib=/ice_install_location
$ make
After a successful build a test program can be run
$ make check
Thanks,
Scott Brumbaugh
Index: src/rrd_fetch.c
===================================================================
--- src/rrd_fetch.c (revision 1372)
+++ src/rrd_fetch.c (working copy)
@@ -206,6 +206,12 @@
rrd_value_t *data_ptr;
unsigned long rows;
+#ifdef WANT_RRDRPC
+ if (strncmp(filename, "rpc//", 5) == 0) {
+ return _rrd_fetch_rpc(filename, cf_idx, start, end, step, ds_cnt, ds_namv, data);
+ }
+#endif /* WANT_RRDRPC */
+
#ifdef DEBUG
fprintf(stderr,"Entered rrd_fetch_fn() searching for the best match\n");
fprintf(stderr,"Looking for: start %10lu end %10lu step %5lu\n",
Index: src/rpc/rrdrpc.conf.t
===================================================================
--- src/rpc/rrdrpc.conf.t (revision 0)
+++ src/rpc/rrdrpc.conf.t (revision 0)
@@ -0,0 +1 @@
+Storage-Server.localhost=RRDDataFetch:tcp -p 10000
Index: src/rpc/rrd_rpcd.cc
===================================================================
--- src/rpc/rrd_rpcd.cc (revision 0)
+++ src/rpc/rrd_rpcd.cc (revision 0)
@@ -0,0 +1,36 @@
+#include <Ice/Service.h>
+#include "storage_impl.h"
+
+#ifndef CONFIG_FILE
+#define CONFIG_FILE "/usr/local/etc/rrdrpcd.conf" /* fallback when the build does not pass -DCONFIG_FILE */
+#endif
+
+// Ice::Service subclass used by the rrdrpcd daemon. Its start() hook creates
+// the object adapter and registers the RRD storage servant on it.
+class RRDServerService : public Ice::Service {
+protected:
+ // Start hook; returns true once the adapter has been activated.
+ virtual bool start(int argc, char *argv[]);
+private:
+ // Object adapter created in start().
+ Ice::ObjectAdapterPtr _adapter;
+};
+
+// Service start hook: create the "RRDFetchAdapter" object adapter, register a
+// StorageI servant under the identity "RRDDataFetch" and activate the adapter.
+// argc/argv are not used. Always returns true.
+bool RRDServerService::start(int argc, char * argv[]) {
+    Ice::CommunicatorPtr ic = communicator();
+
+    _adapter = ic->createObjectAdapter("RRDFetchAdapter");
+    Ice::Identity servant_id = ic->stringToIdentity("RRDDataFetch");
+    _adapter->add(new StorageI, servant_id);
+    _adapter->activate();
+
+    return true;
+}
+
+// Daemon entry point. Builds the Ice property set from the command line
+// (adding --Ice.Config=CONFIG_FILE unless ICE_CONFIG is set in the
+// environment), lets the property set parse the RRDFetchAdapter.* and RRD.*
+// options, and hands control to the service's main(), whose result becomes
+// the exit status.
+int main(int argc, char * argv[]) {
+
+    RRDServerService service;
+
+    Ice::StringSeq options = Ice::argsToStringSeq(argc, argv);
+    const bool env_has_config = getenv("ICE_CONFIG") != 0;
+    if (!env_has_config) {
+        options.push_back("--Ice.Config=" CONFIG_FILE);
+    }
+
+    Ice::PropertiesPtr props = Ice::createProperties(options);
+    options = props->parseCommandLineOptions("RRDFetchAdapter", options);
+    options = props->parseCommandLineOptions("RRD", options);
+
+    Ice::InitializationData init;
+    init.properties = props;
+
+    return service.main(options, init);
+}
Index: src/rpc/test1.cc
===================================================================
--- src/rpc/test1.cc (revision 0)
+++ src/rpc/test1.cc (revision 0)
@@ -0,0 +1,244 @@
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <signal.h>
+
+#include <Ice/Ice.h>
+#include <IceUtil/IceUtil.h>
+
+extern "C" {
+#include "../rrd_tool.h"
+}
+
+void make_rrds(const char *, long, long, long);
+void print_rrd_error(void);
+
+// Test driver: creates three local RRDs filled with scaled sine data, starts
+// the rpc daemon (../rrdrpcd-static) as a child process, then checks that
+// rrd_fetch and rrd_xport through the rpc// path syntax return the values
+// that were written. Returns 0 on success and 1 on any failure so that
+// "make check" can detect a broken build.
+int main(int argc, char **argv) {
+
+    const char * rrd_base = "rrd";
+    long rrd_step = 300;
+    time_t rrd_end_time = (time(NULL) / rrd_step) * rrd_step;
+    time_t rrd_start_time = rrd_end_time - rrd_step * 288;
+    int failures = 0;
+
+    make_rrds(rrd_base, rrd_start_time, rrd_end_time, rrd_step);
+
+    pid_t server_pid = fork();
+    if (server_pid == -1) {
+        /* Without this check the kill() further down would be
+         * kill(-1, SIGINT), which signals every process we may signal. */
+        perror("fork failed");
+        return 1;
+    }
+    if (server_pid == 0) {
+        setenv("ICE_CONFIG", "server_test.conf", 1);
+        char rrd_base_config[64];
+        snprintf(rrd_base_config, sizeof(rrd_base_config), "--RRD.Base=%s", rrd_base);
+        execl("../rrdrpcd-static",
+              "rrdrpcd-static",
+              "--RRDFetchAdapter.Endpoints=tcp -p 10000",
+              rrd_base_config,
+              (char *) 0
+              );
+        perror("execl failed");
+        _exit(1);
+    }
+
+    /* give the daemon time to open its endpoint */
+    sleep(1);
+
+    setenv("ICE_CONFIG", "client_test.conf", 1);
+
+    char start_buf[30], end_buf[30], step_buf[30];
+    snprintf(start_buf, sizeof(start_buf), "%ld", (long) rrd_start_time);
+    snprintf(end_buf, sizeof(end_buf), "%ld", (long) rrd_end_time);
+    snprintf(step_buf, sizeof(step_buf), "%ld", rrd_step);
+
+    char * fetch_args[] = {
+        "fetch",
+        "rpc//server1/rrd1.rrd",
+        "MAX",
+        "--resolution", step_buf,
+        "--start", start_buf,
+        "--end", end_buf
+    };
+    int n_args = sizeof(fetch_args)/sizeof(char *);
+
+    time_t start;
+    time_t end;
+    unsigned long step;
+    unsigned long ds_count;
+    char **ds_names;
+    rrd_value_t *data, *data_ptr;
+    double delta;
+
+    rrd_fetch(n_args, fetch_args, &start, &end, &step, &ds_count, &ds_names, &data);
+    if (rrd_test_error()) {
+        /* the output arguments are not valid after a failed fetch */
+        print_rrd_error();
+        failures++;
+    }
+    else {
+        delta = 0.0;
+        data_ptr = data;
+        for (time_t t=start+step; t<end; t+=step) {
+            double expected = sin(2*3.14159265*(t%86400)/86400);
+            /* sum magnitudes so that errors of opposite sign cannot cancel */
+            delta += fabs(*data_ptr - expected);
+            data_ptr++;
+        }
+
+        if (delta > 1e-3) {
+            printf("error in rrd_fetch values -> %lf\n", delta);
+            failures++;
+        }
+
+        for (unsigned long i=0; i<ds_count; i++) {
+            free(ds_names[i]);
+        }
+        free(ds_names);
+        free(data);
+    }
+
+    char * xport_args[] = {
+        "xport",
+        "--step", step_buf,
+        "--start", start_buf,
+        "--end", end_buf,
+        "DEF:in1=rpc//server1/rrd1.rrd:in:MAX",
+        "DEF:in2=rpc//server1/rrd2.rrd:in:MAX",
+        "DEF:in3=rpc//server2/rrd3.rrd:in:MAX",
+        "CDEF:sum_in=in1,in2,in3,+,+",
+        "XPORT:sum_in"
+    };
+    n_args = sizeof(xport_args)/sizeof(char *);
+
+    int xxsize;
+    unsigned long cols;
+    char **legend;
+    rrd_xport(n_args, xport_args, &xxsize, &start, &end, &step, &cols, &legend, &data);
+    if (rrd_test_error()) {
+        print_rrd_error();
+        failures++;
+    }
+    else {
+        printf("\n");
+        printf("time fetched expected\n");
+        printf("---- ------- --------\n");
+
+        delta = 0.0;
+        data_ptr = data;
+        for (time_t t=start+step; t<end; t+=step) {
+            /* the three RRDs hold 1x, 2x and 3x the sine, so their sum is 6x */
+            double expected = 6.*sin(2*3.14159265*(t%86400)/86400);
+            printf("%ld %lf %lf\n", (long) t, *data_ptr, expected);
+            delta += fabs(*data_ptr - expected);
+            data_ptr++;
+        }
+        printf("\n");
+
+        if (delta > 1e-3) {
+            printf("error in rrd_xport values -> %lf\n", delta);
+            failures++;
+        }
+
+        for (unsigned long i=0; i<cols; i++) {
+            free(legend[i]);
+        }
+        free(legend);
+        free(data);
+    }
+
+    /* always stop the daemon, whatever the outcome of the checks */
+    kill(server_pid, SIGINT);
+
+    int status;
+    waitpid(server_pid, &status, 0);
+    if (WIFEXITED(status)) {
+        printf("server exit code %d\n", WEXITSTATUS(status));
+    }
+    else {
+        printf("server exited abnormally\n");
+    }
+
+    return failures ? 1 : 0;
+}
+
+// Create the three test RRDs <base>/rrd1.rrd .. <base>/rrd3.rrd, each with a
+// single GAUGE data source "in" and one 288-row MAX RRA, and fill file k
+// (k = 1, 2, 3) with k * sin(2*pi*(t % 86400)/86400) for every step boundary
+// t in (start, end]. rrd errors are printed and cleared; they do not stop the
+// run.
+void make_rrds(const char * base, long start, long end, long step) {
+
+    enum { N_RRDS = 3 };
+    char rrd_files[N_RRDS][256];
+    char start_buf[30], step_buf[30];
+
+    if (step <= 0) {
+        /* the update loop below would never terminate */
+        return;
+    }
+
+    /* the directory may already exist, so the result is not checked */
+    mkdir(base, 0755);
+
+    /* start the RRDs slightly before the first sample so it is accepted */
+    snprintf(start_buf, sizeof(start_buf), "%ld", start - 10);
+    snprintf(step_buf, sizeof(step_buf), "%ld", step);
+
+    for (int k = 0; k < N_RRDS; k++) {
+        snprintf(rrd_files[k], sizeof(rrd_files[k]), "%s/rrd%d.rrd", base, k + 1);
+
+        char * create_args[] = {
+            "create",
+            rrd_files[k],
+            "--step", step_buf,
+            "--start", start_buf,
+            "DS:in:GAUGE:600:U:U",
+            "RRA:MAX:0.5:1:288"
+        };
+        int n_args = sizeof(create_args)/sizeof(char *);
+
+        rrd_create(n_args, create_args);
+        if (rrd_test_error()) {
+            print_rrd_error();
+        }
+    }
+
+    /* timestamps stay long: an int loop variable would truncate them */
+    for (long t = start + step; t <= end; t += step) {
+        double sin_t = sin(2*3.14159265*(t%86400)/86400);
+
+        for (int k = 0; k < N_RRDS; k++) {
+            char data_str[64];
+            snprintf(data_str, sizeof(data_str), "%ld:%f", t, (k + 1) * sin_t);
+
+            char * update_args[] = {
+                "update",
+                rrd_files[k],
+                data_str
+            };
+            int n_args = sizeof(update_args)/sizeof(char *);
+
+            rrd_update(n_args, update_args);
+            if (rrd_test_error()) {
+                print_rrd_error();
+            }
+        }
+    }
+
+}
+
+// Print the pending rrd error message on stdout, then reset the rrd error state.
+void print_rrd_error() {
+    const char * message = rrd_get_error();
+    printf("Error %s\n", message);
+    rrd_clear_error();
+}
Index: src/rpc/communicator.cc
===================================================================
--- src/rpc/communicator.cc (revision 0)
+++ src/rpc/communicator.cc (revision 0)
@@ -0,0 +1,3 @@
+#include "communicator.h"
+
+// Storage for the lazily created communicator handle; it stays NULL until
+// CommunicatorPtrSingleton::get_instance() runs for the first time.
+Ice::CommunicatorPtr * CommunicatorPtrSingleton::_instance = NULL;
Index: src/rpc/communicator.h
===================================================================
--- src/rpc/communicator.h (revision 0)
+++ src/rpc/communicator.h (revision 0)
@@ -0,0 +1,38 @@
+#ifndef __COMMUNICATOR_H__
+#define __COMMUNICATOR_H__
+
+#include <Ice/Ice.h>
+#include <iostream>
+using namespace std;
+#ifndef CONFIG_FILE
+#define CONFIG_FILE "/usr/local/etc/rrdrpc.conf"
+#endif
+
+// Holder for one lazily created Ice communicator shared by the rpc client
+// code. The class cannot be instantiated (its constructors are private and
+// never defined); it is only used through get_instance().
+class CommunicatorPtrSingleton : public Ice::CommunicatorPtr {
+ public:
+    // Return the shared communicator, creating it on the first call.
+    //
+    // The configuration file CONFIG_FILE is passed as --Ice.Config unless the
+    // ICE_CONFIG environment variable is set. Any exception raised while the
+    // communicator is being created propagates to the caller, and the next
+    // call tries again. No locking is done here.
+    static Ice::CommunicatorPtr get_instance() {
+        if (!_instance) {
+            Ice::StringSeq args;
+            if (!getenv("ICE_CONFIG")) {
+                args.push_back("--Ice.Config=" CONFIG_FILE);
+            }
+            Ice::InitializationData init_data;
+            init_data.properties = Ice::createProperties(args);
+            int argc = 0;
+            char * argv[] = { 0 };
+
+            // Initialize into a local first. Publishing _instance before
+            // Ice::initialize() succeeded would leave a non-NULL _instance
+            // holding a null handle after an exception, and every later call
+            // would hand that null communicator out.
+            Ice::CommunicatorPtr created = Ice::initialize(argc, argv, init_data);
+            _instance = new Ice::CommunicatorPtr(created);
+        }
+        return *_instance;
+    }
+
+ private:
+    CommunicatorPtrSingleton();
+    CommunicatorPtrSingleton(CommunicatorPtrSingleton const &);
+
+    // Heap-allocated handle, created on first use and never released.
+    static Ice::CommunicatorPtr * _instance;
+};
+
+#endif // __COMMUNICATOR_H__
Index: src/rpc/rrdrpcd.conf.t
===================================================================
--- src/rpc/rrdrpcd.conf.t (revision 0)
+++ src/rpc/rrdrpcd.conf.t (revision 0)
@@ -0,0 +1,2 @@
+RRD.Base=/usr/local/rrd
+RRDFetchAdapter.Endpoints=tcp -h localhost -p 10000
Index: src/rpc/fetch_completer.cc
===================================================================
--- src/rpc/fetch_completer.cc (revision 0)
+++ src/rpc/fetch_completer.cc (revision 0)
@@ -0,0 +1,33 @@
+#include <iostream>
+#include "fetch_completer.h"
+
+using namespace std;
+
+// A new completer has no started, completed or failed requests.
+FetchCompleter::FetchCompleter() : _n_started(0), _n_completed(0), _n_failed(0) {}
+
+// Block until the number of completed requests equals the number of started
+// ones. Returns true when none of them was reported as failed.
+bool FetchCompleter::waitForCompletion() {
+    IceUtil::Monitor<IceUtil::Mutex>::Lock guard(_monitor);
+    for (;;) {
+        if (_n_started == _n_completed) {
+            break;
+        }
+        _monitor.wait();
+    }
+    return _n_failed == 0;
+}
+
+// Record, under the monitor lock, that one more request has been started.
+void FetchCompleter::started() {
+    IceUtil::Monitor<IceUtil::Mutex>::Lock guard(_monitor);
+    ++_n_started;
+}
+
+// Record, under the monitor lock, that one request has finished (counting it
+// as failed when success is false) and wake up a waiter.
+void FetchCompleter::completed(bool success) {
+    IceUtil::Monitor<IceUtil::Mutex>::Lock guard(_monitor);
+    _n_failed += success ? 0 : 1;
+    ++_n_completed;
+    _monitor.notify();
+}
Index: src/rpc/rrd_fetch_rpc.cc
===================================================================
--- src/rpc/rrd_fetch_rpc.cc (revision 0)
+++ src/rpc/rrd_fetch_rpc.cc (revision 0)
@@ -0,0 +1,269 @@
+#include <iostream>
+#include <Ice/Ice.h>
+#include <IceUtil/IceUtil.h>
+#include "storage_impl.h"
+#include "communicator.h"
+#include "fetch_completer.h"
+#include "rrd_fetch_rpc.h"
+#include "../rrd_tool.h"
+
+using namespace std;
+
+static const string Server_Property_Prefix = "Storage-Server.";
+
+static bool parse_rrd_path(string & path, vector<string> & parts);
+
+/* Release a ds_namv-style array whose first `count` entries were allocated. */
+static void free_ds_names(char **names, unsigned long count) {
+    if (names == NULL) {
+        return;
+    }
+    for (unsigned long i = 0; i < count; i++) {
+        free(names[i]);
+    }
+    free(names);
+}
+
+/*
+ * rpc counterpart of rrd_fetch_fn(): fetch the data of the rrd named by
+ * fname ("rpc//<server_alias>/<rrd path>") from the storage server that the
+ * property "Storage-Server.<server_alias>" points to.
+ *
+ * start, end and step are in/out: they carry the request and, on success,
+ * the values reported by the server. On success *ds_cnt, *ds_namv and *data
+ * are set to malloc'ed results that the caller must free; on failure the
+ * output arguments are left untouched.
+ *
+ * Returns 0 on success and -1 on failure (the reason is stored with
+ * rrd_set_error()). No Ice exception leaves this function, because its
+ * callers are C code.
+ */
+int _rrd_fetch_rpc(const char *fname, /* name of the rrd */
+                   enum cf_en cf_idx, /* which consolidation function ?*/
+                   time_t *start,
+                   time_t *end, /* which time frame do you want ?
+                                 * will be changed to represent reality */
+                   unsigned long *step, /* which stepsize do you want?
+                                         * will be changed to represent reality */
+                   unsigned long *ds_cnt, /* number of data sources in file */
+                   char ***ds_namv, /* names of data_sources */
+                   rrd_value_t **data) /* two dimensional array containing the data */
+{
+
+    string filename = fname;
+    RRD::DataDescription in, out;
+
+    try {
+        Ice::CommunicatorPtr communicator = CommunicatorPtrSingleton::get_instance();
+
+        vector<string> path_parts;
+        if (!parse_rrd_path(filename, path_parts)) {
+            rrd_set_error("failed to parse rrd path '%s'", filename.c_str());
+            return -1;
+        }
+
+        string proxy_property = Server_Property_Prefix + path_parts[1];
+
+        /* checkedCast contacts the server and can throw, as can fetch() */
+        RRD::DataPrx rrd_data = RRD::DataPrx::checkedCast(communicator->propertyToProxy(proxy_property));
+        if (!rrd_data) {
+            rrd_set_error("invalid proxy: %s", proxy_property.c_str());
+            return -1;
+        }
+
+        in.rrd = path_parts[2];
+        in.cf = (long) cf_idx;
+        in.cfReduce = (long) cf_idx; /* set every field that is marshaled */
+        in.start = (long) *start;
+        in.end = (long) *end;
+        in.step = (long) *step;
+        in.dataSourceCount = 0;
+
+        if (!rrd_data->fetch(in, out)) {
+            rrd_set_error("remote fetch failed");
+            cerr << "fetch failed" << endl;
+            return -1;
+        }
+    } catch (const Ice::Exception &ex) {
+        rrd_set_error("RPC exception %s", ex.ice_name().c_str());
+        return -1;
+    }
+
+    /* The reply comes from the network: check that it is self-consistent
+     * before sizing buffers from it or dividing by the step. */
+    if (out.step <= 0 || out.end < out.start || out.dataSourceCount <= 0
+        || (Ice::Long) out.dataSourceNames.size() != out.dataSourceCount) {
+        rrd_set_error("remote fetch returned an inconsistent description");
+        return -1;
+    }
+
+    const size_t n_ds = (size_t) out.dataSourceCount;
+    const size_t n_rows = (size_t) ((out.end - out.start) / out.step + 1);
+    if (out.data.size() % n_ds != 0 || out.data.size() / n_ds != n_rows) {
+        /* copying a different amount would overflow the buffer or leave
+         * part of it uninitialized */
+        rrd_set_error("remote fetch returned %lu values, expected %lu rows of %lu",
+                      (unsigned long) out.data.size(),
+                      (unsigned long) n_rows, (unsigned long) n_ds);
+        return -1;
+    }
+
+    char **names = (char **) malloc(n_ds * sizeof(char*));
+    if (names == NULL) {
+        cerr << "fetch malloc ds_namv failed" << endl;
+        rrd_set_error("malloc ds_namv");
+        return -1;
+    }
+    for (size_t i = 0; i < n_ds; i++) {
+        names[i] = (char *) malloc(sizeof(char) * DS_NAM_SIZE);
+        if (names[i] == NULL) {
+            rrd_set_error("malloc fetch ds_namv entry");
+            free_ds_names(names, i);
+            return -1;
+        }
+        strncpy(names[i], out.dataSourceNames[i].c_str(), DS_NAM_SIZE-1);
+        names[i][DS_NAM_SIZE-1]='\0';
+    }
+
+    rrd_value_t *values = (rrd_value_t *) malloc(out.data.size() * sizeof(rrd_value_t));
+    if (values == NULL) {
+        rrd_set_error("malloc data buffer");
+        free_ds_names(names, n_ds);
+        return -1;
+    }
+    copy(out.data.begin(), out.data.end(), values);
+
+    /* everything succeeded: publish the results */
+    *start = (unsigned long) out.start;
+    *end = (unsigned long) out.end;
+    *step = (unsigned long) out.step;
+    *ds_cnt = (unsigned long) n_ds;
+    *ds_namv = names;
+    *data = values;
+
+    return 0;
+}
+
+typedef IceUtil::Handle<FetchCompleter> FetchCompleterPtr;
+
+/* True when two DEF elements ask for the same data: same rrd, same
+ * consolidation functions and same originally requested range and step. */
+static bool same_fetch_request(const graph_desc_t & a, const graph_desc_t & b) {
+    return (strcmp(a.rrd, b.rrd) == 0)
+        && (a.cf == b.cf)
+        && (a.cf_reduce == b.cf_reduce)
+        && (a.start_orig == b.start_orig)
+        && (a.end_orig == b.end_orig)
+        && (a.step_orig == b.step_orig);
+}
+
+/* Index of the first DEF before gdes[i] that asks for the same data as
+ * gdes[i], or -1 when there is none. */
+static int find_earlier_def(const image_desc_t * im, int i) {
+    for (int ii = 0; ii < i; ii++) {
+        if (im->gdes[ii].gf != GF_DEF)
+            continue;
+        if (same_fetch_request(im->gdes[i], im->gdes[ii]))
+            return ii;
+    }
+    return -1;
+}
+
+/*
+ * rpc counterpart of data_fetch(): fetch the data of every DEF element of
+ * `im` from the storage servers named in the rpc// paths.
+ *
+ * DEFs are grouped per server alias and each group is sent as one
+ * asynchronous batchFetch request; the function then waits for all requests.
+ * A DEF that repeats an earlier one is not fetched again but shares the
+ * earlier element's data. Finally the index of the requested data source is
+ * looked up for every DEF.
+ *
+ * Returns 0 on success, -1 on failure with the reason in rrd_set_error().
+ */
+int _data_fetch_rpc(image_desc_t * im) {
+
+    typedef vector<graph_desc_t *> graph_desc_list_t;
+    typedef map<string, graph_desc_list_t> targets_t;
+    typedef map<string, RRD::DataDescriptions> payloads_t;
+    typedef map<string, RRD::DataPrx> proxies_t;
+
+    FetchCompleterPtr completer = new FetchCompleter;
+
+    try {
+        Ice::CommunicatorPtr communicator = CommunicatorPtrSingleton::get_instance();
+
+        targets_t targets;   /* server alias -> DEFs to fill from the reply */
+        payloads_t payloads; /* server alias -> request sent to that server */
+        vector<string> path_parts;
+
+        /* step 1: build one request per server */
+        for (int i=0;i< (int)im->gdes_c;i++){
+            /* only GF_DEF elements fetch data */
+            if (im->gdes[i].gf != GF_DEF)
+                continue;
+            /* do we have it already ?*/
+            if (find_earlier_def(im, i) >= 0)
+                continue;
+
+            path_parts.clear();
+            string filename = im->gdes[i].rrd;
+            if (!parse_rrd_path(filename, path_parts)) {
+                rrd_set_error("failed to parse rrd path '%s'", filename.c_str());
+                return -1;
+            }
+
+            RRD::DataDescription description;
+            description.rrd = path_parts[2];
+            description.cf = (long) im->gdes[i].cf;
+            description.cfReduce = (long) im->gdes[i].cf_reduce;
+            description.start = (long) im->gdes[i].start;
+            description.end = (long) im->gdes[i].end;
+            description.step = (long) im->gdes[i].step;
+            description.dataSourceCount = 0;
+
+            targets[path_parts[1]].push_back(&im->gdes[i]);
+            payloads[path_parts[1]].push_back(description);
+        }
+
+        /* step 2: resolve every proxy before anything is sent. A bad alias
+         * or unreachable server is then reported while no request is in
+         * flight, so no reply callback can write into im->gdes after this
+         * function has returned. */
+        proxies_t proxies;
+        for (targets_t::iterator t = targets.begin(); t != targets.end(); ++t) {
+            string proxy_property = Server_Property_Prefix + t->first;
+            RRD::DataPrx rrd_data = RRD::DataPrx::checkedCast(communicator->propertyToProxy(proxy_property));
+            if (!rrd_data) {
+                rrd_set_error("invalid proxy: %s", proxy_property.c_str());
+                return -1;
+            }
+            proxies[t->first] = rrd_data;
+        }
+
+        /* step 3: send all requests, then wait for all replies */
+        for (targets_t::iterator t = targets.begin(); t != targets.end(); ++t) {
+            proxies[t->first]->batchFetch_async(new AMI_Data_batchFetchI(completer, t->second),
+                                                payloads[t->first]);
+        }
+
+        if (!completer->waitForCompletion()) {
+            rrd_set_error("data_fetch_rpc failed");
+            return -1;
+        }
+
+        /* step 4: share data between repeated DEFs and find the DS index.
+         * Nothing may be printed on stdout here: rrdtool can write the graph
+         * or the xport result to stdout. */
+        for (int i=0;i< (int)im->gdes_c;i++){
+            /* only GF_DEF elements fetch data */
+            if (im->gdes[i].gf != GF_DEF)
+                continue;
+
+            int first = find_earlier_def(im, i);
+            if (first >= 0) {
+                /* OK, the data is already there.
+                ** Just copy the header portion
+                */
+                im->gdes[i].start = im->gdes[first].start;
+                im->gdes[i].end = im->gdes[first].end;
+                im->gdes[i].step = im->gdes[first].step;
+                im->gdes[i].ds_cnt = im->gdes[first].ds_cnt;
+                im->gdes[i].ds_namv = im->gdes[first].ds_namv;
+                im->gdes[i].data = im->gdes[first].data;
+                im->gdes[i].data_first = 0;
+            }
+
+            /* lets see if the required data source is really there */
+            for(int ii=0;ii<(int)im->gdes[i].ds_cnt;ii++){
+                if(strcmp(im->gdes[i].ds_namv[ii],im->gdes[i].ds_nam) == 0){
+                    im->gdes[i].ds=ii; }
+            }
+            if (im->gdes[i].ds== -1){
+                rrd_set_error("No DS called '%s' in '%s'",
+                              im->gdes[i].ds_nam,im->gdes[i].rrd);
+                return -1;
+            }
+        }
+    } catch (const Ice::Exception &ex) {
+        rrd_set_error("RPC exception %s", ex.ice_name().c_str());
+        return -1;
+    }
+
+    return 0;
+
+}
+
+/*
+ * Split an rpc rrd path of the form "rpc//<server_alias>/<rrd path>".
+ *
+ * On success three strings are appended to `parts`: the "rpc//" prefix, the
+ * server alias ("localhost" when the alias is empty, as in "rpc///x.rrd")
+ * and the rrd path, and true is returned.
+ *
+ * Returns false, leaving `parts` untouched, when `path` does not start with
+ * "rpc//", has no '/' after the alias, or has an empty rrd path.
+ */
+bool parse_rrd_path(std::string & path, std::vector<std::string> & parts) {
+
+    static const char prefix[] = "rpc//";
+    const std::string::size_type prefix_len = sizeof(prefix) - 1;
+
+    /* the marker must be at the very start, not just somewhere in the path */
+    if (path.compare(0, prefix_len, prefix) != 0) {
+        return false;
+    }
+
+    const std::string::size_type slash = path.find('/', prefix_len);
+    if (slash == std::string::npos) {
+        return false;
+    }
+
+    std::string storage_host = path.substr(prefix_len, slash - prefix_len);
+    if (storage_host.empty()) {
+        storage_host = "localhost";
+    }
+
+    std::string storage_path = path.substr(slash + 1);
+    if (storage_path.empty()) {
+        return false;
+    }
+
+    parts.push_back(path.substr(0, prefix_len));
+    parts.push_back(storage_host);
+    parts.push_back(storage_path);
+
+    return true;
+
+}
Index: src/rpc/server_test.conf
===================================================================
--- src/rpc/server_test.conf (revision 0)
+++ src/rpc/server_test.conf (revision 0)
@@ -0,0 +1,2 @@
+RRD.Base=/usr/local/rrd
+RRDFetchAdapter.Endpoints=tcp -h localhost -p 10000
Index: src/rpc/fetch_completer.h
===================================================================
--- src/rpc/fetch_completer.h (revision 0)
+++ src/rpc/fetch_completer.h (revision 0)
@@ -0,0 +1,24 @@
+#ifndef _FETCH_COMPLETER_H_
+#define _FETCH_COMPLETER_H_
+
+#include <IceUtil/IceUtil.h>
+
+// Reference-counted counter of outstanding fetch requests. Requests are
+// announced with started() and reported with completed(); waitForCompletion()
+// blocks until both counts agree.
+class FetchCompleter : public IceUtil::Shared {
+
+ public:
+ FetchCompleter();
+ // Count one more started request.
+ void started();
+ // Count one more finished request; pass false to mark it as failed.
+ void completed(bool success=true);
+ // Wait for all started requests; true when none of them failed.
+ bool waitForCompletion();
+ private:
+ int _n_started;
+ int _n_completed;
+ int _n_failed;
+ // Guards the counters and is used to wait for / signal completion.
+ IceUtil::Monitor<IceUtil::Mutex> _monitor;
+ protected:
+ virtual ~FetchCompleter() {}
+};
+
+typedef IceUtil::Handle<FetchCompleter> FetchCompleterPtr;
+
+#endif // _FETCH_COMPLETER_H_
Index: src/rpc/rrd_fetch_rpc.h
===================================================================
--- src/rpc/rrd_fetch_rpc.h (revision 0)
+++ src/rpc/rrd_fetch_rpc.h (revision 0)
@@ -0,0 +1,28 @@
+#ifndef _RRD_FETCH_RPC_H_
+#define _RRD_FETCH_RPC_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "../rrd_graph.h"
+
+int _rrd_fetch_rpc(const char *filename, /* name of the rrd */
+ enum cf_en cf_idx, /* which consolidation function ?*/
+ time_t *start,
+ time_t *end, /* which time frame do you want ?
+ * will be changed to represent reality */
+ unsigned long *step, /* which stepsize do you want?
+ * will be changed to represent reality */
+ unsigned long *ds_cnt, /* number of data sources in file */
+ char ***ds_namv, /* names of data_sources */
+ rrd_value_t **data /* two dimensional array containing the data */
+ );
+
+int _data_fetch_rpc(image_desc_t *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RRD_FETCH_RPC_H_ */
Index: src/rpc/client_test.conf
===================================================================
--- src/rpc/client_test.conf (revision 0)
+++ src/rpc/client_test.conf (revision 0)
@@ -0,0 +1,2 @@
+Storage-Server.server1=RRDDataFetch:tcp -h localhost -p 10000
+Storage-Server.server2=RRDDataFetch:tcp -h localhost -p 10000
Index: src/rpc/storage.ice
===================================================================
--- src/rpc/storage.ice (revision 0)
+++ src/rpc/storage.ice (revision 0)
@@ -0,0 +1,29 @@
+// Slice definitions for the interface between an rrd grapher (client) and an
+// rrd storage server.
+module Storage {
+
+ module RRD {
+
+ // NOTE(review): not referenced by the definitions below, which carry the
+ // consolidation function as a plain long.
+ enum ConsolidationFunction {Max, Min, Average};
+
+ sequence<string> StringSequence;
+ sequence<double> DoubleSequence;
+
+ // One fetch. The same structure is used for the request and for the
+ // reply (see the "requested" and "returned" parameters of Data below).
+ struct DataDescription {
+ string rrd;
+ long cf;
+ long cfReduce;
+ long start;
+ long end;
+ long step;
+ long dataSourceCount;
+ StringSequence dataSourceNames;
+ DoubleSequence data;
+ };
+
+ sequence<DataDescription> DataDescriptions;
+
+ interface Data {
+ // Fetch one rrd; the boolean result reports success.
+ ["ami", "cpp:const"] idempotent bool fetch(DataDescription requested, out DataDescription returned);
+ // Fetch several rrds in one call; the boolean result reports success.
+ ["ami", "cpp:const"] idempotent bool batchFetch(DataDescriptions requested, out DataDescriptions returned);
+ };
+ };
+};
Index: src/rpc/Makefile.am
===================================================================
--- src/rpc/Makefile.am (revision 0)
+++ src/rpc/Makefile.am (revision 0)
@@ -0,0 +1,27 @@
+## Process this file with automake to produce Makefile.in
+
+# generate the C++ stubs and skeletons from the Slice definitions
+storage.h storage.cc: storage.ice
+	@ZEROC_ICE_DIR@/bin/slice2cpp --source-ext cc storage.ice
+
+BUILT_SOURCES = storage.h storage.cc
+dist_sysconf_DATA = rrdrpc.conf.t rrdrpcd.conf.t
+EXTRA_DIST = client_test.conf server_test.conf
+noinst_HEADERS = storage_impl.h communicator.h rrd_fetch_rpc.h fetch_completer.h
+lib_LTLIBRARIES = librrdrpc.la
+
+librrdrpc_la_SOURCES = rrd_fetch_rpc.cc storage_impl.cc communicator.cc fetch_completer.cc storage.ice
+nodist_librrdrpc_la_SOURCES = storage.h storage.cc
+librrdrpc_la_CPPFLAGS = -DCONFIG_FILE=\"${sysconfdir}/rrdrpc.conf\" -I@ZEROC_ICE_DIR@/include
+librrdrpc_la_LIBADD = -L@ZEROC_ICELIB_DIR@ -lIce -lIceUtil
+
+check_PROGRAMS = test1
+test1_SOURCES = test1.cc
+test1_CPPFLAGS = -I@ZEROC_ICE_DIR@/include
+test1_LDADD = ../librrd.la -L@ZEROC_ICELIB_DIR@ -lIce -lIceUtil
+test1_LDFLAGS = -static
+
+TESTS = test1
+
+# remove rrd files created in make check
+clean-local:
+	rm -rf rrd
Index: src/rpc/storage_impl.cc
===================================================================
--- src/rpc/storage_impl.cc (revision 0)
+++ src/rpc/storage_impl.cc (revision 0)
@@ -0,0 +1,176 @@
+#include <Ice/Ice.h>
+#include "storage_impl.h"
+
+using namespace std;
+
+string rrd_base = "/var/rrd";
+
+/* True when `rel` is non-empty and none of its '/'-separated components is
+ * "..", so that appending it to the RRD base directory cannot climb above
+ * that directory. */
+static bool is_contained_path(const string & rel) {
+    if (rel.empty()) {
+        return false;
+    }
+    string::size_type pos = 0;
+    while (pos <= rel.size()) {
+        string::size_type next = rel.find('/', pos);
+        if (next == string::npos) {
+            next = rel.size();
+        }
+        if (rel.compare(pos, next - pos, "..") == 0) {
+            return false;
+        }
+        pos = next + 1;
+    }
+    return true;
+}
+
+/* Directory the rrd files are served from: the "RRD.Base" property of the
+ * communicator that dispatched the request, or rrd_base when it is unset. */
+static string rrd_base_dir(const Ice::Current & current) {
+    Ice::ObjectAdapterPtr adapter = current.adapter;
+    Ice::CommunicatorPtr communicator = adapter->getCommunicator();
+    Ice::PropertiesPtr properties = communicator->getProperties();
+    return properties->getPropertyWithDefault("RRD.Base", rrd_base);
+}
+
+/*
+ * Run rrd_fetch_fn() for the request held in `desc` (rrd, cf, start, end,
+ * step) against the file rrd_dir/desc.rrd and store the outcome back into
+ * `desc`: the actual start/end/step, the data source count and names, and
+ * rows * dataSourceCount values with rows = (end - start) / step + 1.
+ *
+ * When `reduce` is true and the file delivers a finer step than requested,
+ * the data is consolidated to the requested step with reduce_data() using
+ * desc.cfReduce.
+ *
+ * Returns false when the path is rejected or the fetch fails.
+ */
+static bool fetch_description(const string & rrd_dir, RRD::DataDescription & desc, bool reduce) {
+
+    /* desc.rrd comes from the network: do not let it leave the base dir */
+    if (!is_contained_path(desc.rrd)) {
+        rrd_set_error("invalid rrd path '%s'", desc.rrd.c_str());
+        return false;
+    }
+    const string rrd_file = rrd_dir + '/' + desc.rrd;
+
+    /* Use locals of the exact types rrd_fetch_fn() expects; the Slice
+     * "long" members are not guaranteed to have the size of time_t or
+     * unsigned long, so their addresses must not be cast. */
+    time_t start = (time_t) desc.start;
+    time_t end = (time_t) desc.end;
+    const unsigned long wanted_step = (unsigned long) desc.step;
+    unsigned long ft_step = wanted_step; /* ft_step will record what we got from fetch */
+    unsigned long ds_count = 0;
+    char ** ds_names = NULL;
+    rrd_value_t * data = NULL;
+
+    int result = rrd_fetch_fn(rrd_file.c_str(),
+                              (enum cf_en) desc.cf,
+                              &start,
+                              &end,
+                              &ft_step,
+                              &ds_count,
+                              &ds_names,
+                              &data);
+    if (result == -1) {
+        return false;
+    }
+
+    bool ok = (ft_step > 0 && end >= start);
+    if (!ok) {
+        rrd_set_error("rrd_fetch_fn returned an invalid time range");
+    }
+    else {
+        /* number of values in the buffer rrd_fetch_fn() handed out */
+        const unsigned long fetched = ((end - start) / ft_step + 1) * ds_count;
+
+        unsigned long step = ft_step;
+        if (reduce && ft_step < wanted_step) {
+            step = wanted_step;
+            reduce_data((enum cf_en) desc.cfReduce,
+                        ft_step,
+                        &start,
+                        &end,
+                        &step,
+                        &ds_count,
+                        &data);
+        }
+
+        const unsigned long rows = (end - start) / step + 1;
+        const unsigned long wanted = rows * ds_count;
+
+        desc.start = start;
+        desc.end = end;
+        desc.step = step;
+        desc.dataSourceCount = ds_count;
+        desc.dataSourceNames.clear();
+        for (unsigned long i=0; i<ds_count; i++) {
+            desc.dataSourceNames.push_back(string(ds_names[i]));
+        }
+
+        /* Send every data source of every row, but never read past the
+         * buffer that was allocated; anything missing is sent as unknown. */
+        desc.data.assign(data, data + (wanted < fetched ? wanted : fetched));
+        desc.data.resize(wanted, DNAN);
+    }
+
+    free (data);
+    if (ds_names){
+        for (unsigned long i=0; i<ds_count; i++)
+            free(ds_names[i]);
+        free(ds_names);
+    }
+
+    return ok;
+}
+
+/* Servant implementation of Data::fetch: answer one fetch request without
+ * step reduction. `out` starts as a copy of `in`. Returns false on failure. */
+bool StorageI::fetch(const RRD::DataDescription & in, RRD::DataDescription & out, const Ice::Current & current) const {
+
+    out = in;
+    return fetch_description(rrd_base_dir(current), out, false);
+
+}
+
+/* Servant implementation of Data::batchFetch: answer every request of `in`
+ * in order, reducing the data to the requested step where the file is finer.
+ * `out` starts as a copy of `in`. Stops and returns false at the first
+ * request that fails. */
+bool StorageI::batchFetch(const RRD::DataDescriptions & in, RRD::DataDescriptions & out, const Ice::Current & current) const {
+
+    out = in;
+
+    const string rrd_path = rrd_base_dir(current);
+
+    RRD::DataDescriptions::iterator desc_iter;
+    for (desc_iter = out.begin(); desc_iter != out.end(); ++desc_iter) {
+        if (!fetch_description(rrd_path, *desc_iter, true)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+// Keep the completer and a copy of the list of graph elements the reply has
+// to fill in, and count this request as started on the completer.
+AMI_Data_batchFetchI::AMI_Data_batchFetchI(const FetchCompleterPtr & p, vector<graph_desc_t *> & graph_desc_list)
+    : _completer(p), _graph_desc_list(graph_desc_list)
+{
+    _completer->started();
+}
+
+/* Release a ds_namv-style array whose first `count` entries were allocated. */
+static void release_names(char ** names, size_t count) {
+    if (names == NULL) {
+        return;
+    }
+    for (size_t i = 0; i < count; i++) {
+        free(names[i]);
+    }
+    free(names);
+}
+
+/*
+ * Copy one fetch result into a graph element: start, end, step, ds_cnt,
+ * malloc'ed ds_namv and data, and data_first = 1 to mark the element as the
+ * owner of the buffers. The element is modified only when everything
+ * succeeded. Returns false (with rrd_set_error()) on an inconsistent reply
+ * or an allocation failure.
+ */
+static bool store_fetched_data(const ::Storage::RRD::DataDescription & src, graph_desc_t * dst) {
+
+    /* The reply comes from the network: validate before sizing buffers
+     * from it or dividing by the step. */
+    if (src.step <= 0 || src.end < src.start || src.dataSourceCount <= 0
+        || (Ice::Long) src.dataSourceNames.size() != src.dataSourceCount) {
+        rrd_set_error("remote fetch returned an inconsistent description");
+        return false;
+    }
+
+    const size_t n_ds = (size_t) src.dataSourceCount;
+    const size_t n_rows = (size_t) ((src.end - src.start) / src.step + 1);
+    if (src.data.size() % n_ds != 0 || src.data.size() / n_ds != n_rows) {
+        rrd_set_error("remote fetch returned an unexpected number of values");
+        return false;
+    }
+
+    char ** names = (char **) malloc(n_ds * sizeof(char*));
+    if (names == NULL) {
+        cerr << "fetch malloc ds_namv failed" << endl;
+        rrd_set_error("malloc ds_namv");
+        return false;
+    }
+    for (size_t i = 0; i < n_ds; i++) {
+        names[i] = (char *) malloc(sizeof(char) * DS_NAM_SIZE);
+        if (names[i] == NULL) {
+            rrd_set_error("malloc fetch ds_namv entry");
+            release_names(names, i);
+            return false;
+        }
+        strncpy(names[i], src.dataSourceNames[i].c_str(), DS_NAM_SIZE-1);
+        names[i][DS_NAM_SIZE-1]='\0';
+    }
+
+    rrd_value_t * values = (rrd_value_t *) malloc(src.data.size() * sizeof(rrd_value_t));
+    if (values == NULL) {
+        rrd_set_error("malloc data buffer");
+        release_names(names, n_ds);
+        return false;
+    }
+    copy(src.data.begin(), src.data.end(), values);
+
+    dst->start = src.start;
+    dst->end = src.end;
+    dst->step = src.step;
+    dst->ds_cnt = src.dataSourceCount;
+    dst->ds_namv = names;
+    dst->data = values;
+    dst->data_first = 1;
+    return true;
+}
+
+/*
+ * Reply callback of an asynchronous batchFetch. Stores the j-th result in
+ * the j-th graph element given to the constructor and reports the outcome
+ * to the completer: failure when the server reported failure, when the reply
+ * does not have one entry per requested element, or when a result cannot be
+ * stored. Diagnostics go to stderr so that they cannot end up in a graph or
+ * xport written to stdout.
+ */
+void AMI_Data_batchFetchI::ice_response(bool result, const ::Storage::RRD::DataDescriptions & out) {
+
+    if (!result) {
+        cerr << "rrd_fetch_fn failed" << endl;
+        _completer->completed(false);
+        return;
+    }
+
+    if (out.size() != _graph_desc_list.size()) {
+        /* indexing _graph_desc_list with the reply's size would run past
+         * its end */
+        rrd_set_error("remote batch fetch returned %lu results for %lu requests",
+                      (unsigned long) out.size(),
+                      (unsigned long) _graph_desc_list.size());
+        _completer->completed(false);
+        return;
+    }
+
+    bool ok = true;
+    for (size_t j = 0; ok && j < out.size(); j++) {
+        ok = store_fetched_data(out[j], _graph_desc_list[j]);
+    }
+
+    _completer->completed(ok);
+}
+
+/*
+ * Failure callback of an asynchronous batchFetch. Logs the exception name on
+ * stderr and reports the request to the completer as finished and failed.
+ * The constructor counted the request as started, so without this report
+ * FetchCompleter::waitForCompletion() would wait forever.
+ */
+void AMI_Data_batchFetchI::ice_exception(const ::Ice::Exception& ex) {
+    cerr << "AMI_Data_batchFetch::ice_exception " << ex.ice_name() << endl;
+    _completer->completed(false);
+}
Index: src/rpc/storage_impl.h
===================================================================
--- src/rpc/storage_impl.h (revision 0)
+++ src/rpc/storage_impl.h (revision 0)
@@ -0,0 +1,40 @@
+#ifndef __STORAGE_IMPL_H__
+#define __STORAGE_IMPL_H__
+#include <vector>
+#include <storage.h>
+#include "fetch_completer.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "../rrd_graph.h"
+
+#ifdef __cplusplus
+}
+#endif
+
+using namespace std;
+using namespace Storage;
+
+// Servant implementing the Storage::RRD::Data interface.
+class StorageI : public Storage::RRD::Data {
+ public:
+ // Answer one fetch request; returns false on failure.
+ virtual bool fetch(const RRD::DataDescription &, RRD::DataDescription &, const Ice::Current &) const;
+ // Answer a list of fetch requests; returns false on failure.
+ virtual bool batchFetch(const RRD::DataDescriptions &, RRD::DataDescriptions &, const Ice::Current &) const;
+ protected:
+ virtual ~StorageI() {}
+};
+
+// Callback object for one asynchronous batchFetch invocation.
+class AMI_Data_batchFetchI : public Storage::RRD::AMI_Data_batchFetch {
+ public:
+ // Takes the completer to report to and the graph elements to fill in.
+ AMI_Data_batchFetchI(const FetchCompleterPtr &, vector<graph_desc_t *> &);
+ // Invoked with the server's result and the returned descriptions.
+ virtual void ice_response(bool, const ::Storage::RRD::DataDescriptions&);
+ // Invoked when the invocation ends with an exception.
+ virtual void ice_exception(const ::Ice::Exception&);
+ private:
+ FetchCompleterPtr _completer;
+ // Own copy of the list passed to the constructor.
+ vector<graph_desc_t *> _graph_desc_list;
+ protected:
+ virtual ~AMI_Data_batchFetchI() {}
+};
+
+#endif // __STORAGE_IMPL_H__
Index: src/rrd_tool.h
===================================================================
--- src/rrd_tool.h (revision 1372)
+++ src/rrd_tool.h (working copy)
@@ -151,6 +151,26 @@
char ***ds_namv,
rrd_value_t **data);
+#ifdef WANT_RRDRPC
+
+int rrd_fetch_fn_rpc(const char *filename, enum cf_en cf_idx,
+ time_t *start,time_t *end,
+ unsigned long *step,
+ unsigned long *ds_cnt,
+ char ***ds_namv,
+ rrd_value_t **data);
+
+int _rrd_fetch_rpc(const char *filename,
+ enum cf_en cf_idx,
+ time_t *start,
+ time_t *end,
+ unsigned long *step,
+ unsigned long *ds_cnt,
+ char ***ds_namv,
+ rrd_value_t **data);
+
+#endif /* WANT_RRDRPC */
+
void rrd_free(rrd_t *rrd);
void rrd_freemem(void *mem);
void rrd_init(rrd_t *rrd);
Index: src/rrd_graph.c
===================================================================
--- src/rrd_graph.c (revision 1372)
+++ src/rrd_graph.c (working copy)
@@ -688,6 +688,9 @@
#endif
}
+#ifdef WANT_RRDRPC
+int _data_fetch_rpc(image_desc_t *);
+#endif /* WANT_RRDRPC */
/* get the data required for the graphs from the
relevant rrds ... */
@@ -696,8 +699,14 @@
data_fetch(image_desc_t *im )
{
int i,ii;
- int skip;
+ int skip;
+#ifdef WANT_RRDRPC
+ if (strncmp(im->gdes[0].rrd, "rpc//", 5) == 0) {
+ return _data_fetch_rpc(im);
+ }
+#endif /* WANT_RRDRPC */
+
/* pull the data from the rrd files ... */
for (i=0;i< (int)im->gdes_c;i++){
/* only GF_DEF elements fetch data */
Index: src/Makefile.am
===================================================================
--- src/Makefile.am (revision 1372)
+++ src/Makefile.am (working copy)
@@ -12,6 +12,11 @@
[EMAIL PROTECTED]@
AM_CPPFLAGS = -DRRD_DEFAULT_FONT=\"$(RRD_DEFAULT_FONT)\" [EMAIL PROTECTED]@
+if BUILD_RRDRPC
+ MAYBE_RRDRPC = rpc
+endif
+SUBDIRS = $(MAYBE_RRDRPC)
+
UPD_C_FILES = \
rrd_getopt.c \
rrd_getopt1.c \
@@ -67,7 +72,11 @@
librrdupd_la_LIBADD = $(CORE_LIBS)
librrd_la_SOURCES = $(RRD_C_FILES)
-librrd_la_LIBADD = librrdupd.la $(ALL_LIBS)
+librrd_la_LIBADD = librrdupd.la
+if BUILD_RRDRPC
+librrd_la_LIBADD += rpc/librrdrpc.la -L@ZEROC_ICELIB_DIR@ -lIce -lIceUtil -lstdc++
+endif
+librrd_la_LIBADD += $(ALL_LIBS)
# This flag accepts an argument of the form current[:revision[:age]]. So,
# passing -version-info 3:12:1 sets current to 3, revision to 12, and age to 1.
@@ -112,6 +121,11 @@
include_HEADERS = rrd.h
bin_PROGRAMS = rrdtool rrdupdate
+if BUILD_RRDRPC
+sbin_PROGRAMS = rrdrpcd
+# statically linked version for make check
+noinst_PROGRAMS = rrdrpcd-static
+endif
if BUILD_RRDCGI
bin_PROGRAMS += rrdcgi
@@ -125,6 +139,9 @@
rrdtool_SOURCES =
rrdtool_DEPENDENCIES = rrd_tool.o librrd.la
+if BUILD_RRDRPC
+rrdtool_DEPENDENCIES += rpc/librrdrpc.la
+endif
rrdtool_LDADD = librrd.la
# strftime is here because we do not usually need it. unices have propper
@@ -132,3 +149,13 @@
EXTRA_DIST= strftime.c strftime.h $(fonts_DATA) \
win32comp.c rrd_thread_safe_nt.c get_ver.awk
+if BUILD_RRDRPC
+rrdrpcd_SOURCES = rpc/rrd_rpcd.cc
+rrdrpcd_CPPFLAGS = -DCONFIG_FILE=\"${sysconfdir}/rrdrpcd.conf\" -I@ZEROC_ICE_DIR@/include -Irpc
+rrdrpcd_LDADD = librrd.la
+
+rrdrpcd_static_SOURCES = rpc/rrd_rpcd.cc
+rrdrpcd_static_CPPFLAGS = -DCONFIG_FILE=\"${sysconfdir}/rrdrpcd.conf\" -I@ZEROC_ICE_DIR@/include -Irpc
+rrdrpcd_static_LDFLAGS = -static -no-install
+rrdrpcd_static_LDADD = rpc/librrdrpc.la librrd.la
+endif
Index: configure.ac
===================================================================
--- configure.ac (revision 1372)
+++ configure.ac (working copy)
@@ -259,6 +259,43 @@
AC_DEFINE_UNQUOTED(RRDGRAPH_YLEGEND_ANGLE,${RRDGRAPH_YLEGEND_ANGLE:-90.0},
[Vertical label angle: 90.0 (default) or 270.0])
+dnl rpc dispatching is off unless --enable-rrdrpc is given. AC_ARG_ENABLE
+dnl itself stores the argument in $enable_rrdrpc, so --disable-rrdrpc
+dnl (value "no") keeps the feature off.
+AC_ARG_ENABLE(rrdrpc,[ --enable-rrdrpc build rpc dispatching, requires zeroc-icelib],
+[],[enable_rrdrpc=no])
+
+if test "x$enable_rrdrpc" = "xyes"; then
+ dnl Check for Ice headers.
+ AC_ARG_WITH(zeroc-icelib,[ --with-zeroc-icelib=DIR ICE install directory],
+ [zeroc_ice_prefix=$withval],[zeroc_ice_prefix=""])
+ dnl --with-zeroc-icelib without a DIR yields "yes" and --without yields
+ dnl "no"; neither is a directory to search.
+ case "$zeroc_ice_prefix" in
+ yes|no) zeroc_ice_prefix="" ;;
+ esac
+ enable_rrdrpc=no
+ for dir in $zeroc_ice_prefix /usr /usr/local; do
+ AC_MSG_CHECKING(for Ice/Ice.h in $dir)
+ if test -f "$dir/include/Ice/Ice.h" ; then
+ ZEROC_ICE_DIR=$dir
+ if test -d "$ZEROC_ICE_DIR/lib64"; then
+ ZEROC_ICELIB_DIR=$ZEROC_ICE_DIR/lib64
+ else
+ ZEROC_ICELIB_DIR=$ZEROC_ICE_DIR/lib
+ fi
+ enable_rrdrpc=yes
+ AC_DEFINE([WANT_RRDRPC], [], [Build rpc dispatching])
+ AC_MSG_RESULT(yes)
+ break
+ else
+ AC_MSG_RESULT(no)
+ fi
+ done
+
+ if test "$enable_rrdrpc" = "no"; then
+ AC_MSG_ERROR([zeroc-icelib not found - rpc dispatching cannot be built])
+ fi
+
+fi
+
+AC_SUBST(ZEROC_ICE_DIR)
+AC_SUBST(ZEROC_ICELIB_DIR)
+
+dnl quoted so that the test stays valid whatever the variable holds
+AM_CONDITIONAL(BUILD_RRDRPC,[test "x$enable_rrdrpc" = "xyes"])
+
+
AC_ARG_ENABLE(rrdcgi,[ --disable-rrdcgi disable building of rrdcgi],
[],[enable_rrdcgi=yes])
@@ -756,6 +793,7 @@
AC_CONFIG_FILES([examples/Makefile])
AC_CONFIG_FILES([doc/Makefile])
AC_CONFIG_FILES([src/Makefile])
+AC_CONFIG_FILES([src/rpc/Makefile])
AC_CONFIG_FILES([bindings/Makefile])
AC_CONFIG_FILES([bindings/tcl/Makefile])
AC_CONFIG_FILES([bindings/tcl/ifOctets.tcl])
@@ -793,6 +831,7 @@
echo " Build Tcl Bindings: $enable_tcl"
echo " Build Python Bindings: $enable_python"
echo " Build rrdcgi: $enable_rrdcgi"
+echo " Build rrdrpc: $enable_rrdrpc"
echo " Build librrd MT: $enable_pthread"
echo
echo
_______________________________________________
rrd-developers mailing list
[email protected]
https://lists.oetiker.ch/cgi-bin/listinfo/rrd-developers