I have not found any helpful information about whether it is possible to send
messages from a node of a cluster to that same node (for example,
MPI_Send(&f, 1, MPI_UNSIGNED_LONG_LONG, 0, 0, MPI_COMM_WORLD) issued on
rank 0).

I wrote a program with two threads: one thread calls MPI_Send and the other
thread is supposed to MPI_Recv that message. The program sometimes (not
always) freezes because a deadlock occurs.

I have attached a .cpp file with an example of the problem; it is included
below, after a minimal sketch of the pattern I mean.

Thanks a lot for your attention.
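To make the question concrete, this is roughly the pattern, reduced to a
minimal sketch (not my real program; the names are only illustrative, and I
am assuming that MPI_Init_thread with MPI_THREAD_MULTIPLE is what should be
requested when two threads make MPI calls, though I am not sure that is
enough):

#include <mpi.h>
#include <thread>
#include <cstdio>

// Receiver functor: run in a second thread on rank 0, it receives the
// message that the main thread sends to rank 0 itself.
struct Receiver {
    void operator()() {
        unsigned long long f = 0;
        MPI_Status st;
        MPI_Recv(&f, 1, MPI_UNSIGNED_LONG_LONG, 0, 0, MPI_COMM_WORLD, &st);
        std::printf("received %llu\n", f);
    }
};

int main(int argc, char** argv) {
    // Ask for full thread support, since two threads make MPI calls on rank 0.
    int provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        Receiver r;
        std::thread receiver(r);

        // Self-send: rank 0 sends to rank 0; the receiver thread picks it up.
        unsigned long long f = 42;
        MPI_Send(&f, 1, MPI_UNSIGNED_LONG_LONG, 0, 0, MPI_COMM_WORLD);

        receiver.join();
    }

    MPI_Finalize();
    return 0;
}

The full attached file follows.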
#include <cstdlib>
#include <iostream>
#include <tbb/compat/thread>
#include <tbb/tick_count.h>
#include "tbb/spin_mutex.h"
#include "tbb/mutex.h"
#include <mpi/mpi.h>
#include <string>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <fstream>
#include <time.h>
#include <vector>
#include <queue>

#include "LibraryInterface.h"

bool continueProcessing = true;

struct MPITags {
    typedef int type;
    enum {
        NoCommand,
        FinishProcessing,
        NewTask,
        ResultSize
    };
};

class SystemPars {
public:
    friend class boost::serialization::access;
    int MPI_size; 

    SystemPars() {}
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version) {
        ar & MPI_size;
    }
} parObject = SystemPars();

bool yesno(std::string text) {
    std::cout << "Are you sure " << text << "? (y/n): ";
    char c;
    std::cin >> c;
    if (c == 'y') return true;
    else return false;
}

class ConsoleHandlerThread {
public:
    void operator ()() {
        std::string s;
        // The console loop is disabled (while(false)), so this thread exits
        // immediately; main() still joins it at the end.
        while(false) {
            std::cin >> s;
            if (s == "exit") {
                if (yesno("want to exit")) {
                    break;
                }
            } else {
                std::cout << "Unknown command.\n";
            }
        }
    }

    ConsoleHandlerThread() { }
};

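// Shared bookkeeping used by ServerThread: for every node it tracks the
// outstanding task and the pending command, guarded by a TBB spin mutex.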
class ConcurentTasksContainer {
private:
    std::vector<UInt64> minTasks;
    size_t minTasksNum;

    std::vector<MPITags::type> commands;
    size_t commandsNum;

    tbb::spin_mutex qMutex;

    std::vector<bool> haveSendedFP;
    size_t nFinishedNodes;

public:
    ConcurentTasksContainer() {
        // Counters start at zero; the spin mutex is default-constructed.
        minTasksNum = 0;
        commandsNum = 0;
        nFinishedNodes = 0;
    }
    void resize(size_t size) {
        minTasks.resize(size);
        commands.resize(size);
        haveSendedFP.resize(size);
        nFinishedNodes = size;
    }

    bool hasCommands() {
        tbb::spin_mutex::scoped_lock l(qMutex);
        return commandsNum > 0;
    }

    bool hasCommand(size_t node) {
        tbb::spin_mutex::scoped_lock l(qMutex);
        return commands[node] != MPITags::NoCommand;
    }

    void setCommand(size_t node, MPITags::type com) {
        tbb::spin_mutex::scoped_lock l(qMutex);
        if (!haveSendedFP[node] && commands[node] == MPITags::NoCommand) {
            commands[node] = com;
            commandsNum++;
        }
        if (com == MPITags::FinishProcessing) {
            haveSendedFP[node] = true;
        }
    }

    MPITags::type getCommand(size_t node) {
        tbb::spin_mutex::scoped_lock l(qMutex);
        MPITags::type com = commands[node];
        if (commands[node] == MPITags::FinishProcessing) {
            nFinishedNodes--;
        }
        if (commands[node] != MPITags::NoCommand) {
            commands[node] = MPITags::NoCommand;
            commandsNum--;
        }
        return com;
    }

    void newTask(size_t node, UInt64 task) {
        tbb::spin_mutex::scoped_lock l(qMutex);
        if (!haveSendedFP[node]) {
            minTasks[node] = task;
            minTasksNum++;
        }
    }

    void endTask(size_t node) {
        tbb::spin_mutex::scoped_lock l(qMutex);
        minTasks[node] = 0;
        minTasksNum--;
    }

    bool isEmpty() {
        tbb::spin_mutex::scoped_lock l(qMutex);
        return minTasksNum == 0 && commandsNum == 0;
    }

    bool isProcessing() {
        tbb::spin_mutex::scoped_lock l(qMutex);
        return nFinishedNodes != 0;
    }

} taskContainer;

UInt8 stMutex = true;  // true until ServerThread has initialized taskContainer; main() polls this (not a real mutex)

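// ServerThread runs on rank 0: it sends one task to every rank (including
// rank 0 itself), then waits for each result and replies with FinishProcessing.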
class ServerThread {
public:
    void operator ()() {
        std::cout << "ServerThread: begin\n";
        srand(time(NULL));
        taskContainer.resize(parObject.MPI_size);
        MPI_Status state;
        UInt64 f;
        UInt32 FunctionsNum;
        stMutex = false;
        for(size_t i = parObject.MPI_size; i--;  ) {
            f = (static_cast<UInt64>(rand()) << 32);  // cast before shifting so the shift is done in 64 bits
            MPI_Send(&f, 1, MPI_UNSIGNED_LONG_LONG, i, MPITags::NewTask, MPI_COMM_WORLD);
            taskContainer.newTask(i, f);
            std::cout << "ServerThread: present to " << i << std::endl;
        }

        while(!taskContainer.isEmpty()) {            
            // FunctionsNum is a UInt32, so receive it as MPI_UNSIGNED (assuming UInt32 is unsigned int)
            MPI_Recv(&FunctionsNum, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPITags::ResultSize, MPI_COMM_WORLD, &state);
            std::cout << "ServerThread: postreceived from " << state.MPI_SOURCE << "\n";
            taskContainer.endTask(state.MPI_SOURCE);

            MPI_Send(&f, 1, MPI_UNSIGNED_LONG_LONG, state.MPI_SOURCE, MPITags::FinishProcessing, MPI_COMM_WORLD);
        }
        std::cout << "ServerThread: end\n";
    }

    ServerThread() { }
};

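// tasksProcessing() is the worker loop: every rank runs it (rank 0 runs it on
// its main thread), receiving tasks from rank 0 and answering with a result size.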
void tasksProcessing(const int rank) {
    srand(time(NULL));
    std::cout << "tasksProcessing "<< rank << ": begin\n";
    MPI_Status state;
    UInt64 f1;
    UInt32 resultSize = 0;

    bool continueProcessing = true;

    while(continueProcessing) {
        std::cout << "tasksProcessing "<< rank << ": waiting\n";
        MPI_Recv(&f1, 1, MPI_UNSIGNED_LONG_LONG, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &state);
        std::cout << "tasksProcessing "<< rank << ": have a message\n";

        switch (state.MPI_TAG) {
            case MPITags::NewTask:
                std::this_thread::sleep_for(tbb::tick_count::interval_t((rand() % 5)/3.0));
                std::cout << "tasksProcessing "<< rank << ": minimization: sending\n";
                MPI_Send(&resultSize, 1, MPI_UNSIGNED, 0, MPITags::ResultSize, MPI_COMM_WORLD);
                break;
            case MPITags::FinishProcessing:
                continueProcessing = false;
                break;
        }

    }
    std::cout << "tasksProcessing "<< rank << ": end\n";
}

void loadSettings(const int size) {
    std::ifstream ifs("settings");
    if (!ifs.fail()) {
        boost::archive::text_iarchive ia(ifs);
        ia >> parObject;
    }
    parObject.MPI_size = size;
}

void saveSettings() {
    std::ofstream ofs("settings");
    boost::archive::text_oarchive oa(ofs);
    oa << parObject;
}

int main(int argc, char** argv) {
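    // NOTE: plain MPI_Init is used; no particular thread-support level is
    // requested, even though rank 0 ends up with several threads calling MPI.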
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);

    if (rank == 0) {
        loadSettings(size);

        // Start the console handler and the server, each in its own thread.
        std::thread *consoleHandlerThread = new std::thread(ConsoleHandlerThread());
        std::thread *server = new std::thread(ServerThread());

        // Wait until ServerThread has initialized taskContainer, then let the
        // main thread of rank 0 process tasks too (it receives rank 0's own messages).
        while(stMutex)
            std::this_thread::sleep_for(tbb::tick_count::interval_t(0.5));
        tasksProcessing(rank);

        server->join();
        delete server;

        saveSettings();

        consoleHandlerThread->join();
        delete consoleHandlerThread;
    } else {
        tasksProcessing(rank);
    }
    
    MPI_Finalize();
    return 0;
}
