Hi Peace,

I would use boost::asio, and use the ability of it to handle Windows/Posix file 
descriptors.

Below are some snippets of code that might help illustrate this (don't try to 
compile!)

I don't know much of the pipe transport in thrift, so this may not be 
applicable to your question.

As an orthogonal thought, I was wondering if anybody has put some thought 
around using boost::asio for async thrift servers (right now using libevent).

Regards,
alex

#include <boost/asio.hpp> 
#include <boost/system/windows_error.hpp>

#ifdef DVA_OS_WIN
        using boost::asio::windows::stream_handle;
        typedef stream_handle platform_stream;
        typedef HANDLE platform_descriptor;
#       define PIPE_EOF_ERROR_CODE boost::system::windows_error::broken_pipe
#else
        using boost::asio::posix::stream_descriptor;
        typedef stream_descriptor platform_stream;
        typedef int platform_descriptor;
#       define PIPE_EOF_ERROR_CODE boost::asio::error::eof
#endif

        boost::asio::io_service io_service_;
        platform_stream pipe_;
        std::vector<char> data_;
        bool done_;
        bool error_;
        size_t total_size_;
        boost::asio::strand strand_;
        boost::thread thread_;

PipeSession(platform_descriptor fd) :
                io_service_(), pipe_(io_service_, fd),
{
}

bool PipeSession ::start()
{
#ifdef LINUX
        for (;;)
        {
                boost::asio::posix::descriptor_base::bytes_readable 
command(true);
                pipe_.io_control(command);
                std::size_t bytes_readable = command.get();

                if(bytes_readable) {
                        break;
                }
        }
#endif
        _thread = boost::thread(boost::bind(&PipeSession::run))

        return true;
}

void PipeSession ::run() {
        try {
                pipe_.async_read_some(boost::asio::buffer(data_),
                        strand_.wrap(boost::bind(&PipeSession::handle_read,
                        this, boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));

                        std::size_t num = io_service_.run();

                } catch(const boost::system::system_error& e) {
                        done_ = true;
                        if(e.code() != PIPE_EOF_ERROR_CODE)
                                throw;
                }

        static platform_descriptor CreatePipeFD(const std::string& pipeName, 
bool readFlag) {
#ifdef WIN32
                HANDLE fd = CreateNamedPipe( 
                        pipeName.c_str(),    // pipe name 
                        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,       // 
read/write access TODO PIPE_ACCESS_INBOUND? PIPE_ACCESS_OUTBOUND?
                        PIPE_TYPE_BYTE |          // byte type pipe 
                        PIPE_READMODE_BYTE |      // byte-read mode 
                        PIPE_WAIT,                // blocking mode 
                        PIPE_UNLIMITED_INSTANCES, // max. instances  
                        BUFSIZE,                  // output buffer size 
                        BUFSIZE,                  // input buffer size 
                        0,                        // client time-out 
                        NULL);                    // default security attribute 

                if(fd == INVALID_HANDLE_VALUE) {
                        throw std::runtime_error("CreateNamedPipe failed");
                }

                OVERLAPPED overlapped = {0};
                overlapped.hEvent = CreateEvent(0,TRUE,FALSE,0);
                if(ConnectNamedPipe(fd, &overlapped) != FALSE || GetLastError() 
!= ERROR_IO_PENDING) {
                        CloseHandle(overlapped.hEvent);
                        CloseHandle(fd);
                        throw std::runtime_error("ConnectNamedPipe failed");
                }
#else
                if(mkfifo(pipeName .c_str(), 0660) < 0) {
                        throw std::runtime_error("fifo failed");
                }

                int fd = open(pipeName c_str(), (readFlag ? O_RDONLY : O_RDWR) 
| O_NONBLOCK);
                if(fd <= 0) {
                        throw std::runtime_error("open failed");
                }
#endif

#ifdef WIN32
                DWORD waitRes;
                if((waitRes = WaitForSingleObject(overlapped.hEvent, 
LAUNCH_TIMEOUT * 1000)) != WAIT_OBJECT_0) {
                        CloseHandle(overlapped.hEvent);
                        CloseHandle(fd);
                        throw std::runtime_error("WaitForSingleObject failed");
                }
                CloseHandle(overlapped.hEvent);

#endif

                return fd;
        }
}

-----Original Message-----
From: Peace [mailto:[email protected]] 
Sent: Thursday, December 01, 2011 3:24 PM
To: [email protected]
Subject: Pipe transport for Windows

Hello group,

I am implementing Named and Anonymous Pipes transport for Thrift on the Windows 
platform.  The motivation for this is to provide a lightweight local IPC 
transport for applications that run entirely on one system. Unix already has 
domain sockets support in the TSocket transport but that does not work on 
Windows.  I would have preferred a cross-platform solution but Windows pipes 
are much too different from the Unix implementations. What are your thoughts on 
submitting this for possible inclusion?  Would a Windows-only transport bother 
people?  Is there a better way to accomplish this?


Regards,
Peace

Reply via email to