//==========================================================================
/**
* @file parser_task.h
*
* @author Baranidharan K.
*/
//==========================================================================
#ifndef _PARSER_TASK_H_
#define _PARSER_TASK_H_
#include<iostream>
#include "ace/Acceptor.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Reactor.h"
#include "ace/TP_Reactor.h"
#include "ace/Task.h"
#include "ace/INET_Addr.h"
#include "ace/Message_Queue.h"
#include <string>
using namespace std;
class Bping_Acceptor;
/**
* @class Parser_Task
*
* @brief The responsibility of this class is receiving the request from the
* poller, and parsing the input the message according to the message type,
* the parsed message putting in to the corresponding task qeue.
*/
class Parser_Task : public ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_SYNCH>
{
private:
static Parser_Task *pointer_;
public:
/**
* The start() function used to, register the select reactor in READ::MASK mode for receving the
* request from the poller.
*/
int start(void *args);
/**
* The open() function used to, register the TP reactor in READ::MASK mode for receving the
* ICMP,SNMP reply message.
*/
virtual int open(void *arg=0);
/**
* The handle_input() function recv the input message and form
* the one full request.
*/
virtual int handle_input(ACE_HANDLE handle);
void parse_ipmi(string inputmsg);
/// Reponsible for single tone
static Parser_Task *instance (void);
string storedata;
string remaining_data;
/// Removeing the unwanted whitespaces
string trim(string input);
/**
* @param inputmsg - inputmessage.
* The split() function split the inputmsg and finding the request type
* according the request type ,the inputmsg passed in to the corresponding
* parseing function.
*/
int split(string inputmsg);
/// Sending the reply message to poller
virtual int svc();
virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE,
ACE_Reactor_Mask close_mask = ACE_Event_Handler::ALL_EVENTS_MASK);
};
#endif
#include "parser_task.h"
#include <fstream>
#include <strstream>
#include "Bping_Acceptor.h"
#include "Server.h"
#include "ace/Shared_Memory_SV.h"
using namespace std;
//int count_ip =1;
int Parser_Task::handle_input(ACE_HANDLE handle)
{
char buf[125];
ACE_OS::memset(buf,'\0',sizeof (buf));
switch (this->peer().recv(buf ,sizeof (buf)-1))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR,"bad read"),-1);
case 0:
ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) closing log dameon %d \n",
this->get_handle()),-1);
default:
storedata = buf;
// cout<<" Receive data in parser_task "<<storedata<<endl;
unsigned int pos1;
pos1 = storedata.find("\n");
while(pos1 != string::npos)
{
string inputmsg;
inputmsg = remaining_data+storedata.substr(0,pos1);
remaining_data = "";
storedata.erase(0,pos1+1);
split(inputmsg);
pos1 = storedata.find("\n");
}
remaining_data += storedata;
}
}
static void * spawn_threads_function(void *arg)
{
ACE_Reactor *reactor = ACE_static_cast (ACE_Reactor *, arg);
reactor->owner (ACE_OS::thr_self ());
for (;;)
reactor->handle_events ();
return 0;
}
int Parser_Task::open(void *args)
{
if(this->reactor()->register_handler (this,
ACE_Event_Handler::READ_MASK)==-1)
{
ACE_ERROR_RETURN((LM_ERROR,
"(%D/%t) Error while registering parser"),
-1);
}
}
int Parser_Task::svc()
{
while(1)
{
ACE_Message_Block *bb;
this->getq(bb);
// cout<<"paraser_task svc "<<endl;
string temp = (string) (bb->rd_ptr());
ACE_DEBUG ((LM_DEBUG, "(%D | %t) %s\n",temp.c_str()));
this->peer().send(temp.c_str(),temp.size());
bb->release();
}
return 0;
}
int Parser_Task::start(void *args)
{
this->reactor((ACE_Reactor *) args);
if (Bping_Acceptor::instance()->open(2777,this->reactor()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%D) %p\n",
"cannot register Parser acceptor"),
-1);
ACE_Thread_Manager::instance()->spawn (ACE_THR_FUNC (spawn_threads_function),
this->reactor(),
THR_JOINABLE|THR_NEW_LWP);
this->activate(THR_JOINABLE, 1);
return 0;
}
int Parser_Task::split(string inputmsg)
{
string::size_type p_pos1,p_pos2,p_pos3;
p_pos1 = 0;
p_pos2 = inputmsg.find("#",p_pos1);
if(string::npos==p_pos2)
{
ACE_DEBUG ((LM_DEBUG , "(%D | %t) wrong command format\n"));
return 0;
}
string input =((inputmsg.substr(p_pos1,p_pos2-p_pos1)).c_str());
string ver = trim(input);
if(ver =="")
{
ACE_DEBUG ((LM_DEBUG ,"(%D | %t) wrong command format\n"));
return 0;
}
p_pos2+=1;
p_pos3 = inputmsg.find("@",p_pos2);
if(string::npos==p_pos3)
{
ACE_DEBUG ((LM_DEBUG , "(%D | %t) wrong command format\n"));
return 0;
}
input=((inputmsg.substr(p_pos2,p_pos3-p_pos2)).c_str());
string st = trim(input);
int status ;
if(st =="ipmi")
{
parse_ipmi(inputmsg);
}
else
ACE_DEBUG ((LM_DEBUG , "(%D | %t) wrong command format\n"));
}
void Parser_Task::parse_ipmi(string inputmsg)
{
// cout<<"Inside the parse_ipmi"<<endl;
string rce_message =inputmsg;
// cout<<"Second time "<<rce_message<<endl;
ACE_Message_Block *rce_data = new ACE_Message_Block(rce_message.size()+1);
rce_data->copy(rce_message.c_str (),rce_message.size()+1);
usleep(15000);
Ipmi_Task *ipm_ta = BPing_Server::instance()->get_ipmi_task();
ipm_ta->putq(rce_data);
ACE_DEBUG((LM_DEBUG , "(%D | %t) IPMI TASK MESSAGE QUEUE SIZE: %d \n ",ipm_ta->msg_queue()->message_bytes()));
}
Parser_Task *Parser_Task::pointer_ = 0;
Parser_Task *Parser_Task::instance (void)
{
if (Parser_Task::pointer_ == 0)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon,
*ACE_Static_Object_Lock::instance (), 0));
if (Parser_Task::pointer_ == 0)
ACE_NEW_RETURN (Parser_Task::pointer_, Parser_Task, 0);
}
return Parser_Task::pointer_;
}
int
Parser_Task::handle_close(ACE_HANDLE handle,
ACE_Reactor_Mask close_mask)
{
ACE_DEBUG((LM_INFO,
"\n(%D/%t) Closing the poller peer socket handle %d\n",
this->peer().get_handle()));
this->reactor()->remove_handler(this->peer().get_handle(),
ACE_Event_Handler::READ_MASK|ACE_Event_Handler::DONT_CALL|ACE_Event_Handler::WRITE_MASK);
this->peer ().close ();
return 0;
}
string Parser_Task::trim(string input)
{
string::size_type pos = input.find_last_not_of(' ');
if(pos != string::npos)
{
input.erase(pos + 1);
pos = input.find_first_not_of(' ');
if(pos != string::npos)
input.erase(0, pos);
}
else
{
input.erase(input.begin(), input.end());
}
return input;
}
-------------------------------------------------------------------------
This SF.net email is sponsored by DB2 Express
Download DB2 Express C - the FREE version of DB2 express and take
control of your XML. No limits. Just data. Click to get it now.
http://sourceforge.net/powerbar/db2/
_______________________________________________
Openipmi-developer mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/openipmi-developer