/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Treats keys as offset in file and value as line.
 *
 * <p>This variant cuts its split at the byte midpoint and serves the two
 * halves through two independent line readers. The first half is exposed
 * through {@link #nextKeyValue()}, {@link #getCurrentKey()} and
 * {@link #getCurrentValue()}; the second half through
 * {@link #nextKeyValue2()}, {@link #getCurrentKey2()} and
 * {@link #getCurrentValue2()}.
 *
 * <p>Each reader owns every line that <em>starts</em> inside its byte range:
 * a reader whose range does not begin at offset 0 backs up one byte and
 * discards everything up to and including the next line terminator, while the
 * preceding reader finishes the line that straddles the boundary.
 *
 * <p>Files with a compression codec cannot be entered mid-stream, so for them
 * the first reader consumes the whole stream and the second reader is empty.
 */
public class LineRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;

  /** Upper bound handed to {@code LineReader.readLine} for a single line. */
  private int maxLineLength;

  // ---- first reader: first half of the split ----
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private LongWritable key = null;
  private Text value = null;

  // ---- second reader: second half of the split ----
  private long start2;
  private long pos2;
  private long end2;
  /** Stays {@code null} for compressed input (second reader is empty). */
  private LineReader in2;
  private LongWritable key2 = null;
  private Text value2 = null;

  /**
   * Opens the file behind the split and positions both readers.
   *
   * @param genericSplit the split to read; must be a {@link FileSplit}
   * @param context supplies the job configuration
   * @throws IOException if the file cannot be opened, seeked or read
   */
  @Override
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);

    // Diagnostic trace; goes through the logger instead of task stdout.
    LOG.debug("inside LineRecordReader");

    final long splitStart = split.getStart();
    final long splitLength = split.getLength();
    final long middle = splitStart + splitLength / 2;

    // First reader: [splitStart, middle). Second reader: [middle, splitEnd).
    start = splitStart;
    end = middle;
    start2 = middle;
    end2 = splitStart + splitLength;

    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(file);

    if (codec != null) {
      // A compressed stream can only be decoded from its beginning, so the
      // first reader takes everything and the second one gets an empty range.
      in = new LineReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
      in2 = null;
      start2 = end2;
    } else {
      boolean skipFirstLine = (start != 0);
      if (skipFirstLine) {
        // Back up one byte so a line starting exactly at the boundary is kept.
        --start;
        fileIn.seek(start);
      }
      // Assign the field before any further I/O so close() can release it.
      in = new LineReader(fileIn, job);
      if (skipFirstLine) {  // skip first line and re-establish "start".
        start = skipPartialLine(in, start, end);
      }

      // open the file again for the second reader and apply the same rule
      FSDataInputStream fileIn2 = fs.open(file);
      boolean skipFirstLine2 = (start2 != 0);
      if (skipFirstLine2) {
        --start2;
      }
      fileIn2.seek(start2);
      in2 = new LineReader(fileIn2, job);
      if (skipFirstLine2) {  // the first reader finishes the straddling line
        start2 = skipPartialLine(in2, start2, end2);
      }
    }
    this.pos = start;
    this.pos2 = start2;
  }

  /**
   * Discards the (possibly partial) line at the reader's current position.
   *
   * @param reader reader positioned at byte offset {@code from}
   * @param from offset the reader is currently at
   * @param limit end offset of the reader's range, used to bound the read
   * @return the offset just past the discarded line
   * @throws IOException if reading fails
   */
  private static long skipPartialLine(LineReader reader, long from, long limit)
      throws IOException {
    int maxBytes = (int) Math.min((long) Integer.MAX_VALUE, limit - from);
    return from + reader.readLine(new Text(), 0, maxBytes);
  }

  /**
   * Advances the first reader to its next line.
   *
   * <p>The key is set to the byte offset at which reading for this record
   * began. Lines for which {@code readLine} reports at least
   * {@code maxLineLength} bytes are logged and skipped.
   *
   * @return {@code true} if a record was read; {@code false} once the first
   *         half is exhausted, after which key and value are reset to null
   * @throws IOException if reading fails
   */
  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    while (pos < end) {
      newSize = in.readLine(value, maxLineLength,
                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                     maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  /**
   * Advances the second reader to its next line; the counterpart of
   * {@link #nextKeyValue()} for the second half of the split.
   *
   * <p>When the second reader has an empty range (compressed input) the loop
   * body never runs, so the absent {@code in2} is never dereferenced.
   *
   * @return {@code true} if a record was read; {@code false} once the second
   *         half is exhausted, after which key2 and value2 are reset to null
   * @throws IOException if reading fails
   */
  public boolean nextKeyValue2() throws IOException {
    if (key2 == null) {
      key2 = new LongWritable();
    }
    key2.set(pos2);
    if (value2 == null) {
      value2 = new Text();
    }
    int newSize = 0;
    while (pos2 < end2) {
      newSize = in2.readLine(value2, maxLineLength,
                            Math.max((int)Math.min(Integer.MAX_VALUE, end2-pos2),
                                     maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos2 += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos2 - newSize));
    }
    if (newSize == 0) {
      key2 = null;
      value2 = null;
      return false;
    } else {
      return true;
    }
  }

  /**
   * @return the first reader's current key (the same object is reused between
   *         records), or null if there is no current record
   */
  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  /**
   * @return the first reader's current value (the same object is reused
   *         between records), or null if there is no current record
   */
  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * @return the second reader's current key (the same object is reused
   *         between records), or null if there is no current record
   */
  @Override
  public LongWritable getCurrentKey2() throws IOException, InterruptedException {
    return key2;
  }

  /**
   * @return the second reader's current value (the same object is reused
   *         between records), or null if there is no current record
   */
  @Override
  public Text getCurrentValue2() throws IOException, InterruptedException{
    return value2;
  }

  /**
   * Get an independent copy of the first reader's current key.
   *
   * @return a new object holding the current key, or null if there is no
   *         current key
   * @throws IOException
   * @throws InterruptedException
   */
  @Override
  public LongWritable getCurrentKeyCopy()
      throws IOException, InterruptedException {
    // key is reset to null once the reader is exhausted; do not dereference it.
    return (key == null) ? null : new LongWritable(key.get());
  }

  /**
   * Get an independent copy of the first reader's current value.
   *
   * @return a new object holding the current value, or null if there is no
   *         current value
   * @throws IOException
   * @throws InterruptedException
   */
  public Text getCurrentValueCopy() throws IOException, InterruptedException {
    // Copy the bytes directly; going through toString() would re-encode them.
    return (value == null) ? null : new Text(value);
  }

  /**
   * Get the progress within the split, counting the bytes consumed by both
   * readers against the combined size of their ranges.
   *
   * @return a value between 0.0 and 1.0; 0.0 when both ranges are empty
   */
  @Override
  public float getProgress() {
    long span1 = Math.max(0L, end - start);
    long span2 = Math.max(0L, end2 - start2);
    long total = span1 + span2;
    if (total <= 0) {
      return 0.0f;
    }
    // A reader may run slightly past its range to finish a line; clamp it.
    long done1 = Math.min(span1, Math.max(0L, pos - start));
    long done2 = Math.min(span2, Math.max(0L, pos2 - start2));
    return Math.min(1.0f, (done1 + done2) / (float) total);
  }

  /**
   * Closes both line readers. The second reader is closed even if closing the
   * first one fails, and either reader may be absent.
   *
   * @throws IOException if closing a reader fails
   */
  @Override
  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {
      if (in2 != null) {
        in2.close();
      }
    }
  }
}
