My apologies for the garbled and illegible code.
I am attaching the source file to this email.
// Runs at most one iteration of the libev loop and re-queues itself on the
// io_service for as long as at least one watcher is registered.
void runLoop()
{
    // Nothing is being watched: leave the loop alone and do not reschedule.
    if (!_watchCounter)
    {
        return;
    }

    ev_run(_loop, EVRUN_ONCE);

    // Callbacks fired from inside ev_run may have attached or detached
    // watchers, so the counter is consulted again before rescheduling.
    if (!_watchCounter)
    {
        return;
    }

    _io_service.post(boost::bind(&FileService::runLoop, this));
}
-------> As the callbacks are invoked from within the
"ev_run" function call, I thought that watchers could be added or
deleted from within those callbacks before the next loop run is
scheduled. So I am checking the counter again before scheduling the
next loop run.
The IO_Service class is actually a FIFO task scheduler. All we do is add
tasks to the scheduler. Adding watchers, deleting watchers and running
the loop are just tasks for the IO_Service.
As far as I could understand from the libev docs,
*ev_set_io_collect_interval (_loop, _poll_interval);*
sets a timeout for the loop, so there is no indefinite wait for the
loop to finish. (Please correct me if I am wrong.) As soon as the loop
times out or finishes, the next run is scheduled in the queue, and
before that run other tasks, such as adding and deleting watchers, can
execute if they are already in the queue.
Thank you for your time reviewing the code.
Praveen
//============================================================================
// Name : TestFS.cpp
// Author : Praveen Baratam
// Version : 0.1
// Copyright : All Rights Reserved.
// Description : FileService and Test classes
//============================================================================
#include <fcntl.h>
#include <unistd.h>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/asio.hpp>
#include <ev++.h>
using namespace std;
/**
*
* FileService is a class implementing the Reactor pattern to watch files for readiness events.
* It uses boost::asio::io_service and boost::thread for scheduling and executing the event loop and handlers.
* Internally it uses the libev library to watch the supplied FDs.
*
* FDs can be attached and detached at any time from any thread, and the supplied callbacks
* will be invoked by the internal thread. The internal thread also runs the libev loop
* whenever FDs are being watched, without the need to run the loop explicitly.
*
*/
// Handler called by FileService::watcherCallback with the watcher (as void*)
// and the event bits reported by libev for that watcher.
typedef boost::function< void (void *, int) > FileEvent;
// Handler called by FileService::doAttach with the newly created watcher (as void*).
typedef boost::function< void (void *) > AttachHandler;
// Handler called by FileService::doDetach once the watcher has been stopped and deleted.
typedef boost::function< void () > DetachHandler;
// Event flags accepted by FileService::attach; plain aliases of the libev event macros.
#define FS_READ EV_READ
#define FS_WRITE EV_WRITE
#define FS_ERROR EV_ERROR
struct FileWatcher : ev::io
{
FileWatcher(struct ev_loop* loop)
: ev::io(loop)
{
}
using ev::io::set;
FileEvent* eventHandler;
};
class FileService
{
public:
FileService(int poll_interval)
: _poll_interval(poll_interval),
_loop(NULL),
_io_service(),
_watchCounter(0)
{
_localThread = new boost::thread(boost::ref(*this));
}
~FileService()
{
_localThread->interrupt();
delete _localThread;
}
void operator()()
{
cout << "Started File Service" << endl;
startLoop();
while(1)
{
try
{
boost::asio::io_service::work work(_io_service);
// Executes all the pending tasks and returns
_io_service.run();
// Check for interruption to break out of the loop
boost::this_thread::interruption_point();
}catch(boost::thread_interrupted &e)
{
break;
}
catch(...)
{
_io_service.reset();
cout << "Error in FileService" << endl;
}
}
cout << "Stopped File Service" << endl;
stopLoop();
}
void join()
{
_localThread->join();
}
void post(boost::function<void()> function)
{
_io_service.post(function);
}
void dispatch(boost::function<void()> function)
{
_io_service.dispatch(function);
}
void attach(int fd, int events, FileEvent* eventhandler, AttachHandler* attachHandler)
{
_io_service.dispatch(boost::bind(&FileService::doAttach, this, fd, events, eventhandler, attachHandler));
}
void doAttach(int fd, int events, FileEvent* eventhandler, AttachHandler* attachHandler )
{
FileWatcher * file_watcher = new FileWatcher(_loop);
file_watcher->eventHandler = eventhandler;
file_watcher->set<FileService, &FileService::watcherCallback>(this);
file_watcher->start(fd, events);
AttachHandler callback = *attachHandler;
callback((void*)file_watcher);
_watchCounter++;
// Run the loop after attach to watch the fd
_io_service.post(boost::bind(&FileService::runLoop, this));
}
void watcherCallback(ev::io &w, int r_events)
{
FileWatcher* watcher = (FileWatcher*)&w;
FileEvent callback = *(FileEvent*)watcher->eventHandler;
callback((void*)&w, r_events);
}
void detach(void * watcher, DetachHandler* detachHandler)
{
_io_service.dispatch(boost::bind(&FileService::doDetach, this, (ev::io *)watcher, detachHandler));
}
void doDetach(ev::io * watcher, DetachHandler* detachHandler)
{
watcher->stop();
delete watcher;
DetachHandler callback = *detachHandler;
callback();
_watchCounter--;
}
void startLoop()
{
_loop = ev_loop_new(0);
ev_set_io_collect_interval (_loop, _poll_interval);
}
void runLoop()
{
if(_watchCounter)
{
ev_run(_loop, EVRUN_ONCE);
}
if(_watchCounter)
{
_io_service.post(boost::bind(&FileService::runLoop, this));
}
}
void stopLoop()
{
ev_loop_destroy(_loop);
}
private:
int _poll_interval;
struct ev_loop * _loop;
boost::asio::io_service _io_service;
boost::thread * _localThread;
int _watchCounter;
};
/**
 * Small exerciser for FileService: creates a non-blocking pipe, attaches the
 * read end for FS_READ and the write end for FS_WRITE, writes a fixed message
 * whenever the pipe is writable and prints whatever arrives on the read end.
 *
 * NOTE(review): the watchers are never detached and the pipe descriptors are
 * never closed; the object is expected to live as long as the service.
 */
class TestFS
{
public:
    /**
     * Creates the pipe and registers both ends with the given service.
     * Terminates the process with status 1 if the pipe cannot be created.
     */
    TestFS(FileService &file_service)
        : _file_service(file_service),
          _readWatcher(NULL),
          _writeWatcher(NULL),
          _read(-1),
          _write(-1)
    {
        int fd[2];
        if(pipe(fd) < 0)
        {
            std::cout << "Pipe creation failed..." << std::endl;
            std::exit(1);
        }
        _read = fd[0];
        _write = fd[1];
        setNonBlocking(_read);
        setNonBlocking(_write);

        _attachHandler = boost::bind(&TestFS::attachHandler, this, _1);
        _detachHandler = boost::bind(&TestFS::detachHandler, this);
        _readHandler = boost::bind(&TestFS::readNow, this, _1, _2);
        _writeHandler = boost::bind(&TestFS::writeNow, this, _1, _2);

        // The service keeps pointers to these member handlers.
        _file_service.attach( _read, FS_READ, &_readHandler, &_attachHandler);
        _file_service.attach(_write, FS_WRITE, &_writeHandler, &_attachHandler);
    }

    ~TestFS()
    {
    }

    /** Called by the service once a watcher has been attached; does nothing. */
    void attachHandler(void* watcher)
    {
    }

    /** Called by the service once a watcher has been detached; does nothing. */
    void detachHandler()
    {
    }

    /**
     * Read-end handler: reads up to 10 bytes and prints them.
     * Nothing is printed when the read fails or returns no data.
     */
    void readNow(void* watcher, int flags)
    {
        if(!(flags & FS_READ))
        {
            return;
        }
        // One extra byte so the data can always be NUL-terminated before printing.
        char buffer[11];
        const ssize_t count = read(_read, buffer, sizeof(buffer) - 1);
        if(count <= 0)
        {
            // Error (e.g. EAGAIN on the non-blocking pipe) or end of file.
            return;
        }
        buffer[count] = '\0';
        std::cout << "Read : " << buffer << std::endl;
    }

    /**
     * Write-end handler: writes the fixed message, including its terminating
     * NUL, to the pipe.
     */
    void writeNow(void* watcher, int flags)
    {
        if(!(flags & FS_WRITE))
        {
            return;
        }
        static const char message[] = "123456789";
        // Best effort: a failed or short write on the non-blocking pipe is
        // ignored; the handler runs again while the pipe stays writable.
        const ssize_t written = write(_write, message, sizeof(message));
        (void)written;
    }

private:
    // Adds O_NONBLOCK to the descriptor's status flags, keeping the others.
    static void setNonBlocking(int fd)
    {
        const int flags = fcntl(fd, F_GETFL, 0);
        fcntl(fd, F_SETFL, (flags < 0 ? 0 : flags) | O_NONBLOCK);
    }

    FileService & _file_service;
    AttachHandler _attachHandler;
    DetachHandler _detachHandler;
    FileEvent _readHandler;
    FileEvent _writeHandler;
    void * _readWatcher;   // never assigned after construction
    void * _writeWatcher;  // never assigned after construction
    int _read;             // read end of the pipe
    int _write;            // write end of the pipe
};
/**
 * Test driver: starts a FileService with a poll interval of 1, attaches the
 * TestFS pipe to it and waits for the service thread to finish.
 */
int main()
{
    FileService fileService(1);
    TestFS testFS(fileService);
    // Blocks until the service's worker thread ends.
    fileService.join();
    // Return instead of calling exit() so the destructors of the local objects run.
    return 0;
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev