package org.apache.flume.serialization;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Event serializer that writes each event to the output stream as text lines.
 *
 * <p>For every event, one line of the form {@code key: value} is written per
 * header, followed by the raw event body and a trailing newline. Header lines
 * are encoded as UTF-8; the body bytes are written unchanged.
 *
 * <p>Instances are created through {@link Builder}; the constructor is private.
 */
public class CustomAvrotSerializer implements EventSerializer, Configurable {

  private static final Logger logger =
      LoggerFactory.getLogger(CustomAvrotSerializer.class);

  /** Text placed between a header name and its value. */
  private static final String HEADER_SEPARATOR = ": ";

  /** Destination stream; every write goes straight to it without buffering here. */
  private final OutputStream out;

  private CustomAvrotSerializer(OutputStream out) {
    this.out = out;
  }

  /**
   * No-op: this serializer reads no settings from the context.
   *
   * @param context ignored
   */
  @Override
  public void configure(Context context) {
    // noop
  }

  /**
   * Always returns {@code true}. This class writes nothing in
   * {@link #afterCreate()}, {@link #afterReopen()} or {@link #beforeClose()},
   * so the output consists solely of the per-event records.
   */
  @Override
  public boolean supportsReopen() {
    return true;
  }

  /** No-op: nothing is written before the first event. */
  @Override
  public void afterCreate() {
    // noop
  }

  /** No-op: nothing is written when the stream is reopened. */
  @Override
  public void afterReopen() {
    // noop
  }

  /** No-op: nothing is written after the last event. */
  @Override
  public void beforeClose() {
    // noop
  }

  /**
   * Writes one event: each header as a {@code key: value} line, then the body,
   * then a newline.
   *
   * <p>A {@code null} header map or a {@code null} body is skipped instead of
   * raising a {@link NullPointerException}; the terminating newline is still
   * written. A {@code null} header value is rendered as the text {@code null}
   * by string concatenation.
   *
   * @param e the event to serialize
   * @throws IOException if writing to the underlying stream fails
   */
  @Override
  public void write(Event e) throws IOException {
    Map<String, String> headers = e.getHeaders();
    if (headers != null) {
      // entrySet() avoids a second map lookup per header.
      for (Map.Entry<String, String> header : headers.entrySet()) {
        String line = header.getKey() + HEADER_SEPARATOR + header.getValue();
        // Explicit charset: the no-arg getBytes() depends on the JVM's platform
        // default, which makes the output differ between hosts.
        out.write(line.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
      }
    }

    byte[] body = e.getBody();
    if (body != null) {
      out.write(body);
    }
    out.write('\n');
  }

  /**
   * No-op: this class keeps no buffer of its own and does not flush the
   * underlying stream.
   */
  @Override
  public void flush() throws IOException {
    // noop
  }

  /** Builder used to create {@link CustomAvrotSerializer} instances. */
  public static class Builder implements EventSerializer.Builder {

    /**
     * Creates a serializer bound to {@code out} and passes {@code context} to
     * its {@code configure} method.
     *
     * @param context configuration handed to the new serializer
     * @param out stream the serializer writes events to
     * @return the new serializer
     */
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      CustomAvrotSerializer serializer = new CustomAvrotSerializer(out);
      serializer.configure(context);
      return serializer;
    }

  }

}
