[ 
https://issues.apache.org/jira/browse/HADOOP-15207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated HADOOP-15207:
------------------------------------
    Description: 
I am doing a hadoop project where I am working with 100MB, 500MB, 1GB files. A 
multinode hadoop cluster with 4 nodes is implemented for the purpose. The time 
taken for running the mapreduce program in multinode cluster is much larger 
than the time taken in running single node cluster setup. Also, it is shocking 
to observe that the basic Java program(without finishes the operation faster 
than both the single and multi node clusters. Here is the code for the mapper 
class:

 
{code:java}
public class myMapperClass extends MapReduceBase implements 
Mapper<LongWritable, Text, Text, IntWritable>
{

 private final static IntWritable one = new IntWritable(1);
 private final static IntWritable two = new IntWritable(2);
 private final static IntWritable three = new IntWritable(3);
 private final static IntWritable four = new IntWritable(4);
 private final static IntWritable five = new IntWritable(5);
 private final static IntWritable six = new IntWritable(6);
 private final static IntWritable seven = new IntWritable(7);
 private final static IntWritable eight = new IntWritable(8);
 private final static IntWritable nine= new IntWritable(9);

  private Text srcIP,srcIPN;
  private Text dstIP,dstIPN;
  private Text srcPort,srcPortN;
  private Text dstPort,dstPortN;
  private Text counter1,counter2,counter3,counter4,counter5 ;
  //private Text total_records;

  int ddos_line = 0;
  //map method that performs the tokenizer job and framing the initial key 
value pairs
  @Override
public void map(LongWritable key, Text value, OutputCollector<Text, 
IntWritable> output, Reporter reporter) throws IOException
  {
        String line1 = value.toString();
        ddos_line++;
        int pos1=0;
        int lineno=0;

        int[] count = {100000, 100000, 100000, 100000, 100000};
        int[] lineIndex = {0, 0, 0, 0, 0};

        for(int i=0;i<9;i++)
        {
            pos1 = line1.indexOf("|",pos1+1);
        }

        srcIP =  new Text( line1.substring(0,line1.indexOf("|")) );
        String srcIPP = srcIP.toString();
        dstIP = new Text(line1.substring( 
srcIPP.length()+1,line1.indexOf("|",srcIPP.length()+1)) ) ;

        srcPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) );
        pos1 = line1.indexOf("|",pos1+1);
        dstPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) );

        //BufferedReader br = new BufferedReader(new 
FileReader("/home/yogi/Desktop/normal_small"));            

        FileSystem fs = FileSystem.get(new Configuration());            
        FileStatus[] status = fs.listStatus(new 
Path("hdfs://master:54310/usr/local/hadoop/input/normal_small"));            
        BufferedReader br=new BufferedReader(new 
InputStreamReader(fs.open(status[0].getPath())));           

        String line=br.readLine();

        lineno++;
        boolean bool = true;
        while (bool) {

            for(int i=0; i<5;i++)
            {
                if(bool==false)
                    break;
                int pos=0;
                int temp;
                for(int j=0;j<9;j++)
                {
                    pos = line.indexOf("|",pos+1);
                }


                srcIPN =  new Text( line.substring(0,line.indexOf("|")) );
                String srcIPP2 = srcIPN.toString();
                dstIPN = new Text(line.substring( 
srcIPP2.length()+1,line.indexOf("|",srcIPP2.length()+1)) ) ;

                srcPortN = new Text( 
line.substring(pos+1,line.indexOf("|",pos+1)) );
                pos = line.indexOf("|",pos+1);
                dstPortN = new Text( 
line.substring(pos+1,line.indexOf("|",pos+1)) );


                if(srcIP.equals(srcIPN) && dstIP.equals(dstIPN))
                {
                    int tmp, tmp2;

                    tmp = Integer.parseInt(srcPort.toString()) - 
Integer.parseInt(srcPortN.toString());
                    if(tmp<0)
                        tmp*=-1;

                    tmp2 = Integer.parseInt(dstPort.toString()) - 
Integer.parseInt(dstPortN.toString());
                    if(tmp2<0)                      
                        tmp2*=-1;

                    temp=tmp+tmp2;


                    if(count[4] > temp)
                    {
                        count[4] = temp;
                        lineIndex[4]=lineno;
                    } 


                    for(int k=0;k<5;k++)
                    {
                        for(int j=0;j<4;j++)
                        {   
                            if(count[j] > count[j+1]) 
                            {
                                int temp2 = count[j+1];
                                count[j+1] = count[j];
                                count[j] = temp2;

                                int temp3 = lineIndex[j+1];
                                lineIndex[j+1] = lineIndex[j];
                                lineIndex[j] = temp3;
                            }
                        }
                    }



                }

                if ((line = br.readLine()) != null )
                {
                    lineno++;
                    continue;
                } 

                else 
                    bool = false;
            }


        }
        br.close();         



        counter1 = new Text(count[0]+" "+lineIndex[0]+":"+ddos_line);
        counter2 = new Text(count[1]+" "+lineIndex[1]+":"+ddos_line);
        counter3 = new Text(count[2]+" "+lineIndex[2]+":"+ddos_line);
        counter4 = new Text(count[3]+" "+lineIndex[3]+":"+ddos_line);
        counter5 = new Text(count[4]+" "+lineIndex[4]+":"+ddos_line);




      output.collect(srcIP, one);
      output.collect(dstIP, two);
      output.collect(srcPort, three);
      output.collect(dstPort, four);
      output.collect(counter1, five);
      output.collect(counter2, six);
      output.collect(counter3, seven);
      output.collect(counter4, eight);
      output.collect(counter5, nine);
     // output.collect(total_records, total);

      //iterating through all the words available in that line and forming the 
key value pair

   }
{code}
Kindly mention as to what is the reason behind this anomaly.

P.S There are no issues with the configuration of the multinode hadoop cluster.

Edit: I would like to explain what I am doing in this mapper class. I am having 
a 100KB file of normal flow record data which contains 1000 flows. There is 
another flow record of 100MB which contains 1,000,000 flows in it. For every 
flow from the 100MB file, I am checking(comparing the src IP's, port no.s and 
the Dst IP's and port no.s) with all of the flows from the 100KB file(to select 
the closest five flows from the 100KB file for every flow from the 100MB file). 
Thus, I am performing a total of 1000*1,000,000 similarity check operations. I 
would like to further stress that both of these files are stored in HDFS and 
not local system. Kindly suggest as to what is wrong with the code.

Thanks

Nicolewells

 

 

  was:
I am doing a hadoop project where I am working with 100MB, 500MB, 1GB files. A 
multinode hadoop cluster with 4 nodes is implemented for the purpose. The time 
taken for running the mapreduce program in multinode cluster is much larger 
than the time taken in running single node cluster setup. Also, it is shocking 
to observe that the basic Java program(without [Hadoop 
BigData|https://mindmajix.com/hadoop-training]) finishes the operation faster 
than both the single and multi node clusters. Here is the code for the mapper 
class:

 
{code:java}
public class myMapperClass extends MapReduceBase implements 
Mapper<LongWritable, Text, Text, IntWritable>
{

 private final static IntWritable one = new IntWritable(1);
 private final static IntWritable two = new IntWritable(2);
 private final static IntWritable three = new IntWritable(3);
 private final static IntWritable four = new IntWritable(4);
 private final static IntWritable five = new IntWritable(5);
 private final static IntWritable six = new IntWritable(6);
 private final static IntWritable seven = new IntWritable(7);
 private final static IntWritable eight = new IntWritable(8);
 private final static IntWritable nine= new IntWritable(9);

  private Text srcIP,srcIPN;
  private Text dstIP,dstIPN;
  private Text srcPort,srcPortN;
  private Text dstPort,dstPortN;
  private Text counter1,counter2,counter3,counter4,counter5 ;
  //private Text total_records;

  int ddos_line = 0;
  //map method that performs the tokenizer job and framing the initial key 
value pairs
  @Override
public void map(LongWritable key, Text value, OutputCollector<Text, 
IntWritable> output, Reporter reporter) throws IOException
  {
        String line1 = value.toString();
        ddos_line++;
        int pos1=0;
        int lineno=0;

        int[] count = {100000, 100000, 100000, 100000, 100000};
        int[] lineIndex = {0, 0, 0, 0, 0};

        for(int i=0;i<9;i++)
        {
            pos1 = line1.indexOf("|",pos1+1);
        }

        srcIP =  new Text( line1.substring(0,line1.indexOf("|")) );
        String srcIPP = srcIP.toString();
        dstIP = new Text(line1.substring( 
srcIPP.length()+1,line1.indexOf("|",srcIPP.length()+1)) ) ;

        srcPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) );
        pos1 = line1.indexOf("|",pos1+1);
        dstPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) );

        //BufferedReader br = new BufferedReader(new 
FileReader("/home/yogi/Desktop/normal_small"));            

        FileSystem fs = FileSystem.get(new Configuration());            
        FileStatus[] status = fs.listStatus(new 
Path("hdfs://master:54310/usr/local/hadoop/input/normal_small"));            
        BufferedReader br=new BufferedReader(new 
InputStreamReader(fs.open(status[0].getPath())));           

        String line=br.readLine();

        lineno++;
        boolean bool = true;
        while (bool) {

            for(int i=0; i<5;i++)
            {
                if(bool==false)
                    break;
                int pos=0;
                int temp;
                for(int j=0;j<9;j++)
                {
                    pos = line.indexOf("|",pos+1);
                }


                srcIPN =  new Text( line.substring(0,line.indexOf("|")) );
                String srcIPP2 = srcIPN.toString();
                dstIPN = new Text(line.substring( 
srcIPP2.length()+1,line.indexOf("|",srcIPP2.length()+1)) ) ;

                srcPortN = new Text( 
line.substring(pos+1,line.indexOf("|",pos+1)) );
                pos = line.indexOf("|",pos+1);
                dstPortN = new Text( 
line.substring(pos+1,line.indexOf("|",pos+1)) );


                if(srcIP.equals(srcIPN) && dstIP.equals(dstIPN))
                {
                    int tmp, tmp2;

                    tmp = Integer.parseInt(srcPort.toString()) - 
Integer.parseInt(srcPortN.toString());
                    if(tmp<0)
                        tmp*=-1;

                    tmp2 = Integer.parseInt(dstPort.toString()) - 
Integer.parseInt(dstPortN.toString());
                    if(tmp2<0)                      
                        tmp2*=-1;

                    temp=tmp+tmp2;


                    if(count[4] > temp)
                    {
                        count[4] = temp;
                        lineIndex[4]=lineno;
                    } 


                    for(int k=0;k<5;k++)
                    {
                        for(int j=0;j<4;j++)
                        {   
                            if(count[j] > count[j+1]) 
                            {
                                int temp2 = count[j+1];
                                count[j+1] = count[j];
                                count[j] = temp2;

                                int temp3 = lineIndex[j+1];
                                lineIndex[j+1] = lineIndex[j];
                                lineIndex[j] = temp3;
                            }
                        }
                    }



                }

                if ((line = br.readLine()) != null )
                {
                    lineno++;
                    continue;
                } 

                else 
                    bool = false;
            }


        }
        br.close();         



        counter1 = new Text(count[0]+" "+lineIndex[0]+":"+ddos_line);
        counter2 = new Text(count[1]+" "+lineIndex[1]+":"+ddos_line);
        counter3 = new Text(count[2]+" "+lineIndex[2]+":"+ddos_line);
        counter4 = new Text(count[3]+" "+lineIndex[3]+":"+ddos_line);
        counter5 = new Text(count[4]+" "+lineIndex[4]+":"+ddos_line);




      output.collect(srcIP, one);
      output.collect(dstIP, two);
      output.collect(srcPort, three);
      output.collect(dstPort, four);
      output.collect(counter1, five);
      output.collect(counter2, six);
      output.collect(counter3, seven);
      output.collect(counter4, eight);
      output.collect(counter5, nine);
     // output.collect(total_records, total);

      //iterating through all the words available in that line and forming the 
key value pair

   }
{code}
Kindly mention as to what is the reason behind this anomaly.

P.S There are no issues with the configuration of the multinode hadoop cluster.

Edit: I would like to explain what I am doing in this mapper class. I am having 
a 100KB file of normal flow record data which contains 1000 flows. There is 
another flow record of 100MB which contains 1,000,000 flows in it. For every 
flow from the 100MB file, I am checking(comparing the src IP's, port no.s and 
the Dst IP's and port no.s) with all of the flows from the 100KB file(to select 
the closest five flows from the 100KB file for every flow from the 100MB file). 
Thus, I am performing a total of 1000*1,000,000 similarity check operations. I 
would like to further stress that both of these files are stored in HDFS and 
not local system. Kindly suggest as to what is wrong with the code.

Thanks

Nicolewells

 

 


> Hadoop performance Issues
> -------------------------
>
>                 Key: HADOOP-15207
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15207
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>    Affects Versions: HADOOP-13345
>            Reporter: nicole wells
>            Priority: Minor
>
> I am doing a hadoop project where I am working with 100MB, 500MB, 1GB files. 
> A multinode hadoop cluster with 4 nodes is implemented for the purpose. The 
> time taken for running the mapreduce program in multinode cluster is much 
> larger than the time taken in running single node cluster setup. Also, it is 
> shocking to observe that the basic Java program(without finishes the 
> operation faster than both the single and multi node clusters. Here is the 
> code for the mapper class:
>  
> {code:java}
> public class myMapperClass extends MapReduceBase implements 
> Mapper<LongWritable, Text, Text, IntWritable>
> {
>  private final static IntWritable one = new IntWritable(1);
>  private final static IntWritable two = new IntWritable(2);
>  private final static IntWritable three = new IntWritable(3);
>  private final static IntWritable four = new IntWritable(4);
>  private final static IntWritable five = new IntWritable(5);
>  private final static IntWritable six = new IntWritable(6);
>  private final static IntWritable seven = new IntWritable(7);
>  private final static IntWritable eight = new IntWritable(8);
>  private final static IntWritable nine= new IntWritable(9);
>   private Text srcIP,srcIPN;
>   private Text dstIP,dstIPN;
>   private Text srcPort,srcPortN;
>   private Text dstPort,dstPortN;
>   private Text counter1,counter2,counter3,counter4,counter5 ;
>   //private Text total_records;
>   int ddos_line = 0;
>   //map method that performs the tokenizer job and framing the initial key 
> value pairs
>   @Override
> public void map(LongWritable key, Text value, OutputCollector<Text, 
> IntWritable> output, Reporter reporter) throws IOException
>   {
>         String line1 = value.toString();
>         ddos_line++;
>         int pos1=0;
>         int lineno=0;
>         int[] count = {100000, 100000, 100000, 100000, 100000};
>         int[] lineIndex = {0, 0, 0, 0, 0};
>         for(int i=0;i<9;i++)
>         {
>             pos1 = line1.indexOf("|",pos1+1);
>         }
>         srcIP =  new Text( line1.substring(0,line1.indexOf("|")) );
>         String srcIPP = srcIP.toString();
>         dstIP = new Text(line1.substring( 
> srcIPP.length()+1,line1.indexOf("|",srcIPP.length()+1)) ) ;
>         srcPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) 
> );
>         pos1 = line1.indexOf("|",pos1+1);
>         dstPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) 
> );
>         //BufferedReader br = new BufferedReader(new 
> FileReader("/home/yogi/Desktop/normal_small"));            
>         FileSystem fs = FileSystem.get(new Configuration());            
>         FileStatus[] status = fs.listStatus(new 
> Path("hdfs://master:54310/usr/local/hadoop/input/normal_small"));            
>         BufferedReader br=new BufferedReader(new 
> InputStreamReader(fs.open(status[0].getPath())));           
>         String line=br.readLine();
>         lineno++;
>         boolean bool = true;
>         while (bool) {
>             for(int i=0; i<5;i++)
>             {
>                 if(bool==false)
>                     break;
>                 int pos=0;
>                 int temp;
>                 for(int j=0;j<9;j++)
>                 {
>                     pos = line.indexOf("|",pos+1);
>                 }
>                 srcIPN =  new Text( line.substring(0,line.indexOf("|")) );
>                 String srcIPP2 = srcIPN.toString();
>                 dstIPN = new Text(line.substring( 
> srcIPP2.length()+1,line.indexOf("|",srcIPP2.length()+1)) ) ;
>                 srcPortN = new Text( 
> line.substring(pos+1,line.indexOf("|",pos+1)) );
>                 pos = line.indexOf("|",pos+1);
>                 dstPortN = new Text( 
> line.substring(pos+1,line.indexOf("|",pos+1)) );
>                 if(srcIP.equals(srcIPN) && dstIP.equals(dstIPN))
>                 {
>                     int tmp, tmp2;
>                     tmp = Integer.parseInt(srcPort.toString()) - 
> Integer.parseInt(srcPortN.toString());
>                     if(tmp<0)
>                         tmp*=-1;
>                     tmp2 = Integer.parseInt(dstPort.toString()) - 
> Integer.parseInt(dstPortN.toString());
>                     if(tmp2<0)                      
>                         tmp2*=-1;
>                     temp=tmp+tmp2;
>                     if(count[4] > temp)
>                     {
>                         count[4] = temp;
>                         lineIndex[4]=lineno;
>                     } 
>                     for(int k=0;k<5;k++)
>                     {
>                         for(int j=0;j<4;j++)
>                         {   
>                             if(count[j] > count[j+1]) 
>                             {
>                                 int temp2 = count[j+1];
>                                 count[j+1] = count[j];
>                                 count[j] = temp2;
>                                 int temp3 = lineIndex[j+1];
>                                 lineIndex[j+1] = lineIndex[j];
>                                 lineIndex[j] = temp3;
>                             }
>                         }
>                     }
>                 }
>                 if ((line = br.readLine()) != null )
>                 {
>                     lineno++;
>                     continue;
>                 } 
>                 else 
>                     bool = false;
>             }
>         }
>         br.close();         
>         counter1 = new Text(count[0]+" "+lineIndex[0]+":"+ddos_line);
>         counter2 = new Text(count[1]+" "+lineIndex[1]+":"+ddos_line);
>         counter3 = new Text(count[2]+" "+lineIndex[2]+":"+ddos_line);
>         counter4 = new Text(count[3]+" "+lineIndex[3]+":"+ddos_line);
>         counter5 = new Text(count[4]+" "+lineIndex[4]+":"+ddos_line);
>       output.collect(srcIP, one);
>       output.collect(dstIP, two);
>       output.collect(srcPort, three);
>       output.collect(dstPort, four);
>       output.collect(counter1, five);
>       output.collect(counter2, six);
>       output.collect(counter3, seven);
>       output.collect(counter4, eight);
>       output.collect(counter5, nine);
>      // output.collect(total_records, total);
>       //iterating through all the words available in that line and forming 
> the key value pair
>    }
> {code}
> Kindly mention as to what is the reason behind this anomaly.
> P.S There are no issues with the configuration of the multinode hadoop 
> cluster.
> Edit: I would like to explain what I am doing in this mapper class. I am 
> having a 100KB file of normal flow record data which contains 1000 flows. 
> There is another flow record of 100MB which contains 1,000,000 flows in it. 
> For every flow from the 100MB file, I am checking(comparing the src IP's, 
> port no.s and the Dst IP's and port no.s) with all of the flows from the 
> 100KB file(to select the closest five flows from the 100KB file for every 
> flow from the 100MB file). Thus, I am performing a total of 1000*1,000,000 
> similarity check operations. I would like to further stress that both of 
> these files are stored in HDFS and not local system. Kindly suggest as to 
> what is wrong with the code.
> Thanks
> Nicolewells
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to