Hi there,

I'm referring to a forum discussion about a multi-threading
issue that shows up under heavy use of the notification API. Sorry
for picking this up again after more than half a year (original post:
http://forum.openscenegraph.org/viewtopic.php?t=10533).

In that forum thread, I described a potential race
condition around the global static instance of osg::NotifyStream
defined in Notify.cpp, and suggested working around the
issue by using thread-local storage:

> Based on the svn trunk version of Notify.cpp, I experimented with another 
> approach. The solution is prototype, however, here is what worked for me: I 
> used a thread local storage to separate access to osg::NotifyStream for each 
> thread. As discussed earlier in this thread, this assumes stream classes 
> (including std::ostream and std::stringbuf) to be at least reentrant. 
> According to 
> http://msdn.microsoft.com/en-us/library/c9ceah3b%28v=vs.100%29.aspx this is 
> true at least for my Visual compiler implementation.
> Currently, there are some other drawbacks with this solution: As I used 
> QThreadStorage for now, it requires QtCore library to be linked in core osg. 
> Maybe, another TLS implementation would be more suitable (does OpenThreads 
> has one?). Although I could not recognize capital performance loss yet, this 
> may be another critical issue.
> Nevertheless, I tried the modified version upon OSG 3.0.1 on all my Win 7 x64 
> machines and the heap corruption never showed up again. For further 
> inspection and interest, I attached my modified version of Notify.cpp which 
> has a preprocessor flag OSG_NOTIFY_STREAM_PER_THREAD to toggle the use of 
> thread local storage.
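
To illustrate the idea of one stream per thread, here is a minimal
sketch. It assumes a C++11 compiler and uses the thread_local keyword
rather than QThreadStorage; the names localNotifyStream and
writeFromThreads are hypothetical and do not appear in Notify.cpp.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: returns a stream owned by the calling thread.
// thread_local (C++11) gives every thread its own instance, so the
// stream itself needs no lock.
std::ostringstream& localNotifyStream()
{
    thread_local std::ostringstream stream;
    return stream;
}

// Each worker writes to its own stream and copies the text into its own
// slot of the result vector, so no two threads touch the same object.
std::vector<std::string> writeFromThreads(int count)
{
    std::vector<std::string> results(count);
    std::vector<std::thread> workers;
    for (int i = 0; i < count; ++i)
    {
        workers.emplace_back([i, &results]()
        {
            localNotifyStream() << "message from worker " << i;
            results[i] = localNotifyStream().str();
        });
    }
    for (std::thread& worker : workers) worker.join();
    return results;
}
```

Because every worker only sees its own stream, the messages cannot
interleave or corrupt a shared buffer.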

Robert then asked whether it would be possible to use OpenThreads
instead of QThreadStorage for the job:

> OpenThreads doesn't expose thread local storage but does use it internally to 
> provide the Thread::CurrentThread() functionality. Might it be possible to 
> use this in your implementation?

Following this hint, I implemented a local class
StreamPerThreadStorage that is independent of Qt. Attached is a
version of Notify.cpp based on the current trunk. I tested it
on a Win 7 x64 machine, where the former heap corruption did not
show up again.
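
The principle behind such a class can be sketched with standard C++11
types. This is only an illustration under that assumption, not the
attached implementation: the class name PerThreadStreams is
hypothetical, and it keys the map on std::thread::id instead of the
OpenThreads::Thread pointer used in the attachment.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>

// Hypothetical stand-in for a per-thread stream registry.
class PerThreadStreams
{
public:
    // Returns the calling thread's stream, creating it on first use.
    // The mutex protects the map only; the returned stream is used by
    // its owning thread without further locking.
    std::ostringstream& local()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        std::unique_ptr<std::ostringstream>& slot = _streams[std::this_thread::get_id()];
        if (!slot) slot.reset(new std::ostringstream);
        return *slot;
    }

    // Number of threads that have requested a stream so far.
    std::size_t size() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _streams.size();
    }

private:
    mutable std::mutex _mutex;
    std::map<std::thread::id, std::unique_ptr<std::ostringstream> > _streams;
};
```

The lock is held only for the map lookup, so threads writing to their
own streams do not block each other while formatting a message.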

Again, there is a preprocessor flag OSG_NOTIFY_STREAM_PER_THREAD.
Currently, its definition is hardcoded (and commented out in the
attached file). If my changes are accepted, the flag could be
exposed to CMake to make the use of thread-local storage
optional. Unfortunately, I am not familiar with CMake
scripting.

Kind regards,
Matthias Schütze, Germany
/* -*-c++-*- OpenSceneGraph - Copyright (C) 1998-2006 Robert Osfield
 *
 * This library is open source and may be redistributed and/or modified under
 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
 * (at your option) any later version.  The full license is in LICENSE file
 * included with this distribution, and on the openscenegraph.org website.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * OpenSceneGraph Public License for more details.
*/
#include <osg/Notify>
#include <osg/ApplicationUsage>
#include <osg/ref_ptr>
#include <string>
#include <stdlib.h>
#include <stdio.h>
#include <sstream>
#include <iostream>

#include <ctype.h>

//#define OSG_NOTIFY_STREAM_PER_THREAD // enables or disables the workaround

#ifdef OSG_NOTIFY_STREAM_PER_THREAD
#include <OpenThreads/Thread>
#include <OpenThreads/ScopedLock>
#include <map>
#endif // OSG_NOTIFY_STREAM_PER_THREAD

#define OSG_INIT_SINGLETON_PROXY(ProxyName, Func) static struct ProxyName{ ProxyName() { Func; } } s_##ProxyName;

namespace osg
{

class NullStreamBuffer : public std::streambuf
{
private:
    std::streamsize xsputn(const std::streambuf::char_type *str, std::streamsize n)
    {
        return n;
    }
};

struct NullStream : public std::ostream
{
public:
    NullStream():
        std::ostream(new NullStreamBuffer)
    { _buffer = dynamic_cast<NullStreamBuffer *>(rdbuf()); }

    ~NullStream()
    {
        rdbuf(0);
        delete _buffer;
    }

protected:
    NullStreamBuffer* _buffer;
};

/** Stream buffer calling notify handler when buffer is synchronized (usually on std::endl).
 * Stream stores last notification severity to pass it to handler call.
 */
struct NotifyStreamBuffer : public std::stringbuf
{
    NotifyStreamBuffer() : _severity(osg::NOTICE)
    {
    }

    void setNotifyHandler(osg::NotifyHandler *handler) { _handler = handler; }
    osg::NotifyHandler *getNotifyHandler() const { return _handler.get(); }

    /** Sets severity for next call of notify handler */
    void setCurrentSeverity(osg::NotifySeverity severity)
    {
        if (_severity != severity)
        {
            sync();
            _severity = severity;
        }
    }

    osg::NotifySeverity getCurrentSeverity() const { return _severity; }

private:

    int sync()
    {
        sputc(0); // string termination
        if (_handler.valid())
            _handler->notify(_severity, pbase());
        pubseekpos(0, std::ios_base::out); // or str(std::string())
        return 0;
    }

    osg::ref_ptr<osg::NotifyHandler> _handler;
    osg::NotifySeverity _severity;
};

struct NotifyStream : public std::ostream
{
public:
    NotifyStream():
        std::ostream(new NotifyStreamBuffer)
    { _buffer = dynamic_cast<NotifyStreamBuffer *>(rdbuf()); }

    void setCurrentSeverity(osg::NotifySeverity severity)
    {
        _buffer->setCurrentSeverity(severity);
    }

    osg::NotifySeverity getCurrentSeverity() const
    {
        return _buffer->getCurrentSeverity();
    }

    ~NotifyStream()
    {
        rdbuf(0);
        delete _buffer;
    }

protected:
    NotifyStreamBuffer* _buffer;
};

#ifdef OSG_NOTIFY_STREAM_PER_THREAD
class StreamPerThreadStorage
{
private:
    typedef std::map< OpenThreads::Thread*, std::ostream* > StreamPerThreadMap;

public:
    StreamPerThreadStorage()
        : _mutex(OpenThreads::Mutex::MUTEX_NORMAL)
    {
    }

    ~StreamPerThreadStorage()
    {
        OpenThreads::ScopedLock<OpenThreads::Mutex> locker(_mutex);

        for (StreamPerThreadMap::iterator it = _streams.begin(); it != _streams.end(); ++it)
        {
            delete it->second; // deleting a null pointer is a no-op
        }

        _streams.clear();
    }

    bool hasLocalStream() const
    {
        return (getLocalStream() != 0);
    }

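    std::ostream* getLocalStream() const
    {
        // Note: CurrentThread() may return 0 for threads that were not started
        // through OpenThreads; such threads would then share a single map entry.
        OpenThreads::Thread *thread = OpenThreads::Thread::CurrentThread();
        OpenThreads::ScopedLock<OpenThreads::Mutex> locker(_mutex);
        StreamPerThreadMap::const_iterator it = _streams.find(thread);
        return (it != _streams.end()) ? it->second : 0;
    }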

    void setLocalStream(std::ostream *stream)
    {
        OpenThreads::Thread *thread = OpenThreads::Thread::CurrentThread();
        OpenThreads::ScopedLock<OpenThreads::Mutex> locker(_mutex);
        StreamPerThreadMap::const_iterator it = _streams.find(thread);
        std::ostream *oldStream = (it != _streams.end()) ? it->second : 0;

        _streams[thread] = stream;

        // Avoid deleting the stream that was just stored again
        if (oldStream != stream)
            delete oldStream;
    }

private:
    // Disable copying
    StreamPerThreadStorage(const StreamPerThreadStorage &other);
    StreamPerThreadStorage& operator=(const StreamPerThreadStorage &other);

    mutable OpenThreads::Mutex _mutex;
    StreamPerThreadMap _streams;
};
#endif // OSG_NOTIFY_STREAM_PER_THREAD

}

using namespace osg;

static osg::ApplicationUsageProxy Notify_e0(osg::ApplicationUsage::ENVIRONMENTAL_VARIABLE, "OSG_NOTIFY_LEVEL <mode>", "FATAL | WARN | NOTICE | DEBUG_INFO | DEBUG_FP | DEBUG | INFO | ALWAYS");

struct NotifySingleton
{
    NotifySingleton()
    {
        // _notifyLevel
        // =============

        _notifyLevel = osg::NOTICE; // Default value

        char* OSGNOTIFYLEVEL=getenv("OSG_NOTIFY_LEVEL");
        if (!OSGNOTIFYLEVEL) OSGNOTIFYLEVEL=getenv("OSGNOTIFYLEVEL");
        if(OSGNOTIFYLEVEL)
        {

            std::string stringOSGNOTIFYLEVEL(OSGNOTIFYLEVEL);

            // Convert to upper case
            for(std::string::iterator i=stringOSGNOTIFYLEVEL.begin();
                i!=stringOSGNOTIFYLEVEL.end();
                ++i)
            {
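                // Cast to unsigned char: toupper() is undefined for negative char values
                *i=static_cast<char>(toupper(static_cast<unsigned char>(*i)));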
            }

            if(stringOSGNOTIFYLEVEL.find("ALWAYS")!=std::string::npos)          _notifyLevel=osg::ALWAYS;
            else if(stringOSGNOTIFYLEVEL.find("FATAL")!=std::string::npos)      _notifyLevel=osg::FATAL;
            else if(stringOSGNOTIFYLEVEL.find("WARN")!=std::string::npos)       _notifyLevel=osg::WARN;
            else if(stringOSGNOTIFYLEVEL.find("NOTICE")!=std::string::npos)     _notifyLevel=osg::NOTICE;
            else if(stringOSGNOTIFYLEVEL.find("DEBUG_INFO")!=std::string::npos) _notifyLevel=osg::DEBUG_INFO;
            else if(stringOSGNOTIFYLEVEL.find("DEBUG_FP")!=std::string::npos)   _notifyLevel=osg::DEBUG_FP;
            else if(stringOSGNOTIFYLEVEL.find("DEBUG")!=std::string::npos)      _notifyLevel=osg::DEBUG_INFO;
            else if(stringOSGNOTIFYLEVEL.find("INFO")!=std::string::npos)       _notifyLevel=osg::INFO;
            else std::cout << "Warning: invalid OSG_NOTIFY_LEVEL set ("<<stringOSGNOTIFYLEVEL<<")"<<std::endl;

        }

#ifndef OSG_NOTIFY_STREAM_PER_THREAD
        // Setup standard notify handler
        osg::NotifyStreamBuffer *buffer = dynamic_cast<osg::NotifyStreamBuffer *>(_notifyStream.rdbuf());
        if (buffer && !buffer->getNotifyHandler())
            buffer->setNotifyHandler(new StandardNotifyHandler);
#else
        // Setup standard notify handler
        if (!_notifyHandler.valid())
            _notifyHandler = new StandardNotifyHandler();
#endif // OSG_NOTIFY_STREAM_PER_THREAD
    }

    osg::NotifySeverity _notifyLevel;
#ifndef OSG_NOTIFY_STREAM_PER_THREAD
    osg::NullStream     _nullStream;
    osg::NotifyStream   _notifyStream;
#else
    osg::ref_ptr<osg::NotifyHandler> _notifyHandler;
    osg::StreamPerThreadStorage _nullStreams;
    osg::StreamPerThreadStorage _notifyStreams;
#endif // OSG_NOTIFY_STREAM_PER_THREAD
};

static NotifySingleton& getNotifySingleton()
{
    static NotifySingleton s_NotifySingleton;
    return s_NotifySingleton;
}

bool osg::initNotifyLevel()
{
    getNotifySingleton();
    return true;
}

// Use a proxy to force the initialization of the NotifySingleton during static initialization
OSG_INIT_SINGLETON_PROXY(NotifySingletonProxy, osg::initNotifyLevel())

void osg::setNotifyLevel(osg::NotifySeverity severity)
{
    getNotifySingleton()._notifyLevel = severity;
}

osg::NotifySeverity osg::getNotifyLevel()
{
    return getNotifySingleton()._notifyLevel;
}

void osg::setNotifyHandler(osg::NotifyHandler *handler)
{
#ifndef OSG_NOTIFY_STREAM_PER_THREAD
    osg::NotifyStreamBuffer *buffer = static_cast<osg::NotifyStreamBuffer*>(getNotifySingleton()._notifyStream.rdbuf());
    if (buffer) buffer->setNotifyHandler(handler);
#else
    getNotifySingleton()._notifyHandler = handler;
#endif // OSG_NOTIFY_STREAM_PER_THREAD
}

osg::NotifyHandler* osg::getNotifyHandler()
{
#ifndef OSG_NOTIFY_STREAM_PER_THREAD
    osg::NotifyStreamBuffer *buffer = static_cast<osg::NotifyStreamBuffer *>(getNotifySingleton()._notifyStream.rdbuf());
    return buffer ? buffer->getNotifyHandler() : 0;
#else
    return getNotifySingleton()._notifyHandler.get();
#endif // OSG_NOTIFY_STREAM_PER_THREAD
}


#ifndef OSG_NOTIFY_DISABLED
bool osg::isNotifyEnabled( osg::NotifySeverity severity )
{
    return severity<=getNotifySingleton()._notifyLevel;
}
#endif

#ifndef OSG_NOTIFY_STREAM_PER_THREAD
std::ostream& osg::notify(const osg::NotifySeverity severity)
{
    if (osg::isNotifyEnabled(severity))
    {
        getNotifySingleton()._notifyStream.setCurrentSeverity(severity);
        return getNotifySingleton()._notifyStream;
    }
    return getNotifySingleton()._nullStream;
}
#else
std::ostream& osg::notify(const osg::NotifySeverity severity)
{
    if (osg::isNotifyEnabled(severity))
    {
        osg::NotifyStream *stream = dynamic_cast<osg::NotifyStream*>(getNotifySingleton()._notifyStreams.getLocalStream());
        if (!stream)
        {
            stream = new osg::NotifyStream();
            getNotifySingleton()._notifyStreams.setLocalStream(stream);
        }

        osg::NotifyStreamBuffer *buffer = dynamic_cast<osg::NotifyStreamBuffer*>(stream->rdbuf());
        if (buffer)
        {
            buffer->setNotifyHandler(getNotifySingleton()._notifyHandler.get());
            buffer->setCurrentSeverity(severity);
        }

        return *stream;
    }
    else
    {
        osg::NullStream *stream = dynamic_cast<osg::NullStream*>(getNotifySingleton()._nullStreams.getLocalStream());
        if (!stream)
        {
            stream = new osg::NullStream();
            getNotifySingleton()._nullStreams.setLocalStream(stream);
        }

        return *stream;
    }
}
#endif // OSG_NOTIFY_STREAM_PER_THREAD

void osg::StandardNotifyHandler::notify(osg::NotifySeverity severity, const char *message)
{
    if (severity <= osg::WARN)
        fputs(message, stderr);
    else
        fputs(message, stdout);
}

#if defined(WIN32) && !defined(__CYGWIN__)

#ifndef WIN32_LEAN_AND_MEAN
    #define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>

void osg::WinDebugNotifyHandler::notify(osg::NotifySeverity severity, const char *message)
{
    OutputDebugStringA(message);
}

#endif