import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.HadoopRDD;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import scala.Tuple2;

/**
 * Reproduces what looks like a bug in {@link HadoopRDD} compute() method, where mutable keys are created once and
 * reused for each tuple in the iterator, causing a number of issues demonstrated by the corresponding test methods.
 * 
 * @author Sergey Parhomenko
 */
public class SparkBugTest {
	private static final int DIFFERENT_KEYS = 10;

	// Requires a formatted Hadoop HDFS running on port 9000
	private static final String HDFS_SERVER = "hdfs://localhost:9000";
	private static final String HDFS_FILE_PATH = HDFS_SERVER + "/test.txt";

	private Configuration conf;
	private FileSystem hdfs;
	private Path file;
	private JavaSparkContext sc;

	/**
	 * Connects to HDFS and starts a local Spark context before each test.
	 */
	@Before
	public void setUp() throws IOException, URISyntaxException {
		conf = new Configuration();
		hdfs = FileSystem.get(new URI(HDFS_SERVER), conf);
		file = new Path(HDFS_FILE_PATH);
		// The returned output stream must be closed, otherwise it is leaked.
		// (SequenceFile.createWriter recreates the file anyway; this pre-create
		// also acts as an early connectivity check against the HDFS server.)
		hdfs.create(file).close();

		sc = new JavaSparkContext("local", "Test");
	}

	/**
	 * Deletes the test file and releases HDFS/Spark resources after each test.
	 * Null-safe so that a partially failed {@link #setUp()} does not hide the
	 * original error behind a NullPointerException from tearDown.
	 */
	@After
	public void tearDown() throws IOException {
		if (hdfs != null) {
			if (file != null && hdfs.exists(file)) {
				hdfs.delete(file, true);
			}
			hdfs.close();
		}
		if (sc != null) {
			sc.stop();
		}
	}

	/**
	 * Writes {@value #DIFFERENT_KEYS} distinct String key/value pairs using
	 * {@link JavaSerialization} and expects all of them back from
	 * {@code collectAsMap()}. Fails if keys are collapsed on the read side.
	 */
	@Test
	public void testJavaSerialization() throws IOException {
		conf.set("io.serializations", JavaSerialization.class.getName());
		try (Writer writer = SequenceFile.createWriter(conf, Writer.file(file), Writer.keyClass(String.class),
				Writer.valueClass(String.class))) {
			for (int i = 0; i < DIFFERENT_KEYS; i++) {
				String s = Integer.toString(i);
				writer.append(s, s);
			}
		}

		sc.hadoopConfiguration().set("io.serializations", JavaSerialization.class.getName());
		assertEquals(DIFFERENT_KEYS, sc.sequenceFile(HDFS_FILE_PATH, String.class, String.class).collectAsMap().size());
	}

	/** An {@link IntWritable} that is also Java-serializable, so Spark can collect it. */
	private static class SerializableIntWriteable extends IntWritable implements Serializable {
		private static final long serialVersionUID = 1L;
	}

	/**
	 * Writes {@value #DIFFERENT_KEYS} distinct int key/value pairs to the test
	 * sequence file. Shared by {@link #testWriteableSerialization()} and
	 * {@link #testSortKeys()}, which previously duplicated this loop. Reusing a
	 * single writeable instance matches the original tests' write pattern.
	 */
	private void writeIntSequenceFile() throws IOException {
		try (Writer writer = SequenceFile.createWriter(conf, Writer.file(file),
				Writer.keyClass(SerializableIntWriteable.class), Writer.valueClass(SerializableIntWriteable.class))) {
			SerializableIntWriteable writeable = new SerializableIntWriteable();
			for (int i = 0; i < DIFFERENT_KEYS; i++) {
				writeable.set(i);
				writer.append(writeable, writeable);
			}
		}
	}

	/**
	 * Expects {@code collectAsMap()} to see {@value #DIFFERENT_KEYS} distinct keys.
	 * Mutable key reuse in {@link HadoopRDD} makes all collected keys refer to the
	 * same instance, collapsing the map to a single entry.
	 */
	@Test
	public void testWriteableSerialization() throws IOException {
		writeIntSequenceFile();

		JavaPairRDD<SerializableIntWriteable, SerializableIntWriteable> pairs = sc.sequenceFile(HDFS_FILE_PATH,
				SerializableIntWriteable.class, SerializableIntWriteable.class);
		assertEquals(DIFFERENT_KEYS, pairs.collectAsMap().size());
	}

	/**
	 * Sorts the entries in descending key order and expects the largest key first.
	 * Mutable key reuse breaks the ordering because all keys alias one instance.
	 */
	@Test
	public void testSortKeys() throws IOException {
		writeIntSequenceFile();

		// when properly sorted in descending order, first key should be DIFFERENT_KEYS - 1
		JavaPairRDD<SerializableIntWriteable, SerializableIntWriteable> sortedEntries = sc.sequenceFile(HDFS_FILE_PATH,
				SerializableIntWriteable.class, SerializableIntWriteable.class).sortByKey(false);
		Tuple2<SerializableIntWriteable, SerializableIntWriteable> firstItem = sortedEntries.collect().get(0);
		assertEquals(DIFFERENT_KEYS - 1, firstItem._1.get());
	}
}
