package com.robinhood.examples;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes files as they arrive by continuously matching a file pattern, and uses a splittable
 * DoFn to control event times and watermarks. The pipeline expects files with the following name
 * patterns:
 *
 * <p>*yyyy-MM-dd*
 *
 * <p>*yyyy-MM-dd.complete*
 *
 * <p>Every time it sees *yyyy-MM-dd*, it reads its contents and outputs lines of the file using
 * *outputWithTimestamp(..., timestamp)*. Additionally, it calls
 * *watermarkEstimator.setWatermark(timestamp)*. In both cases the timestamp is
 * *yyyy-MM-ddT00:00:00.000Z*.
 *
 * <p>Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it calls
 * *watermarkEstimator.setWatermark(timestamp)*, where the timestamp is
 * *yyyy-MM-ddT00:00:00.000Z plus one day* - hence the watermark advances to the next day.
 */
public final class FileProcessing {
  private static final Logger logger = LoggerFactory.getLogger(FileProcessing.class);

  /**
   * Builds and starts the pipeline.
   *
   * <p>Files matching {@code /tmp/input/*} are polled every second with no termination condition
   * ({@code Growth.never()}). The values produced by {@link #getData} are assigned to one-day
   * fixed windows, summed per key, formatted as {@code key,sum} and written through windowed
   * {@link TextIO} writes (a single shard) under {@code /tmp/output/Results}.
   *
   * @param args command line arguments, parsed into {@link PipelineOptions}
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    logger.info("running");

    PCollection<KV<String, Long>> output =
        FileProcessing.getData(
            pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());

    output
        .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
        .apply("LogWindowed", Log.ofElements("testGetData"))
        .apply(Sum.longsPerKey())
        .apply(
            "FormatResults",
            MapElements.into(TypeDescriptors.strings())
                // String.format uses java.util.Formatter syntax (%s), not SLF4J-style "{}"
                // placeholders; with "{}" every record would be the literal text "{},{}".
                .via((KV<String, Long> kv) -> String.format("%s,%s", kv.getKey(), kv.getValue())))
        .apply("LogResults", Log.ofElements("results"))
        .apply(
            TextIO.write()
                .to(Paths.get("/tmp/output/").resolve("Results").toString())
                .withWindowedWrites()
                .withNumShards(1));

    pipeline.run();
  }

  /**
   * Splittable {@link DoFn} that processes matched files by outputting key-value pairs where the
   * key is equal to "my-key" and the values are the Long values corresponding to the lines in the
   * file. The restriction is an {@link OffsetRange} over zero-based line offsets of the file. In
   * the case the file does not contain one Long per line, an {@link IOException} is thrown.
   *
   * <p>The Event Times and Watermarks are set based on the file names, which are assumed to be
   * either yyyy-MM-dd or yyyy-MM-dd.complete.
   */
  @DoFn.BoundedPerElement
  private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
    private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);

    /**
     * Reads the lines of the file that fall into the current restriction, outputs each of them as
     * {@code KV("my-key", value)} stamped with the timestamp derived from the file name, and then
     * sets the watermark to that same timestamp.
     *
     * @param c context providing the file {@link Metadata} and the output receiver
     * @param tracker tracker of the line-offset restriction; every line is claimed before output
     * @param watermarkEstimator manual estimator that is set to the file's timestamp
     * @throws IOException if the file cannot be read or a claimed line is not a valid Long
     */
    @ProcessElement
    public void processElement(
        ProcessContext c,
        RestrictionTracker<OffsetRange, Long> tracker,
        ManualWatermarkEstimator<Instant> watermarkEstimator)
        throws IOException {
      Metadata metadata = c.element();
      logger.info(
          "reading {} with restriction {} @ {}",
          metadata,
          tracker.currentRestriction(),
          c.timestamp());
      String filename = metadata.resourceId().toString();
      Instant timestamp = getTimestamp(filename);
      // Files.newBufferedReader decodes as UTF-8, the same charset Files.lines uses when
      // getInitialRestriction counts the lines, so both views of the file agree regardless of the
      // platform default charset.
      try (BufferedReader br = Files.newBufferedReader(Paths.get(filename))) {
        String line;
        for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
          // We do not check lineNumber >= tracker.currentRestriction().getTo() since reading the
          // file should NOT be based upon the end of the current restriction but continue
          // processing till tryClaim fails.
          if (lineNumber < tracker.currentRestriction().getFrom()) {
            // Skip the lines until the beginning of the current restriction.
            continue;
          }
          if (!tracker.tryClaim(lineNumber)) {
            logger.info("failed to claim {}", lineNumber);
            watermarkEstimator.setWatermark(timestamp);
            return;
          }
          c.outputWithTimestamp(KV.of("my-key", parseValue(line, filename, lineNumber)), timestamp);
        }
      }
      logger.info("setting watermark to {}", timestamp);
      watermarkEstimator.setWatermark(timestamp);
      logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
    }

    /**
     * Parses a single line of the file as a Long.
     *
     * @param line the raw line content
     * @param filename the file the line was read from; only used for the error message
     * @param lineNumber zero-based offset of the line; only used for the error message
     * @return the parsed value
     * @throws IOException if {@code line} is not a valid Long; the original {@link
     *     NumberFormatException} is preserved as the cause
     */
    private static long parseValue(String line, String filename, long lineNumber)
        throws IOException {
      try {
        return Long.parseLong(line);
      } catch (NumberFormatException e) {
        throw new IOException(
            String.format(
                "%s: line at offset %d is not a valid Long: \"%s\"", filename, lineNumber, line),
            e);
      }
    }

    /**
     * Returns {@link Instant} based on the {@code filepath}. Filename is assumed to be either
     * yyyy-MM-dd or yyyy-MM-dd.complete. The returned value is "yyyy-MM-ddT00:00:00.000Z" for the
     * former and one day later for the latter.
     */
    private static Instant getTimestamp(String filepath) {
      String filename = Paths.get(filepath).getFileName().toString();
      int index = filename.lastIndexOf(".complete");
      if (index != -1) {
        // In the case it has a suffix, strip it.
        filename = filename.substring(0, index);
      }
      Instant timestamp = Instant.parse(filename + "T00:00:00.000Z");
      if (index != -1) {
        // In the case it has a suffix i.e. it is complete, fast forward to the next day.
        return timestamp.plus(Duration.standardDays(1));
      }
      return timestamp;
    }

    /**
     * Returns the initial restriction for a file: the range of line offsets from 0 (inclusive) to
     * the number of lines in the file (exclusive).
     *
     * @param metadata the matched file
     * @throws IOException if the file cannot be opened
     */
    @GetInitialRestriction
    public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
      long lineCount;
      // Files.lines holds the file open until the stream is closed, hence try-with-resources.
      try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
        lineCount = stream.count();
      }
      return new OffsetRange(0L, lineCount);
    }

    /**
     * Returns the initial watermark estimator state, which is the timestamp derived from the file
     * name (see {@link #getTimestamp}).
     */
    @GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(
        @Element Metadata metadata, @Restriction OffsetRange restriction) {
      String filename = metadata.resourceId().toString();
      logger.info("getInitialWatermarkEstimatorState {}", filename);
      // Compute and return the initial watermark estimator state for each element and restriction.
      // All subsequent processing of an element and restriction will be restored from the existing
      // state.
      return getTimestamp(filename);
    }

    /** Creates a manual watermark estimator that starts from the given state. */
    @NewWatermarkEstimator
    public WatermarkEstimators.Manual newWatermarkEstimator(
        @WatermarkEstimatorState Instant watermarkEstimatorState) {
      logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
      return new WatermarkEstimators.Manual(watermarkEstimatorState);
    }
  }

  /**
   * Assembles the reading part of the pipeline: the {@code filepattern} is turned into a
   * single-element collection, matched repeatedly with {@link Watch}, and every matched file is
   * handed to {@code ReadFileFn}.
   *
   * @param pipeline the pipeline the transforms are applied to
   * @param filepattern the pattern passed to the poll function for matching
   * @param pollInterval how often the pattern is re-matched
   * @param terminationCondition per-input termination condition for the watch transform
   * @return the key-value pairs emitted by {@code ReadFileFn}, passed through the "Get Data"
   *     logging transform
   */
  static PCollection<KV<String, Long>> getData(
      Pipeline pipeline,
      String filepattern,
      Duration pollInterval,
      TerminationCondition terminationCondition) {
    // Kept as an explicitly typed local: the raw TerminationCondition argument makes the builder
    // call return a raw type, so the generic type has to be pinned here.
    final Growth<String, MatchResult.Metadata, String> watchForFiles =
        Watch.growthOf(new MatchPollFn())
            .withPollInterval(pollInterval)
            .withTerminationPerInput(terminationCondition);

    final PCollection<String> patterns =
        pipeline.apply("Create filepattern", Create.<String>of(filepattern));
    final PCollection<KV<String, Metadata>> matchesByPattern =
        patterns.apply("Continuously match filepatterns", watchForFiles);
    // Only the matched file metadata is needed downstream; the pattern key is dropped.
    final PCollection<Metadata> matchedFiles = matchesByPattern.apply(Values.create());

    return matchedFiles.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
  }

  /**
   * Poll function for {@link Watch} that matches the incoming file pattern against the file system
   * and reports every matched file. An empty match is allowed.
   */
  private static class MatchPollFn extends PollFn<String, Metadata> {
    @Override
    public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
        throws Exception {
      // Only the filepattern (i.e. element) is available here, so there is nothing to derive a
      // meaningful timestamp from. Every match is therefore reported with EPOCH as its timestamp;
      // no explicit watermark is attached to the poll result.
      final MatchResult matched = FileSystems.match(element, EmptyMatchTreatment.ALLOW);
      return Watch.Growth.PollResult.incomplete(Instant.EPOCH, matched.metadata());
    }
  }

  /** Maps the {@link Metadata} of a matched file to the string form of its resource id. */
  private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
    @Override
    public String apply(MatchResult.Metadata input) {
      final String resourceName = input.resourceId().toString();
      return resourceName;
    }
  }
}
