On 22/09/10 07:38 PM, Masoume Jabbarifar wrote:


On Mon, Sep 20, 2010 at 8:49 PM, Benjamin Poirier
<[email protected]> wrote:

Read linux-2.6/Documentation/SubmittingPatches
7) No MIME, no links, no compression, no attachments.  Just plain text.

More comments below.


    Some introduction about the synchronization code and the benefits of
    modules:
    The synchronization framework consists of a group of modules that
    interchangeably plug in to a central "synchronization chain"
    concept. This chain performs the different steps needed to
    synchronize the time between a group of traces: read the events from
    different traces, match them together, analyze them to come up with
    correction factors and finally reduce the potentially large number
    of factors to a minimal set. Each step of the synchronization chain
    is carefully kept independent of the other steps. This allows, for
    example, the accuracy analysis code to work off of UDP events from
    lttng traces or simulated TCP events from text files. Any
    appropriate module can be plugged in at any step because 1) each
    module is kept independent of other modules, 2) the core is kept
    independent of any specific module, 3) each module performs a
    single, specific task.


Thank you for the introduction to your code; it will be helpful to
anyone who is not familiar with it.
A few months ago, we had a meeting and discussed your code. There was
some redundant code in the Convex-hull and Linear Regression modules.
This redundant code converted the factors after synchronization
completed and computed times relative to a reference time. So you
decided to move this code into a new module called the Reduction module.

In this module you first find the reference node and the factors for
each pair of nodes, and then the best path for accuracy (using a Minimum
Spanning Tree to find the minimum drifts and offsets). Finding a Minimum
Spanning Tree, ignoring useless links and routing along the most
accurate path does not mean event filtering! I implement the same
concept in a different way.

Let’s take a look at a simple example and talk more about my patch.
In the following example, A, B, C and D are connected. The number on
each link shows the number of exchanged packets.

    D ---- 40 ---- A ---- 5 ---- B
     \             |            /
    5 \         20 |           / 10
       \           |          /
        +--------- C --------+

In synchronization based on accuracy (your code), you find factors for
each pair of nodes, that is A-B, A-D, A-C, B-C and D-C, and synchronize
them two by two. The Reduction module finds the reference node (A), then
checks the factors (drifts and offsets) and finds the Minimum Spanning
Tree based on them (minimum because less drift and offset between two
clocks means more accuracy).

This means it ignores one of the three links in each triangle, so we
waste some time computing factors for the ignored or useless links (A-B
and D-C).

I studied your documentation and some other papers and understood that
the more packets are exchanged on a link, the better the accuracy on
that link. This means “A”, with 70 exchanged packets, is the reference
node and B will be synchronized with A through the (B-C-A) path. Also,
you will not use the factors calculated for A-B and D-C after this.

So what I did is count the number of packets before synchronization. To
buffer packets and count them, I need to change some parts of the
Matching module. In your case, the Matching module sends each packet to
the Analysis module directly and then drops it. In my case, packets are
buffered in the Matching module and sent to the Reduction module to be
organized.
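
The buffering idea can be sketched in plain C as follows. This is only an
illustration under stated assumptions, not the code from the patch: the
types `Packet`, `PacketQueue` and `Matcher` and the functions `on_match`,
`flush_pair` and `make_packet` are hypothetical, a hand-written FIFO stands
in for GLib's GQueue, and "analysis" is reduced to incrementing a counter.

```c
#include <assert.h>
#include <stdlib.h>

#define TRACE_NB 3  /* hypothetical number of traces */

/* Hypothetical stand-in for a matched message between two traces */
typedef struct Packet {
	int inTrace, outTrace;
	struct Packet *next;
} Packet;

/* Minimal FIFO, used here instead of GQueue to stay self-contained */
typedef struct {
	Packet *head, *tail;
	unsigned int length;
} PacketQueue;

typedef struct {
	int buffering;          /* non-zero when a pre-processing step is used */
	unsigned int analyzed;  /* stands in for calls to the analysis module */
	PacketQueue pending[TRACE_NB][TRACE_NB];  /* one queue per trace pair */
} Matcher;

static void queue_push(PacketQueue *q, Packet *p)
{
	p->next = NULL;
	if (q->tail != NULL)
		q->tail->next = p;
	else
		q->head = p;
	q->tail = p;
	q->length++;
}

static Packet *queue_pop(PacketQueue *q)
{
	Packet *p = q->head;

	if (p == NULL)
		return NULL;
	q->head = p->next;
	if (q->head == NULL)
		q->tail = NULL;
	q->length--;
	return p;
}

static Packet *make_packet(int inTrace, int outTrace)
{
	Packet *p = malloc(sizeof(Packet));

	if (p != NULL) {
		p->inTrace = inTrace;
		p->outTrace = outTrace;
		p->next = NULL;
	}
	return p;
}

/* Called for every matched packet: analyze at once, or keep it for later */
static void on_match(Matcher *m, Packet *p)
{
	assert(p != NULL);
	assert(p->inTrace >= 0 && p->inTrace < TRACE_NB);
	assert(p->outTrace >= 0 && p->outTrace < TRACE_NB);

	if (m->buffering) {
		queue_push(&m->pending[p->inTrace][p->outTrace], p);
	} else {
		m->analyzed++;  /* direct path: analyze, then drop */
		free(p);
	}
}

/* Later, analyze only the packets buffered for one selected link */
static unsigned int flush_pair(Matcher *m, int a, int b)
{
	unsigned int n = 0;
	Packet *p;

	while ((p = queue_pop(&m->pending[a][b])) != NULL) {
		m->analyzed++;
		n++;
		free(p);
	}
	return n;
}
```

With `buffering` set, nothing is analyzed until `flush_pair` is called for a
link that was kept; packets on links that are never flushed would still have
to be freed separately.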

I explained my idea in that meeting and you said I could add this to
the Reduction module. It makes sense because the user can choose the
Accuracy or Time based algorithm, just as the user can choose Linear
Regression or Convex-hull in the Analysis module.
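
Choosing between interchangeable modules, where only some of them provide a
pre-processing step, could look roughly like the sketch below. It is a
simplified illustration, not the LTTV structures: `ReductionModuleSketch`,
`find_module`, `run_chain` and the three callbacks are hypothetical names,
and an `int` stands in for the synchronization state.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified module descriptor */
typedef struct {
	const char *name;
	void (*preProcess)(int *state);  /* optional hook, may be NULL */
	void (*finalize)(int *state);    /* mandatory */
} ReductionModuleSketch;

static void finalize_accuracy(int *state) { *state += 1; }
static void preprocess_time(int *state) { *state += 10; }
static void finalize_time(int *state) { *state += 100; }

static const ReductionModuleSketch modules[] = {
	{ "accuracy", NULL, finalize_accuracy },
	{ "time", preprocess_time, finalize_time },
};

/* Look a module up by the name the user selected */
static const ReductionModuleSketch *find_module(const char *name)
{
	size_t i;

	for (i = 0; i < sizeof(modules) / sizeof(modules[0]); i++)
		if (strcmp(modules[i].name, name) == 0)
			return &modules[i];
	return NULL;
}

/* Run the chain; the optional hook is called only when it is provided */
static int run_chain(const char *name)
{
	const ReductionModuleSketch *module;
	int state = 0;

	assert(name != NULL);
	module = find_module(name);
	if (module == NULL)
		return -1;
	if (module->preProcess != NULL)
		module->preProcess(&state);
	module->finalize(&state);
	return state;
}
```

Because the core only tests the hook against NULL, a module without a
pre-processing step needs no changes when another module adds one.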

The following figure briefly illustrates the modules.


In the Reduction time branch, as I discussed above, the number of
exchanged packets is counted, the reference node (A) is chosen based on
these counts, and the Maximum Spanning Tree is found, that is, the
following tree.

    D ---- 60 ---- A             B
                   |            /
                30 |           / 20
                   |          /
                   C --------+

This tree will be sent to the Analysis module to synchronize the nodes.
The total synchronization time will then be less than that of the
original graph, since the useless links have been removed.
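
The selection described above (reference node by packet count, then a
Maximum Spanning Tree) can be sketched with Prim's algorithm as follows.
This is a minimal illustration and not the code from the patch: the names
`exampleCount`, `pick_reference_node` and `maximum_spanning_tree` are
hypothetical, and the weights are the packet counts from the first diagram
(A=0, B=1, C=2, D=3).

```c
#include <assert.h>

#define NODE_NB 4  /* A, B, C, D */

/* Symmetric matrix of exchanged packets, taken from the first diagram */
static const unsigned int exampleCount[NODE_NB][NODE_NB] = {
	/*        A   B   C   D */
	/* A */ { 0,  5, 20, 40 },
	/* B */ { 5,  0, 10,  0 },
	/* C */ { 20, 10, 0,  5 },
	/* D */ { 40, 0,  5,  0 },
};

/* The reference node is the one with the largest total packet count */
static int pick_reference_node(const unsigned int count[NODE_NB][NODE_NB])
{
	int i, j, best = 0;
	unsigned int bestSum = 0;

	for (i = 0; i < NODE_NB; i++) {
		unsigned int sum = 0;

		for (j = 0; j < NODE_NB; j++)
			sum += count[i][j];
		if (sum > bestSum) {
			bestSum = sum;
			best = i;
		}
	}
	return best;
}

/*
 * Prim's algorithm, picking the heaviest edge at each step. On return,
 * parent[i] is the tree neighbour of node i (-1 for the root and for
 * unreachable nodes). Returns the total weight of the tree.
 */
static unsigned int maximum_spanning_tree(
	const unsigned int count[NODE_NB][NODE_NB], int root,
	int parent[NODE_NB])
{
	unsigned int best[NODE_NB] = { 0 };
	int inTree[NODE_NB] = { 0 };
	unsigned int total = 0;
	int i, treeSize;

	assert(root >= 0 && root < NODE_NB);

	for (i = 0; i < NODE_NB; i++)
		parent[i] = -1;
	inTree[root] = 1;
	for (i = 0; i < NODE_NB; i++) {
		if (i != root && count[root][i] > 0) {
			best[i] = count[root][i];
			parent[i] = root;
		}
	}

	for (treeSize = 1; treeSize < NODE_NB; treeSize++) {
		int next = -1;

		/* Find the node outside the tree with the heaviest link to it */
		for (i = 0; i < NODE_NB; i++)
			if (!inTree[i] && best[i] > 0 &&
				(next == -1 || best[i] > best[next]))
				next = i;
		if (next == -1)
			break;  /* remaining nodes are not connected */

		inTree[next] = 1;
		total += best[next];

		/* The new node may offer heavier links to the remaining nodes */
		for (i = 0; i < NODE_NB; i++) {
			if (!inTree[i] && count[next][i] > best[i]) {
				best[i] = count[next][i];
				parent[i] = next;
			}
		}
	}
	return total;
}
```

On the example matrix this selects A as the reference node and keeps the
links A-D, A-C and C-B, dropping A-B and D-C.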


So, is it right to summarize what you're doing this way?:
Instead of picking the edges that have the best accuracy, your algorithm picks the edges that have the highest number of exchanged messages.

-Ben

Future Improvement:
Since the Analysis module works on “CPU-time” and needs just the trace
number, there is no need to send the whole packet to Analysis. So a
simple change is needed in the output of the Matching module and the
input of the Analysis module. With this improvement, the buffering will
be removed from my algorithm and from the “time” branch too.

Masoume



    On 15/09/10 12:36 PM, Masoume Jabbarifar wrote:

        ---
          lttv/lttv/Makefile.am                  |    2 +
          lttv/lttv/sync/Makefile.am             |    4 +-
          lttv/lttv/sync/data_structures.c       |    2 -
          lttv/lttv/sync/event_matching_tcp.c    |   87 +++++-
          lttv/lttv/sync/event_matching_tcp.h    |    1 +
          lttv/lttv/sync/event_processing_text.c |    4 +
          lttv/lttv/sync/factor_reduction.h      |    6 +
          lttv/lttv/sync/factor_reduction_time.c |  514
        ++++++++++++++++++++++++++++++++
          lttv/lttv/sync/factor_reduction_time.h |   53 ++++
          lttv/lttv/sync/sync_chain_lttv.c       |    6 +
          lttv/lttv/sync/sync_chain_unittest.c   |    2 +
          11 files changed, 664 insertions(+), 17 deletions(-)
          create mode 100644 lttv/lttv/sync/factor_reduction_time.c
          create mode 100644 lttv/lttv/sync/factor_reduction_time.h

        diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am
        index 30ead55..750f2d2 100644
        --- a/lttv/lttv/Makefile.am
        +++ b/lttv/lttv/Makefile.am
        @@ -87,6 +87,8 @@ lttv_real_SOURCES = \
                sync/factor_reduction.h\
                sync/factor_reduction_accuracy.c\
                sync/factor_reduction_accuracy.h\
        +       sync/factor_reduction_time.c\
        +       sync/factor_reduction_time.h\
                sync/lookup3.h

          lttvinclude_HEADERS = \
        diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am
        index e1d6775..7a5417d 100644
        --- a/lttv/lttv/sync/Makefile.am
        +++ b/lttv/lttv/sync/Makefile.am
        @@ -30,4 +30,6 @@ unittest_SOURCES = \
                event_analysis_linreg.h\
                factor_reduction.h\
                factor_reduction_accuracy.c\
        -       factor_reduction_accuracy.h
        +       factor_reduction_accuracy.h\
        +       factor_reduction_time.c\
        +       factor_reduction_time.h
        diff --git a/lttv/lttv/sync/data_structures.c
        b/lttv/lttv/sync/data_structures.c
        index acac9d7..c5fe736 100644
        --- a/lttv/lttv/sync/data_structures.c
        +++ b/lttv/lttv/sync/data_structures.c
        @@ -271,8 +271,6 @@ void destroyTCPSegment(Message* const segment)
          {
                TCPEvent* inE, *outE;

        -       segment->print(segment);
        -
                g_assert(segment->inE != NULL&&  segment->outE != NULL);
                g_assert(segment->inE->type == TCP&&
          segment->outE->type == TCP);
                inE= segment->inE->event.tcpEvent;
        diff --git a/lttv/lttv/sync/event_matching_tcp.c
        b/lttv/lttv/sync/event_matching_tcp.c
        index 90d6c43..678a334 100644
        --- a/lttv/lttv/sync/event_matching_tcp.c
        +++ b/lttv/lttv/sync/event_matching_tcp.c
        @@ -56,7 +56,8 @@ static void
        buildReversedConnectionKey(ConnectionKey* const
          static void openGraphDataFiles(SyncState* const syncState);
          static void closeGraphDataFiles(SyncState* const syncState);
          static void writeMessagePoint(FILE* stream, const Message*
        const message);
        -
        +static void gfPacketDestroy(gpointer data, gpointer userData);
        +static void gfExchangeDestroy(gpointer data, gpointer userData);

          static MatchingModule matchingModuleTCP = {
                .name= "TCP",
        @@ -101,6 +102,7 @@ void registerMatchingTCP()
          static void initMatchingTCP(SyncState* const syncState)
          {
                MatchingDataTCP* matchingData;
        +       int i, j;

                matchingData= malloc(sizeof(MatchingDataTCP));
                syncState->matchingData= matchingData;
        @@ -113,8 +115,7 @@ static void initMatchingTCP(SyncState* const
        syncState)
        &gefConnectionKeyEqual,&gdnConnectionKeyDestroy,
        &gdnTCPSegmentListDestroy);

        -       if (syncState->stats)
        -       {
        +       if (syncState->reductionModule->preProcessReduction !=
        NULL || syncState->stats) {
                        unsigned int i;

                        matchingData->stats= calloc(1,
        sizeof(MatchingStatsTCP));
        @@ -139,6 +140,20 @@ static void initMatchingTCP(SyncState*
        const syncState)
                {
                        matchingData->messagePoints= NULL;
                }
        +       if (syncState->reductionModule->preProcessReduction !=
        NULL) {
        +               matchingData->packetArray=
        malloc(syncState->traceNb * sizeof(GQueue**));
        +               for (i= 0; i<  syncState->traceNb; i++) {
        +                       matchingData->packetArray[i]=
        malloc(syncState->traceNb * sizeof(GQueue*));
        +
        +                       for (j= 0; j<  syncState->traceNb; j++) {
        +                               matchingData->packetArray[i][j]=
        g_queue_new();
        +                       }
        +               }
        +       }
        +       else
        +       {
        +               matchingData->packetArray= NULL;
        +       }
          }


        @@ -155,6 +170,7 @@ static void initMatchingTCP(SyncState* const
        syncState)
          static void destroyMatchingTCP(SyncState* const syncState)
          {
                MatchingDataTCP* matchingData;
        +       int i, j;

                matchingData= (MatchingDataTCP*) syncState->matchingData;

        @@ -165,8 +181,7 @@ static void destroyMatchingTCP(SyncState*
        const syncState)

                partialDestroyMatchingTCP(syncState);

        -       if (syncState->stats)
        -       {
        +       if (syncState->reductionModule->preProcessReduction !=
        NULL || syncState->stats) {
                        unsigned int i;

                        for (i= 0; i<  syncState->traceNb; i++)
        @@ -179,6 +194,17 @@ static void destroyMatchingTCP(SyncState*
        const syncState)

                free(syncState->matchingData);
                syncState->matchingData= NULL;
        +       if (syncState->reductionModule->preProcessReduction !=
        NULL) {
        +                       for (i= 0; i<  syncState->traceNb; i++) {
        +                               for (j= 0; j<
          syncState->traceNb; j++)
        +                                       if
        (syncState->analysisModule->analyzeMessage != NULL)
        +
        g_queue_foreach(matchingData->packetArray[i][j],
        gfPacketDestroy, NULL);
        +                                       else
        +
        g_queue_foreach(matchingData->packetArray[i][j],
        gfExchangeDestroy, NULL);
        +                               free(matchingData->packetArray[i]);
        +                       }
        +                       free(matchingData->packetArray);
        +               }
          }


        @@ -335,6 +361,7 @@ static void matchEvents(SyncState* const
        syncState, Event* const event,
                Message* packet;
                MatchingDataTCP* matchingData;
                GQueue* conUnAcked;
        +       GQueue* packetMatching;

                matchingData= (MatchingDataTCP*) syncState->matchingData;

        @@ -354,10 +381,11 @@ static void matchEvents(SyncState* const
        syncState, Event* const event,
                        packet->outE->event.tcpEvent->segmentKey=
        packet->inE->event.tcpEvent->segmentKey;

                        if (syncState->stats)
        -               {
                                matchingData->stats->totPacket++;
        +
        +               if
        (syncState->reductionModule->preProcessReduction != NULL ||
        syncState->stats)

          
matchingData->stats->totMessageArray[packet->inE->traceNum][packet->outE->traceNum]++;
        -               }
        +

                        // Discard loopback traffic
                        if (packet->inE->traceNum == packet->outE->traceNum)
        @@ -371,17 +399,24 @@ static void matchEvents(SyncState* const
        syncState, Event* const event,

          
writeMessagePoint(matchingData->messagePoints[packet->inE->traceNum][packet->outE->traceNum],
                                        packet);
                        }
        -
        -               if (syncState->analysisModule->analyzeMessage !=
        NULL)
        -               {
        -
        syncState->analysisModule->analyzeMessage(syncState, packet);
        +               if (syncState->analysisModule->analyzeMessage !=
        NULL) {
        +                       if
        (syncState->reductionModule->preProcessReduction == NULL)
        +
        syncState->analysisModule->analyzeMessage(syncState, packet);
        +                       else {
        +                               packetMatching=
        +
        
matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
        +
        g_queue_push_tail(packetMatching, packet);
        +                       }
        +
                        }

                        // We can skip the rest of the algorithm if the
        analysis module is not
                        // interested in exchanges
                        if (syncState->analysisModule->analyzeExchange
        == NULL)
                        {
        -                       destroyTCPSegment(packet);
        +                       if
        (syncState->reductionModule->preProcessReduction == NULL)
        +                               destroyTCPSegment(packet);
        +
                                return;
                        }

        @@ -452,8 +487,14 @@ static void matchEvents(SyncState* const
        syncState, Event* const event,

          matchingData->stats->totExchangeSync++;
                                                        }

        -
        syncState->analysisModule->analyzeExchange(syncState,
        -                                                       exchange);
        +                                               if
        (syncState->reductionModule->preProcessReduction == NULL)
        +
        syncState->analysisModule->analyzeExchange(syncState, exchange);
        +                                               else
        +                                               {
        +
        packetMatching=
        +
        
matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
        +
        g_queue_push_tail(packetMatching, exchange);
        +                                               }
                                                }

                                                exchange->message= NULL;
        @@ -707,3 +748,21 @@ static void
        writeMatchingGraphsPlotsTCPMessages(SyncState* const syncState,
        "title \"Received messages\" with points linetype 4 "
        "linecolor rgb \"#6699cc\" pointtype 11 pointsize 2, \\\n", i, j);
          }
        +static void gfPacketDestroy(gpointer data, gpointer userData)
        +{
        +       Message* packet;
        +
        +       packet= (Message*) data;
        +       destroyTCPSegment(packet);
        +
        +}
        +
        +static void gfExchangeDestroy(gpointer data, gpointer userData)
        +{
        +       Exchange* exchange;
        +
        +       exchange= (Exchange*) data;
        +       exchange->message= NULL;
        +       destroyTCPExchange(exchange);
        +
        +}
        diff --git a/lttv/lttv/sync/event_matching_tcp.h
        b/lttv/lttv/sync/event_matching_tcp.h
        index 6f9b072..e7d1d12 100644
        --- a/lttv/lttv/sync/event_matching_tcp.h
        +++ b/lttv/lttv/sync/event_matching_tcp.h
        @@ -57,6 +57,7 @@ typedef struct
                 * The elements on the diagonal are not initialized.
                 */
                FILE*** messagePoints;
        +       GQueue*** packetArray;
          } MatchingDataTCP;

          void registerMatchingTCP();
        diff --git a/lttv/lttv/sync/event_processing_text.c
        b/lttv/lttv/sync/event_processing_text.c
        index bcbea9b..894f71c 100644
        --- a/lttv/lttv/sync/event_processing_text.c
        +++ b/lttv/lttv/sync/event_processing_text.c
        @@ -273,6 +273,10 @@ static AllFactors*
        finalizeProcessingText(SyncState* const syncState)
                        free(line);
                }

        +       if (syncState->reductionModule->preProcessReduction !=
        NULL) {
        +
        syncState->reductionModule->preProcessReduction(syncState);
        +       }
        +
                factors=
        syncState->matchingModule->finalizeMatching(syncState);
                if (syncState->stats)
                {
        diff --git a/lttv/lttv/sync/factor_reduction.h
        b/lttv/lttv/sync/factor_reduction.h
        index 561df6b..e971ea6 100644
        --- a/lttv/lttv/sync/factor_reduction.h
        +++ b/lttv/lttv/sync/factor_reduction.h
        @@ -39,6 +39,12 @@ typedef struct
                 */
                void (*destroyReduction)(struct _SyncState* const
        syncState);

        +       /* This function is called when time reduction is needed and
        +        * removes useless communication links and finds
        Spanning Tree
        +        * of nodes in the network and then sends packets to
        synchronization
        +        */
        +       void (*preProcessReduction)(struct _SyncState* const
        syncState);
        +
                /*
                 * Convert trace pair synchronization factors to a
        resulting offset and
                 * drift for each trace.
        diff --git a/lttv/lttv/sync/factor_reduction_time.c
        b/lttv/lttv/sync/factor_reduction_time.c
        new file mode 100644
        index 0000000..2bd37bf
        --- /dev/null
        +++ b/lttv/lttv/sync/factor_reduction_time.c
        @@ -0,0 +1,514 @@
        +/* This file is part of the Linux Trace Toolkit viewer
        + * Copyright (C) 2010 Masoume Jabbarifar <[email protected]>
        + *
        + * This program is free software: you can redistribute it
        and/or modify it
        + * under the terms of the GNU Lesser General Public License as
        published by
        + * the Free Software Foundation, either version 2.1 of the
        License, or (at
        + * your option) any later version.
        + *
        + * This program is distributed in the hope that it will be
        useful, but WITHOUT
        + * ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or
        + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
        General Public
        + * License for more details.
        + *
        + * You should have received a copy of the GNU Lesser General
        Public License
        + * along with this program.  If not,
        see<http://www.gnu.org/licenses/>.
        + */
        +#define _ISOC99_SOURCE
        +
        +#ifdef HAVE_CONFIG_H
        +#include<config.h>
        +#endif
        +
        +#include<math.h>
        +#include<errno.h>
        +#include<stdlib.h>
        +#include<string.h>
        +#include<unistd.h>
        +
        +
        +#include "event_analysis.h"
        +#include "event_matching.h"
        +#include "sync_chain.h"
        +
        +#include "factor_reduction_time.h"
        +#include "event_matching_tcp.h"
        +
        +
        +/* Functions common to all reduction modules */
        +static void initReductionTime(SyncState* const syncState);
        +static void destroyReductionTime(SyncState* const syncState);
        +static void preProcessReductionTime(SyncState* const syncState);
        +
        +static GArray* finalizeReductionTime(SyncState* const syncState,
        +       AllFactors* allFactors);
        +static void printReductionStatsTime(SyncState* const syncState);
        +
        +/* Functions specific to this module */
        +static void getFactors(SyncState* const syncState,AllFactors*
        const allFactors,
        +       const unsigned int traceNum,
        +       Factors* const factors);
        +static void maximumSpanningTree(SyncState* const syncState);
        +static void maximumCommunicationPacket(SyncState* const syncState);
        +static void floodingSyncExecute(SyncState* const syncState);
        +void updateDistances(int target,ReductionData*
        reductionData,SyncState* const syncState);
        +void sumCommunicationPackets(SyncState* const syncState);
        +void proposeRefNode(int** connectionArray, int* refNodes, int n);
        +void SyncExecute(int refNode,SyncState* const syncState);
        +int findParent(SyncState* const syncState, int root, int node);
        +
        +
        +static ReductionModule reductionModuleTime= {
        +       .name= "time",
        +       .initReduction=&initReductionTime,
        +       .destroyReduction=&destroyReductionTime,
        +       .preProcessReduction=&preProcessReductionTime,
        +       .finalizeReduction=&finalizeReductionTime,
        +       .printReductionStats=&printReductionStatsTime,
        +       .graphFunctions= {},
        +};
        +
        +
        +/*
        + * Reduction module registering function
        + */
        +void registerReductionTime()
        +{
        +       g_queue_push_tail(&reductionModules,&reductionModuleTime);
        +}
        +
        +
        +/*
        + * Reduction init function
        + *
        + * This function is called at the beginning of a
        synchronization run for a set
        + * of traces.
        + *
        + * Allocate some reduction specific data structures
        + *
        + * Args:
        + *   syncState     container for synchronization data.
        + */
        +static void initReductionTime(SyncState* const syncState)
        +{
        +       int i;
        +
        +       ReductionData* reductionData;
        +
        +       reductionData= malloc(sizeof(ReductionData));
        +       syncState->reductionData= reductionData;
        +
        +
        +       reductionData->routed= malloc(syncState->traceNb *
        sizeof(char*));
        +
        +       reductionData->distance= malloc(syncState->traceNb *
        sizeof(int*));
        +
        +       reductionData->neighbour= malloc(syncState->traceNb *
        sizeof(int*));
        +
        +       reductionData->totMessageArray=
        malloc(syncState->traceNb * sizeof(unsigned int*));
        +
        +       for (i= 0; i<  syncState->traceNb; i++) {
        +               reductionData->totMessageArray[i]=
        +                       calloc(syncState->traceNb,
        sizeof(unsigned int));
        +       }
        +
        +       reductionData->totMSTMessageArray=
        malloc(syncState->traceNb * sizeof(unsigned int*));
        +
        +       for (i= 0; i<  syncState->traceNb; i++) {
        +               reductionData->totMSTMessageArray[i]=
        +                       calloc(syncState->traceNb,
        sizeof(unsigned int));
        +       }
        +
        +       reductionData->totMSTAnalysisArray=
        malloc(syncState->traceNb * sizeof(unsigned int*));
        +
        +       for (i= 0; i<  syncState->traceNb; i++) {
        +               reductionData->totMSTAnalysisArray[i]=
        +                       calloc(syncState->traceNb,
        sizeof(unsigned int));
        +       }
        +
        +       reductionData->maxRoot= malloc(syncState->traceNb *
        sizeof(int*));
        +       reductionData->maxRootMST= malloc(syncState->traceNb *
        sizeof(int*));
        +}
        +
        +static void preProcessReductionTime(SyncState* const syncState)
        +{
        +
        +       sumCommunicationPackets(syncState);
        +       maximumCommunicationPacket(syncState);
        +       maximumSpanningTree(syncState);
        +       floodingSyncExecute(syncState);
        +}
        +/* Finding best path in the network that has more interaction
        among the
        + * nodes and synchronisation trough this path
        + *
        + * Maximum Spanning Tree function based on Prim's Algorithm
        + *
        + * The algorithm is implemented according to the code here:
        + * http://snippets.dzone.com/posts/show/4743
        + *
        +*/
        +static void maximumSpanningTree(SyncState* const syncState)
        +{
        +       int i, j;
        +       int total= 0;
        +       int treeSize, max;
        +       ReductionData* reductionData;
        +
        +       reductionData= syncState->reductionData;
        +
        +       /* Initialise distance with 0 */
        +       for (i= 0; i<  syncState->traceNb ; ++i)
        +               reductionData->distance[i]= 0;
        +
        +       /* Mark all nodes as NOT beeing in the maximum spanning
        tree */
        +       for (i= 0; i<  syncState->traceNb; ++i)
        +               reductionData->routed[i]= 0;
        +
        +       /* Add the first node to the tree */
        +       if (reductionData->referenceNode<  0) return;
        +       g_debug("Adding node %d\n", reductionData->referenceNode);
        +       reductionData->routed[reductionData->referenceNode]= 1;
        +       updateDistances(reductionData->referenceNode,
        reductionData, syncState);
        +
        +       for (treeSize= 1; treeSize<  syncState->traceNb;
        ++treeSize) {
        +               /* Find the node with the bigest distance to the
        tree */
        +               max= -1;
        +               for (i= 0; i<  syncState->traceNb; ++i)
        +                       if (!reductionData->routed[i])
        +                               if ((max == -1) ||
        (reductionData->distance[max]<  reductionData->distance[i]))
        +                                       max= i;
        +
        +               /* And add it */
        +
        reductionData->totMSTMessageArray[max][reductionData->neighbour[max]]=
        reductionData->distance[max];
        +
        reductionData->totMSTMessageArray[reductionData->neighbour[max]][max]=
        reductionData->distance[max];
        +
        +
        reductionData->totMSTAnalysisArray[max][reductionData->neighbour[max]]=
        reductionData->distance[max];
        +
        reductionData->totMSTAnalysisArray[reductionData->neighbour[max]][max]=
        reductionData->distance[max];
        +
        +               g_debug("Adding edge %i-MAX(%i)-D(%i)\n",
        reductionData->neighbour[max], max, reductionData->distance[max]);
        +               reductionData->routed[max]= 1;
        +               total+= reductionData->distance[max];
        +
        +               updateDistances(max, reductionData, syncState);
        +       }
        +
        +
        +       for ( i=0 ; i<  syncState->traceNb ; i++) {
        +               for (j= 0 ; j<  syncState->traceNb ; j++)
        +                       g_debug("\t%d ",
        reductionData->totMSTMessageArray[i][j]);
        +       g_debug("\n");
        +       }
        +       g_debug("Total distance: %d\n", total);
        +
        +}
        +
        +
        +void updateDistances(int target, ReductionData* reductionData,
        SyncState* const syncState) {
        +
        +       int i;
        +       g_debug("Update[%i]= ", target);
        +       for (i= 0; i<  syncState->traceNb; ++i) {
        +               if ((reductionData->totMessageArray[target][i]
        != 0)&&  (reductionData->distance[i]<=
        reductionData->totMessageArray[target][i])) {
        +                       if (reductionData->distance[i] !=
        reductionData->totMessageArray[target][i]) {
        +                               reductionData->distance[i]=
        reductionData->totMessageArray[target][i];
        +                               reductionData->neighbour[i]= target;
        +                       }
        +                       else if (reductionData->distance[i] ==
        reductionData->totMessageArray[target][i]&&  target ==
        reductionData->referenceNode) {
        +                               reductionData->distance[i]=
        reductionData->totMessageArray[target][i];
        +                               reductionData->neighbour[i]= target;
        +                       }
        +               }
        +               g_debug("%iD(%i)N(%i)", i,
        reductionData->distance[i], reductionData->neighbour[i]);
        +        }
        +
        
g_debug("\n------------------------------------------------------------\n");
        +}
        +
        +
        +static void maximumCommunicationPacket(SyncState* const syncState)
        +{
        +
        +       ReductionData* reductionData;
        +
        +       reductionData= syncState->reductionData;
        +       proposeRefNode(reductionData->totMessageArray,
        reductionData->maxRoot, syncState->traceNb);
        +       reductionData->referenceNode= reductionData->maxRoot[0];
        +       g_debug("Best RefNODE is : %i\n",
        reductionData->referenceNode);
        +
        +}
        +
        +/* Symmetrical matrix of packet communications */
        +
        +void sumCommunicationPackets(SyncState* const syncState) {
        +       int i, j, sum;
        +
        +       MatchingDataTCP* matchingData;
        +
        +       matchingData= syncState->matchingData;
        +
        +       ReductionData* reductionData;
        +
        +       reductionData= syncState->reductionData;
        +
        +
        +       for (i= 0; i<  syncState->traceNb; ++i)
        +               for (j= 0; j<  syncState->traceNb; ++j){
        +                       sum=
        matchingData->stats->totMessageArray[i][j] +
        matchingData->stats->totMessageArray[j][i];
        +                       reductionData->totMessageArray[i][j]= sum;
        +                       reductionData->totMessageArray[j][i]= sum;
        +               }
        +
        +}
        +
        +/* Finding the node who has more interaction */
        +
        +void proposeRefNode(int** connectionArray, int* refNodes, int n){
        +
        +       int* sumPacket;
        +       int i, j, index=0;
        +       int maxSumPacket=0;
        +
        +       sumPacket= malloc(n* sizeof(int));
        +
        +       for (i= 0; i<  n; ++i) {
        +               sumPacket[i]= 0;
        +               for (j= 0;j<  n; ++j)
        +                       sumPacket[i]+= connectionArray[i][j];
        +               g_debug("sum(%i)=%i\n", i, sumPacket[i]);
        +               if (sumPacket[i]>  maxSumPacket)
        +                       maxSumPacket= sumPacket[i];
        +       }
        +       g_debug("proposed Reference Node: ");
        +       for (i= 0 ; i<  n ; ++i) {
        +               if (sumPacket[i] == maxSumPacket) {
        +                       refNodes[index++]= i;
        +                       g_debug("%i \t", i);
        +               }
        +       }
        +       refNodes[index]= -1;
        +       g_debug("\n");
        +       free(sumPacket);
        +}
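
For reference, the selection rule in proposeRefNode() (sum each row of the communication matrix and keep the row with the largest total) can be sketched on its own. This is a minimal sketch, not the patch's code: pickReferenceNode() is a hypothetical name, the matrix is assumed to be stored row-major in a flat array, and ties go to the lowest index, whereas the patch records every tied node in refNodes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not part of the patch): return the index of the
 * first row with the largest row sum in an n x n matrix of packet counts
 * stored row-major in a flat array. */
static int pickReferenceNode(const unsigned int* counts, size_t n)
{
	size_t i, j, best= 0;
	unsigned long bestSum= 0;

	assert(counts != NULL && n > 0);

	for (i= 0; i < n; i++) {
		unsigned long sum= 0;

		for (j= 0; j < n; j++)
			sum+= counts[i * n + j];

		/* Strict comparison: on a tie the lowest index is kept */
		if (sum > bestSum) {
			bestSum= sum;
			best= i;
		}
	}

	return (int) best;
}
```

With the 3x3 matrix {0,5,1, 5,0,7, 1,7,0} the row sums are 6, 12 and 8, so node 1 is picked.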
        +
        +void floodingSyncExecute(SyncState* const syncState) {
        +
        +       ReductionData* reductionData;
        +       reductionData= syncState->reductionData;
        +       SyncExecute(reductionData->referenceNode, syncState);
        +
        +}
        +
        +
        +/* The root and all of the nodes connected to it (its children) must be
        + * synchronized first; the function is then run recursively on the
        + * children.
        + */
        +
        +void SyncExecute(int refNode, SyncState* const syncState) {
        +
        +       unsigned int i, j;
        +       GQueue* packetSync;
        +       Message* packet;
        +       Exchange* exchange;
        +
        +       ReductionData* reductionData;
        +
        +       reductionData= syncState->reductionData;
        +
        +       MatchingDataTCP* matchingData;
        +
        +       matchingData= syncState->matchingData;
        +
        +       for (i= 0 ; i < syncState->traceNb ; ++i)
        +               if (reductionData->totMSTMessageArray[refNode][i] != 0){
        +                       packetSync= matchingData->packetArray[refNode][i];
        +
        +                       for (j= 0 ; j < packetSync->length ; ++j){
        +                               if (syncState->analysisModule->analyzeMessage != NULL) {
        +                                       packet= g_queue_peek_nth(packetSync, j);
        +                                       syncState->analysisModule->analyzeMessage(syncState, packet);
        +                               }
        +                               else {
        +                                       exchange= g_queue_peek_nth(packetSync, j);
        +                                       syncState->analysisModule->analyzeExchange(syncState, exchange);
        +                               }
        +                       }
        +
        +                       packetSync= matchingData->packetArray[i][refNode];
        +
        +                       for (j= 0 ; j < packetSync->length ; ++j){
        +                               if (syncState->analysisModule->analyzeMessage != NULL) {
        +                                       packet= g_queue_peek_nth(packetSync, j);
        +                                       syncState->analysisModule->analyzeMessage(syncState, packet);
        +                               }
        +                               else {
        +                                       exchange= g_queue_peek_nth(packetSync, j);
        +                                       syncState->analysisModule->analyzeExchange(syncState, exchange);
        +                               }
        +                       }
        +
        +                       reductionData->totMSTMessageArray[i][refNode]= 0;
        +                       reductionData->totMSTMessageArray[refNode][i]= 0;
        +                       SyncExecute(i, syncState);
        +               }
        +       return;
        +}
        +/*
        + * Reduction destroy function
        + *
        + * Free the reduction-specific data structures
        + *
        + * Args:
        + *   syncState     container for synchronization data.
        + */
        +static void destroyReductionTime(SyncState* const syncState)
        +{
        +       int i;
        +       ReductionData* reductionData;
        +       reductionData= (ReductionData*) syncState->reductionData;
        +
        +       if (reductionData == NULL) return;
        +
        +       for (i= 0; i<  syncState->traceNb; i++) {
        +               free(reductionData->totMessageArray[i]);
        +               free(reductionData->totMSTMessageArray[i]);
        +               free(reductionData->totMSTAnalysisArray[i]);
        +       }
        +       free(reductionData->totMessageArray);
        +       free(reductionData->totMSTMessageArray);
        +       free(reductionData->totMSTAnalysisArray);
        +       free(reductionData->routed);
        +       free(reductionData->distance);
        +       free(reductionData->neighbour);
        +       free(reductionData->maxRoot);
        +       free(reductionData->maxRootMST);
        +       free(syncState->reductionData);
        +       syncState->reductionData= NULL;
        +}
        +
        +
        +/*
        + * Finalize the factor reduction
        + *
        + * Calculate a resulting offset and drift for each trace.
        + *
        + * Args:
        + *   syncState     container for synchronization data.
        + *   allFactors    offset and drift between each pair of traces
        + *
        + * Returns:
        + *   Factors[traceNb] synchronization factors for each trace
        +
        + */
        +static GArray* finalizeReductionTime(SyncState* const syncState,
        +       AllFactors* allFactors)
        +{
        +       int i, j;
        +       GArray* factors;
        +
        +       factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
        +               syncState->traceNb);
        +       g_array_set_size(factors, syncState->traceNb);
        +       for (i= 0; i<  syncState->traceNb; i++) {
        +               getFactors(syncState, allFactors, i, &g_array_index(factors,
        +                       Factors, i));
        +       }
        +
        +       return factors;
        +}
        +
        +
        +/*
        + * Print statistics related to reduction. Must be called after
        + * finalizeReduction.
        + *
        + * Args:
        + *   syncState     container for synchronization data.
        + */
        +static void printReductionStatsTime(SyncState* const syncState)
        +{
        +}
        +
        +/*
        + * Accumulate the time correction factors to convert a node's time to its
        + * reference's time.
        + * This function recursively calls itself until it reaches the reference node.
        + *
        + * Args:
        + *   syncState:    container for synchronization data
        + *   allFactors:   offset and drift between each pair of traces
        + *   traceNum:     node for which to find the factors
        + *   factors:      resulting factors
        + */
        +static void getFactors(SyncState* const syncState, AllFactors* const allFactors, const unsigned int traceNum,
        +       Factors* const factors)
        +{
        +       unsigned int reference;
        +       PairFactors** const pairFactors= allFactors->pairFactors;
        +       int parent, i;
        +
        +       ReductionData* reductionData;
        +       reductionData= syncState->reductionData;
        +
        +       if (traceNum == reductionData->referenceNode)
        +               reference= traceNum;
        +       else if (reductionData->totMSTAnalysisArray[reductionData->referenceNode][traceNum] != 0)
        +               reference= reductionData->referenceNode;
        +       else
        +               reference= findParent(syncState, -1, traceNum);
        +
        +       if (reference == traceNum) {
        +               factors->offset= 0.;
        +               factors->drift= 1.;
        +       }
        +       else {
        +               Factors previousVertexFactors;
        +
        +               getFactors(syncState, allFactors, reference, &previousVertexFactors);
        +
        +               /* Convert the time from traceNum to reference;
        +                * pairFactors[row][col] converts the time from col to row, invert the
        +                * factors as necessary */
        +
        +               if (pairFactors[reference][traceNum].approx != NULL) {
        +                       factors->offset= previousVertexFactors.drift *
        +                               pairFactors[reference][traceNum].approx->offset +
        +                               previousVertexFactors.offset;
        +                       factors->drift= previousVertexFactors.drift *
        +                               pairFactors[reference][traceNum].approx->drift;
        +               }
        +               else if (pairFactors[traceNum][reference].approx != NULL) {
        +                       factors->offset= previousVertexFactors.drift * (-1. *
        +                               pairFactors[traceNum][reference].approx->offset /
        +                               pairFactors[traceNum][reference].approx->drift) +
        +                               previousVertexFactors.offset;
        +                       factors->drift= previousVertexFactors.drift * (1. /
        +                               pairFactors[traceNum][reference].approx->drift);
        +               }
        +               else {
        +                       g_assert_not_reached();
        +               }
        +       }
        +}
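
The arithmetic in getFactors() is the composition of two linear time corrections of the form t_out = drift * t_in + offset, with an inversion when only the factors for the opposite direction are available. A minimal sketch of that algebra follows; LinearFactors, composeFactors(), invertFactors() and applyFactors() are hypothetical names used for illustration only and are not part of the patch or of lttv.

```c
#include <assert.h>

/* Hypothetical stand-in for the patch's Factors structure:
 * t_out = drift * t_in + offset */
typedef struct {
	double offset;
	double drift;
} LinearFactors;

/* Apply a correction to a timestamp */
static double applyFactors(LinearFactors f, double t)
{
	return f.drift * t + f.offset;
}

/* Return the correction equivalent to applying inner first, then outer:
 * outer(inner(t)) = outer.drift * (inner.drift * t + inner.offset)
 *                   + outer.offset */
static LinearFactors composeFactors(LinearFactors outer, LinearFactors inner)
{
	LinearFactors result;

	result.drift= outer.drift * inner.drift;
	result.offset= outer.drift * inner.offset + outer.offset;
	return result;
}

/* Return the correction that undoes f: t_in = (t_out - offset) / drift */
static LinearFactors invertFactors(LinearFactors f)
{
	LinearFactors result;

	assert(f.drift != 0.);
	result.drift= 1. / f.drift;
	result.offset= -f.offset / f.drift;
	return result;
}
```

In these terms, the first branch of getFactors() corresponds to composeFactors(previous, pair) and the second branch to composeFactors(previous, invertFactors(pair)).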
        +
        +int findParent(SyncState* const syncState, int root, int node)
        +{
        +
        +       int i;
        +       int result;
        +       ReductionData* reductionData;
        +
        +       reductionData= syncState->reductionData;
        +       for (i= 0; i < syncState->traceNb; i++)
        +               if (reductionData->totMSTAnalysisArray[node][i] != 0 && i == reductionData->referenceNode)
        +                       return 1;
        +
        +       for (i= 0; i < syncState->traceNb; i++)
        +               if (reductionData->totMSTAnalysisArray[node][i] != 0)
        +                       if (i != root) {
        +                               result= findParent(syncState, node, i);
        +                               if (result == 1 && root == -1) return i;
        +                               if (result == 1) return 1;
        +                       }
        +       return 0;
        +}
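
As far as I can tell, findParent() uses its return value both as a found/not-found flag (1 or 0) and, at the top level, as a node index, so a result of 0 or 1 cannot be told apart from node 0 or node 1. One way to avoid the ambiguity is to compute every node's parent in a single walk from the reference node and use a sentinel for unreachable nodes. The following is only a sketch of that idea under stated assumptions: findParents(), fillParents() and NO_PARENT are hypothetical names, and the tree is assumed to be a symmetric n x n adjacency matrix stored row-major in a flat array.

```c
#include <assert.h>
#include <stddef.h>

#define NO_PARENT (-1)

/* Hypothetical helper: depth-first walk that records, for every node
 * reachable from "node", the neighbour through which it was reached.
 * adj is a symmetric n x n matrix (row-major); non-zero means an edge. */
static void fillParents(const unsigned int* adj, size_t n, size_t node,
	int* parent)
{
	size_t i;

	for (i= 0; i < n; i++) {
		if (adj[node * n + i] != 0 && parent[i] == NO_PARENT) {
			parent[i]= (int) node;
			fillParents(adj, n, i, parent);
		}
	}
}

/* Hypothetical helper: parent[i] receives the next node on the path from
 * i towards root, root itself for the root, or NO_PARENT when i is not
 * connected to root. */
static void findParents(const unsigned int* adj, size_t n, size_t root,
	int* parent)
{
	size_t i;

	assert(adj != NULL && parent != NULL && root < n);

	for (i= 0; i < n; i++)
		parent[i]= NO_PARENT;
	parent[root]= (int) root;
	fillParents(adj, n, root, parent);
}
```

For the chain 0-1-2 with an isolated node 3 and root 0, this gives the parents 0, 0, 1 and NO_PARENT.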
        diff --git a/lttv/lttv/sync/factor_reduction_time.h b/lttv/lttv/sync/factor_reduction_time.h
        new file mode 100644
        index 0000000..c46cbd1
        --- /dev/null
        +++ b/lttv/lttv/sync/factor_reduction_time.h
        @@ -0,0 +1,53 @@
        +/* This file is part of the Linux Trace Toolkit viewer
        + * Copyright (C) 2010 Masoume Jabbarifar <[email protected]>
        + *
        + * This program is free software: you can redistribute it and/or modify it
        + * under the terms of the GNU Lesser General Public License as published by
        + * the Free Software Foundation, either version 2.1 of the License, or (at
        + * your option) any later version.
        + *
        + * This program is distributed in the hope that it will be useful, but WITHOUT
        + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
        + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
        + * License for more details.
        + *
        + * You should have received a copy of the GNU Lesser General Public License
        + * along with this program.  If not, see <http://www.gnu.org/licenses/>.
        + */
        +
        +#ifndef FACTOR_REDUCTION_TIME_H
        +#define FACTOR_REDUCTION_TIME_H
        +
        +#include <glib.h>
        +
        +#include "data_structures.h"
        +
        +
        +typedef struct {
        +
        +       char* routed;   /* routed[i] is 1 when we have considered the node i in the maximum
        +                       spanning tree; otherwise it is 0 */
        +
        +       int* distance;  /* distance[i] is the distance between node i and the maximum
        +                       spanning tree; this is initially 0; if i is
        +                       already in the tree, then distance[i] is undefined;
        +                       this is just a temporary variable. It's not necessary but speeds
        +                       up execution considerably (by a factor of n) */
        +
        +       int* neighbour; /* neighbour[i] holds the index of the node i would have to be
        +                       linked to in order to get a distance of distance[i] */
        +
        +       int referenceNode; /* referenceNode is the network node whose time will be
        +                          considered as the reference time */
        +
        +       unsigned int** totMessageArray;
        +       unsigned int** totMSTMessageArray;  /* The maximum spanning tree is saved in totMSTMessageArray */
        +       unsigned int** totMSTAnalysisArray;
        +       int* maxRoot;
        +       int* maxRootMST;
        +
        +} ReductionData;
        +
        +void registerReductionTime();
        +
        +#endif
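
The routed, distance and neighbour fields match the usual bookkeeping of Prim's algorithm, here applied to a maximum spanning tree. The sketch below shows how those three arrays interact. It is an illustration under assumptions, not the patch's implementation: maxSpanningTree() is a hypothetical name, the weights are assumed to be packet counts in a symmetric n x n matrix stored row-major in a flat array, and a weight of 0 is taken to mean that there is no link.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical sketch of Prim's algorithm for a maximum spanning tree.
 * weight and tree are n x n matrices stored row-major; tree receives the
 * weight of each kept edge (symmetrically) and 0 elsewhere.
 * Returns the number of edges kept. */
static size_t maxSpanningTree(const unsigned int* weight, size_t n,
	size_t root, unsigned int* tree)
{
	char* routed= calloc(n, sizeof(char));
	unsigned int* distance= calloc(n, sizeof(unsigned int));
	size_t* neighbour= calloc(n, sizeof(size_t));
	size_t i, edges= 0;

	assert(routed != NULL && distance != NULL && neighbour != NULL);
	assert(weight != NULL && tree != NULL && root < n);

	for (i= 0; i < n * n; i++)
		tree[i]= 0;

	/* Start from the root: every other node's best link is to the root */
	routed[root]= 1;
	for (i= 0; i < n; i++) {
		distance[i]= weight[root * n + i];
		neighbour[i]= root;
	}

	for (;;) {
		size_t best= n;

		/* Pick the unrouted node with the heaviest link to the tree */
		for (i= 0; i < n; i++)
			if (!routed[i] && distance[i] > 0 &&
				(best == n || distance[i] > distance[best]))
				best= i;
		if (best == n)
			break; /* no reachable node left */

		routed[best]= 1;
		tree[best * n + neighbour[best]]= distance[best];
		tree[neighbour[best] * n + best]= distance[best];
		edges++;

		/* The new node may offer heavier links to the remaining nodes */
		for (i= 0; i < n; i++)
			if (!routed[i] && weight[best * n + i] > distance[i]) {
				distance[i]= weight[best * n + i];
				neighbour[i]= best;
			}
	}

	free(routed);
	free(distance);
	free(neighbour);
	return edges;
}
```

On a triangle with weights 5 (0-1), 7 (1-2) and 1 (0-2), starting from node 0, the edges 0-1 and 1-2 are kept and the light 0-2 link is dropped.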
        diff --git a/lttv/lttv/sync/sync_chain_lttv.c b/lttv/lttv/sync/sync_chain_lttv.c
        index 95bef44..c60e8ac 100644
        --- a/lttv/lttv/sync/sync_chain_lttv.c
        +++ b/lttv/lttv/sync/sync_chain_lttv.c
        @@ -45,6 +45,7 @@
          #include "event_analysis_linreg.h"
          #include "event_analysis_eval.h"
          #include "factor_reduction_accuracy.h"
        +#include "factor_reduction_time.h"
          #include "sync_chain.h"
          #include "sync_chain_lttv.h"

        @@ -139,6 +140,7 @@ static void init()
                registerAnalysisEval();

                registerReductionAccuracy();
        +       registerReductionTime();

                // Build module names lists for option and help string
                for (i= 0; i<  ARRAY_SIZE(loopValues); i++)
        @@ -313,6 +315,10 @@ bool syncTraceset(LttvTracesetContext* const traceSetContext)
                lttv_process_traceset_middle(traceSetContext, ltt_time_infinite,
                        G_MAXULONG, NULL);
                lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
        +
        +       // Find the best reference node and remove the useless synchronization
        +       if (syncState->reductionModule->preProcessReduction != NULL)
        +               syncState->reductionModule->preProcessReduction(syncState);

                // Obtain, reduce, adjust and set correction factors
                allFactors= syncState->processingModule->finalizeProcessing(syncState);
        diff --git a/lttv/lttv/sync/sync_chain_unittest.c b/lttv/lttv/sync/sync_chain_unittest.c
        index 40302a0..e76fa78 100644
        --- a/lttv/lttv/sync/sync_chain_unittest.c
        +++ b/lttv/lttv/sync/sync_chain_unittest.c
        @@ -42,6 +42,7 @@
          #include "event_analysis_linreg.h"
          #include "event_analysis_eval.h"
          #include "factor_reduction_accuracy.h"
        +#include "factor_reduction_time.h"
          #include "sync_chain.h"


        @@ -134,6 +135,7 @@ int main(const int argc, char* const argv[])
                registerAnalysisEval();

                registerReductionAccuracy();
        +       registerReductionTime();

                // Initialize data structures
                syncState= malloc(sizeof(SyncState));


    _______________________________________________
    ltt-dev mailing list
    [email protected]
    http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev


