//==========================================================================
/**
 *  @file    parser_task.h
 *
 *  @author Baranidharan K.
 */
//==========================================================================

#ifndef _PARSER_TASK_H_
#define _PARSER_TASK_H_

                                                                                                                             

#include<iostream>
#include "ace/Acceptor.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Reactor.h"
#include "ace/TP_Reactor.h"
#include "ace/Task.h"
#include "ace/INET_Addr.h"
#include "ace/Message_Queue.h"
#include <string>


using namespace std;

class Bping_Acceptor;

/**
 * @class Parser_Task
 *
 * @brief The responsibility of this class is receiving the request from the
 * poller, and parsing the input the message according to the message type,
 * the parsed message putting in to the corresponding task qeue.
 */

class Parser_Task : public ACE_Svc_Handler  <ACE_SOCK_STREAM, ACE_SYNCH>
			                    
{
   private: 
          static Parser_Task *pointer_;
   public:
          /**
           * The start() function used to, register the  select reactor in READ::MASK mode for receving the
           * request from the poller.
           */ 	
          int start(void *args);
          /**
           * The open() function used to, register the TP reactor in READ::MASK mode for receving the
           * ICMP,SNMP reply message.
           */
          virtual int open(void *arg=0);
          /**
           * The handle_input() function  recv the input message and form
           * the one full request.
           */
          virtual int handle_input(ACE_HANDLE handle);
           
          void parse_ipmi(string inputmsg); 

          /// Reponsible for single tone 	
          static Parser_Task *instance (void); 
          string storedata;
          string remaining_data;
          /// Removeing the unwanted whitespaces
          string trim(string input); 
          /**
           * @param inputmsg - inputmessage.
           * The split() function split the inputmsg and finding the request type
           * according the request type ,the inputmsg passed in to the corresponding            
           * parseing function.
           */ 
          int split(string inputmsg);
          /// Sending the reply message to poller
          virtual int svc();
          virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE,
                             ACE_Reactor_Mask close_mask = ACE_Event_Handler::ALL_EVENTS_MASK); 
                                                                                                                           
};
#endif
#include "parser_task.h"
#include <fstream>
#include <strstream>
#include "Bping_Acceptor.h"
#include "Server.h"
#include "ace/Shared_Memory_SV.h"

using namespace std;
//int count_ip =1; 
int Parser_Task::handle_input(ACE_HANDLE handle)
{
  char buf[125];
  ACE_OS::memset(buf,'\0',sizeof (buf));
  switch (this->peer().recv(buf ,sizeof (buf)-1))
  {
    case -1:
        ACE_ERROR_RETURN ((LM_ERROR,"bad read"),-1);
                                                                                           
    case 0:
       ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) closing log dameon %d \n",
                             this->get_handle()),-1);
    default:
       storedata = buf;
      // cout<<" Receive data in parser_task "<<storedata<<endl;
       unsigned int pos1;
       pos1 = storedata.find("\n");
       while(pos1 != string::npos)
       { 
  
          string inputmsg;
          inputmsg = remaining_data+storedata.substr(0,pos1);
                                                                                                                             
          remaining_data = "";
          storedata.erase(0,pos1+1);
          split(inputmsg);

          pos1 = storedata.find("\n");
           
       }
          remaining_data += storedata;
  }

}

static void * spawn_threads_function(void *arg)
{
    ACE_Reactor *reactor = ACE_static_cast (ACE_Reactor *, arg);
    reactor->owner (ACE_OS::thr_self ());
    for (;;)
        reactor->handle_events ();
    return 0;
}

int Parser_Task::open(void *args)
 {
  if(this->reactor()->register_handler (this,
                                ACE_Event_Handler::READ_MASK)==-1)
        {
         ACE_ERROR_RETURN((LM_ERROR,
                           "(%D/%t) Error while registering parser"),
                           -1);
        }
}



int Parser_Task::svc()
{
                                                                                                                             
while(1)
{
  ACE_Message_Block *bb;
  this->getq(bb);
  // cout<<"paraser_task svc "<<endl; 
   string temp = (string) (bb->rd_ptr());
   ACE_DEBUG ((LM_DEBUG, "(%D | %t)  %s\n",temp.c_str()));
   this->peer().send(temp.c_str(),temp.size()); 
   bb->release();                                                                                                                               
                                                                                                                             
}
                                                                                                                             
  return 0;
}
                                                                                                                             

int Parser_Task::start(void *args)
{ 
 
  this->reactor((ACE_Reactor *) args);
  if (Bping_Acceptor::instance()->open(2777,this->reactor()) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%D) %p\n",
                           "cannot register Parser acceptor"),
                            -1); 
                                                                            
  ACE_Thread_Manager::instance()->spawn (ACE_THR_FUNC (spawn_threads_function),
                                         this->reactor(),
                                         THR_JOINABLE|THR_NEW_LWP);
   
  this->activate(THR_JOINABLE, 1);

return 0;

}

int Parser_Task::split(string inputmsg)
{
           string::size_type p_pos1,p_pos2,p_pos3;
           p_pos1 = 0;
           p_pos2 = inputmsg.find("#",p_pos1);
           if(string::npos==p_pos2)
           {
            ACE_DEBUG ((LM_DEBUG , "(%D | %t) wrong command format\n"));
            return 0;
           }
           string input =((inputmsg.substr(p_pos1,p_pos2-p_pos1)).c_str()); 
           string ver   = trim(input);
           if(ver =="")  
           {
            ACE_DEBUG ((LM_DEBUG ,"(%D | %t) wrong command format\n")); 
            return 0;
           }
           p_pos2+=1;
           p_pos3 = inputmsg.find("@",p_pos2);
           if(string::npos==p_pos3)
           {
            ACE_DEBUG ((LM_DEBUG , "(%D | %t) wrong command format\n"));
            return 0;
           }
           
           input=((inputmsg.substr(p_pos2,p_pos3-p_pos2)).c_str());
           string st = trim(input); 
           int status ;
           if(st =="ipmi")
            {
              parse_ipmi(inputmsg);          
            } 
           else
            ACE_DEBUG ((LM_DEBUG , "(%D | %t) wrong command format\n"));                                                                                                                  
}


void Parser_Task::parse_ipmi(string inputmsg)
{ 
//  cout<<"Inside the parse_ipmi"<<endl;   
  string rce_message =inputmsg;
//  cout<<"Second time "<<rce_message<<endl;
  ACE_Message_Block  *rce_data = new  ACE_Message_Block(rce_message.size()+1);
  rce_data->copy(rce_message.c_str (),rce_message.size()+1);
  usleep(15000);

  Ipmi_Task *ipm_ta = BPing_Server::instance()->get_ipmi_task();
  ipm_ta->putq(rce_data);
  ACE_DEBUG((LM_DEBUG , "(%D | %t) IPMI TASK MESSAGE QUEUE SIZE:  %d \n ",ipm_ta->msg_queue()->message_bytes()));
         
}

Parser_Task *Parser_Task::pointer_ = 0;

Parser_Task *Parser_Task::instance (void)
 {
   if (Parser_Task::pointer_ == 0)
      {
        ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, 
				  *ACE_Static_Object_Lock::instance (), 0));
        if (Parser_Task::pointer_ == 0)
          ACE_NEW_RETURN (Parser_Task::pointer_, Parser_Task, 0);
      }
   return Parser_Task::pointer_;
 }



int
Parser_Task::handle_close(ACE_HANDLE handle,
                                 ACE_Reactor_Mask close_mask)
{
      ACE_DEBUG((LM_INFO,
               "\n(%D/%t) Closing the poller peer socket handle %d\n",
               this->peer().get_handle()));
     this->reactor()->remove_handler(this->peer().get_handle(),
                                ACE_Event_Handler::READ_MASK|ACE_Event_Handler::DONT_CALL|ACE_Event_Handler::WRITE_MASK);
                                                                                                                             
      this->peer ().close ();
      return 0;
}

string Parser_Task::trim(string input)
{

 string::size_type pos = input.find_last_not_of(' ');
  if(pos != string::npos)
    {
    input.erase(pos + 1);
    pos = input.find_first_not_of(' ');
    if(pos != string::npos)
    input.erase(0, pos);
    }
  else
  {
  input.erase(input.begin(), input.end());
  }
  return input; 
}


-------------------------------------------------------------------------
This SF.net email is sponsored by DB2 Express
Download DB2 Express C - the FREE version of DB2 express and take
control of your XML. No limits. Just data. Click to get it now.
http://sourceforge.net/powerbar/db2/
_______________________________________________
Openipmi-developer mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/openipmi-developer

Reply via email to