/*
 * AvroUtil.java
 *
 * Product:  faNGS
 * Next Generation Sequencing
 *
 * Copyright 2007-2011, Strand Life Sciences
 * 5th Floor, Kirloskar Business Park, 
 * Bellary Road, Hebbal,
 * Bangalore 560024,
 * India
 * All rights reserved.
 *
 * This software is the confidential and proprietary information
 * of Strand Life Sciences., ("Confidential Information").  You
 * shall not disclose such Confidential Information and shall use
 * it only in accordance with the terms of the license agreement
 * you entered into with Strand Life Sciences.
 */
package com.strandgenomics.ngs.hadoop.common.io;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

import com.strandgenomics.cube.util.StringUtil;

public class AvroSchemaStore {
    /**
     * Extension for avro schema files.
     */
    private static final String AVRO_EXTENSION = ".avro";
    /**
     * The map from fully qualified schema name to the schema
     */
    private static Map<String, Schema> schemas = new HashMap<String, Schema>();
    
    /**
     * Return the schema corresponding to the fully qualified schema name
     * 
     * @param fullSchemaName
     *            fully qualified schema name
     * @return the avro schema. Returns <code>null</code> if the schema is not
     *         found
     * @throws IOException
     */
    public static Schema getSchema(String fullSchemaName) throws IOException {
        Schema schema = schemas.get(fullSchemaName);
        if (schema == null) {
            schema = addSchema(AvroSchemaStore.class
                    .getResourceAsStream("/" + fullSchemaName.replaceAll("\\.", "/") + AVRO_EXTENSION));
        }
        return schema;
    }
    
    /**
     * Replace references to nested type references by the nested type's schema
     * string.
     * 
     * @param schemaString
     */
    private static String resolveSchema(String schemaString) {
        
        String result = schemaString;
        for (Map.Entry<String, Schema> entry : schemas.entrySet())
            result = replace(result, "\"" + entry.getKey() + "\"", 
                                    entry.getValue().toString());
        return result;
        
    }
    
    /**
     * Utility string replace function that used a string buffer to do the
     * replacement.
     */
    private static String replace(String str, String pattern, String replace) {
        
        int s = 0;
        int e = 0;
        StringBuffer result = new StringBuffer();
        while ((e = str.indexOf(pattern, s)) >= 0) {
            result.append(str.substring(s, e));
            result.append(replace);
            s = e + pattern.length();
            
        }
        result.append(str.substring(s));
        return result.toString();
        
    }
    
    /**
     * Add the schema to the store
     * 
     * @param schemaString
     * @return the parsed schema
     * @throws IOException
     */
    public static Schema addSchema(String schemaString) throws IOException {
        String lastUnresolvedSchemaName = "";
        String currentUnresolvedSchemaName = null;
        
        Schema schema = null;
        while (!StringUtil.safeEquals(lastUnresolvedSchemaName, currentUnresolvedSchemaName)) {
            if (currentUnresolvedSchemaName != null) {
                getSchema(currentUnresolvedSchemaName);
            }
            String completeSchema = resolveSchema(schemaString);
            try {
                schema = Schema.parse(completeSchema);
                currentUnresolvedSchemaName = null;
                break;
            } catch (SchemaParseException e) {
                lastUnresolvedSchemaName = currentUnresolvedSchemaName;
                currentUnresolvedSchemaName = getUnresolvedType(e);
                if (currentUnresolvedSchemaName == null) {
                    throw e;
                }
            }
        }
        
        if (currentUnresolvedSchemaName != null) {
            throw new RuntimeException("Count not resolve schema for type: " + currentUnresolvedSchemaName);
        }
        
        String name = schema.getFullName();
        schemas.put(name, schema);
        return schema;
        
    }
    
    /**
     * @param e
     * @return
     */
    private static String getUnresolvedType(SchemaParseException e) {
        try {
            String message = e.getMessage();
            Pattern p = Pattern.compile("Undefined name: \"(.*)\"");
            Matcher matcher = p.matcher(message);
            matcher.find();
            return matcher.group(1);
        } catch (Exception e1) {
            return null;
        }
    }
    
    /**
     * Add the schema to the store
     * 
     * @param schemaString
     * @return the parsed schema
     */
    public static Schema addSchema(InputStream in) throws IOException {
        
        StringBuffer out = new StringBuffer();
        byte[] b = new byte[4096];
        for (int n; (n = in.read(b)) != -1;) {
            out.append(new String(b, 0, n));
        }
        return addSchema(out.toString());
        
    }
    
    /**
     * 
     * @param file
     * @return
     * @throws IOException
     */
    public static Schema addSchema(File file) throws IOException {
        
        FileInputStream fis = new FileInputStream(file);
        return addSchema(fis);
    }
}