package com.wordcount.example;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Storm spout that parses every regular file found in a fixed input directory
 * and emits the parser's result for each file as a single-field tuple
 * ({@code "line"}).
 *
 * <p>The directory is listed once in {@link #open}. The first call to
 * {@link #nextTuple()} processes all listed files; later calls emit nothing
 * and merely sleep briefly, so each file is emitted at most once per spout
 * instance.
 */
public class LineReaderSpout implements IRichSpout {
	/** Directory whose files are handed to the parser. */
	private static final String INPUT_DIRECTORY = "/home/hduser/Desktop/xml_files/PRD_XML/";

	/** Pause applied by {@link #nextTuple()} once all files have been emitted. */
	private static final long IDLE_SLEEP_MILLIS = 1000L;

	private SpoutOutputCollector collector;
	/** Set to {@code true} after the listed files have been emitted once. */
	private boolean completed = false;
	private TopologyContext context;
	public xmlParser parser = new xmlParser();
	/** Files listed from the input directory in {@link #open}; never {@code null} after open. */
	public File[] listOfFiles;
	/** Name of the file most recently passed to the parser. */
	String files;

	/**
	 * Stores the collector and context and lists the input directory.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   topology context, retained for later use
	 * @param collector collector used to emit tuples
	 */
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.context = context;
		this.collector = collector;

		File folder = new File(INPUT_DIRECTORY);
		File[] listed = folder.listFiles();
		// File.listFiles() returns null when the path is not a directory or an
		// I/O error occurs; fall back to an empty array so nextTuple() is safe.
		listOfFiles = (listed != null) ? listed : new File[0];
	}

	/**
	 * Emits one tuple per regular file on the first call. On every later call
	 * it only sleeps for a short time and returns without emitting, so files
	 * are not re-emitted.
	 */
	@Override
	public void nextTuple() {
		if (completed) {
			try {
				Thread.sleep(IDLE_SLEEP_MILLIS);
			} catch (InterruptedException e) {
				// Preserve the interrupt status for whoever owns this thread.
				Thread.currentThread().interrupt();
			}
			// Everything has already been emitted; do not emit it again.
			return;
		}

		File[] candidates = listOfFiles;
		if (candidates != null) {
			for (File candidate : candidates) {
				if (!candidate.isFile()) {
					continue;
				}
				files = candidate.getName();
				try {
					String row = parser.parsingData(files);
					// The parsed row doubles as the message id of the tuple.
					this.collector.emit(new Values(row), row);
				} catch (ParserConfigurationException e) {
					// Best effort: report the failure and continue with the next file.
					System.err.println("Failed to parse file: " + files);
					e.printStackTrace();
				}
			}
		}

		completed = true;
	}

	/**
	 * Declares the single output field {@code "line"}.
	 *
	 * @param declarer declarer to register the output fields with
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}

	/** No resources are held open by this spout, so there is nothing to release. */
	@Override
	public void close() {
	}

	/**
	 * @return always {@code false}
	 */
	public boolean isDistributed() {
		return false;
	}

	/** No-op. */
	@Override
	public void activate() {
	}

	/** No-op. */
	@Override
	public void deactivate() {
	}

	/**
	 * No-op; acknowledged tuples require no bookkeeping here.
	 *
	 * @param msgId message id passed to {@code emit}
	 */
	@Override
	public void ack(Object msgId) {
	}

	/**
	 * No-op; failed tuples are not replayed.
	 *
	 * @param msgId message id passed to {@code emit}
	 */
	@Override
	public void fail(Object msgId) {
	}

	/**
	 * @return {@code null}, i.e. no component-specific configuration
	 */
	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
