using DataFrames
# using DataArrays
# using DataStructures
# include(joinpath(pwd(), "lib/julia/macro_util.jl"))
# using Debug

# @allconst begin
# Switches selecting which sections of this script run.
# Correctness checks on small hand-verified inputs.
RUN_WARMUP_TEST = true
RUN_SMALL_TEST_1 = false
RUN_SMALL_TEST_2 = false
# Timed runs (1: randomly generated data, 2: data read from CSV files).
RUN_PERF_TEST_1 = true
RUN_PERF_TEST_2 = false
# Profiled runs of the corresponding timed runs.
RUN_PROFILE_TEST_1 = true
RUN_PROFILE_TEST_2 = false
# end

# Global element types for the id columns (IdType) and the D/D1/D2 columns (DType).
# They are declared `const` on purpose: a non-constant global can be rebound to
# any value at any time, so code inside functions that mentions it (for example
# in `x::IdType` declarations or `convert(IdType, 0)`) cannot be type-inferred and
# falls back to dynamic dispatch. A `const` binding gives the compiler a fixed type.
const IdType = Int # Assume the same as id2.
const DType = Int # Assume the same as D1 & D2 in dt2.
# IdType = Uint32 # Assume the same as id2.
# DType = Uint32 # Assume the same as D1 & D2 in dt2.

# Interval join written as a manual loop: pairs each dt1 record with every dt2 record whose interval satisfies D1 <= D <= D2.
# dt1 contains records for D and dt2 contains records for the intervals [D1, D2].
# Assumptions:
#  1. dt1 is sorted in increasing values of D.
#  2. dt2 is sorted in increasing values of D1 then D2.
# @debug
function manual_iter1(dt1, dt2)
  # Returns a DataFrame with columns id1, id2, D1, D, D2 holding one row for
  # every pair (dt1 row, dt2 row) such that D1 <= D <= D2. Rows are produced in
  # dt1 order and, for one dt1 row, in dt2 order.
  #
  # Arguments:
  #   dt1 -- table with columns :id1 and :D, sorted by increasing D.
  #   dt2 -- table with columns :id2, :D1 and :D2, sorted by increasing D1 then D2.
  # The sort orders are assumed, not checked.
  #
  # The columns are accessed through their raw `.data` arrays.
  # NOTE(review): this presumably bypasses any missing-value mask on the
  # columns -- confirm the inputs never contain missing values.
  dt1id1, dt1D = dt1[:id1].data, dt1[:D].data
  dt2id2, dt2D1, dt2D2 = dt2[:id2].data, dt2[:D1].data, dt2[:D2].data

  # Initial capacity of the result columns. This is only a starting guess: the
  # fill routine enlarges the columns whenever the number of matches exceeds it.
  nrowMax = 10 * (nrow(dt1) + nrow(dt2))
  resid1 = zeros(IdType, nrowMax); resid2 = zeros(IdType, nrowMax)
  resD1 = zeros(DType, nrowMax); resD = zeros(DType, nrowMax); resD2 = zeros(DType, nrowMax)

  # All looping happens in a separate function that receives only arrays, so
  # the loop is compiled for the concrete element types of its arguments and
  # does not depend on how the global IdType / DType bindings are declared.
  nmatch = _manual_iter1_fill!(resid1, resid2, resD1, resD, resD2,
                               dt1id1, dt1D, dt2id2, dt2D1, dt2D2)

  # Trim the columns to the number of matches actually written.
  return DataFrame(id1 = resize!(resid1, nmatch), id2 = resize!(resid2, nmatch),
                   D1 = resize!(resD1, nmatch), D = resize!(resD, nmatch), D2 = resize!(resD2, nmatch))
end

# Private helper of manual_iter1: writes the matches into the five result
# vectors (growing them when needed) and returns the number of matches written.
# The result vectors may be longer than the returned count; the caller trims.
function _manual_iter1_fill!(resid1, resid2, resD1, resD, resD2,
                             dt1id1, dt1D, dt2id2, dt2D1, dt2D2)
  nrow1 = length(dt1D)
  nrow2 = length(dt2D1)
  capacity = length(resid1)
  nmatch = 0
  # Window of dt2 rows that can still match: indices i2Lower .. i2Upper - 1.
  # Both bounds only ever move forward because dt1 is visited in increasing D.
  i2Lower = 1 # First dt2 row not yet discarded.
  i2Upper = 1 # First dt2 row whose D1 is still greater than the current D.
  for i1 = 1:nrow1
    @inbounds i1id1 = dt1id1[i1]
    @inbounds i1D = dt1D[i1]
    # Discard leading dt2 rows whose interval ends before D; D never decreases,
    # so they cannot match any later dt1 row either.
    while i2Lower <= nrow2
      @inbounds lowerD2 = dt2D2[i2Lower]
      if i1D <= lowerD2; break; end
      i2Lower += 1
    end
    # Every interval has been discarded: no later dt1 row can match.
    if i2Lower > nrow2; break; end
    # Extend the window over every dt2 row whose interval has started (D1 <= D).
    # The explicit index check (rather than a typemax sentinel) keeps this loop
    # finite even when D equals the largest representable value.
    while i2Upper <= nrow2
      @inbounds upperD1 = dt2D1[i2Upper]
      if i1D < upperD1; break; end
      i2Upper += 1
    end
    # Rows inside the window all have D1 <= D; only D <= D2 is left to test.
    for i2 = i2Lower:(i2Upper - 1)
      @inbounds i2D2 = dt2D2[i2]
      if i1D <= i2D2
        nmatch += 1
        if nmatch > capacity
          # Result columns are full: double them before the unchecked writes.
          capacity = max(2 * capacity, 16)
          resize!(resid1, capacity); resize!(resid2, capacity)
          resize!(resD1, capacity); resize!(resD, capacity); resize!(resD2, capacity)
        end
        @inbounds resid1[nmatch] = i1id1
        @inbounds resid2[nmatch] = dt2id2[i2]
        @inbounds resD1[nmatch] = dt2D1[i2]
        @inbounds resD[nmatch] = i1D
        @inbounds resD2[nmatch] = i2D2
      end
    end
  end
  return nmatch
end

if RUN_WARMUP_TEST
  println("Running warmup starting...")
  # Inputs: five points D = 2, 4, ..., 10 and two intervals [2, 4] and [7, 12].
  dtwarmup1 = DataFrame(
      id1 = vcat(convert(IdType, 1):convert(IdType, 5))
    , D = 2 * vcat(convert(DType, 1):convert(DType, 5))
  )
  dtwarmup2 = DataFrame(
      id2 = vcat(convert(IdType, 21):convert(IdType, 22))
    , D1 = DType[2, 7]
    , D2 = DType[4, 12]
  )
  # Expected rows, worked out by hand: every pair with D1 <= D & D <= D2.
  dtwarmupfDesired = DataFrame(
      id1 = IdType[1, 2, 4, 5]
    , id2 = IdType[21, 21, 22, 22]
    , D1 = DType[2, 2, 7, 7]
    , D = DType[2, 4, 8, 10]
    , D2 = DType[4, 4, 12, 12]
  )
  sort!(dtwarmupfDesired, cols=(:id1, :id2))
  @show dtwarmupfDesired
  # First call of manual_iter1, so the reported time includes compilation.
  @time manualIterResWarmup = manual_iter1(dtwarmup1, dtwarmup2)
  @show manualIterResWarmup
  # The comparison is only printed; a mismatch does not stop the script.
  @show isequal(manualIterResWarmup, dtwarmupfDesired)
  println("Running warmup done.")
end

# Small correctness check: joins ten points D = 2, 4, ..., 20 against three
# overlapping intervals and prints whether the result equals a hand-built table.
if RUN_SMALL_TEST_1 # Same small test as the R benchmark.
  println("Running small test 1 starting...")
  # Small Test data.
  # The tables are built inside @show, which prints the expression text along
  # with the resulting value, so the construction code is part of the output.
  @show dtsmall11 = DataFrame(id1 = [convert(IdType, 1):convert(IdType, 10)], D = 2 * [convert(DType, 1):convert(DType, 10)])
  # Intervals: [5, 9] (id2 21), [7, 12] (id2 22) and [10, 16] (id2 23).
  @show dtsmall12 = DataFrame(id2 = [convert(IdType, 21):convert(IdType, 23)]
                              , D1 = [convert(DType, 5), convert(DType, 7), convert(DType, 10)], D2 = [convert(DType, 9), convert(DType, 12), convert(DType, 16)])
  # Expected result, worked out by hand: all pairs with D1 <= D & D <= D2.
  # It is written grouped by interval (2, 3 and 4 matches respectively).
  @show dtsmall1fDesired = DataFrame(
      id1 = [convert(IdType, 3), convert(IdType, 4), convert(IdType, 4), convert(IdType, 5)
             , convert(IdType, 6), convert(IdType, 5), convert(IdType, 6), convert(IdType, 7), convert(IdType, 8)]
    , id2 = [rep(convert(IdType, 21), 2), rep(convert(IdType, 22), 3), rep(convert(IdType, 23), 4)]
    , D1 = [rep(convert(DType, 5), 2), rep(convert(DType, 7), 3), rep(convert(DType, 10), 4)]
    , D = [convert(DType, 6), convert(DType, 8), convert(DType, 8), convert(DType, 10), convert(DType, 12)
           , convert(DType, 10), convert(DType, 12), convert(DType, 14), convert(DType, 16)]
    , D2 = [rep(convert(DType, 9), 2), rep(convert(DType, 12), 3), rep(convert(DType, 16), 4)]
  )
  # Reorder the expected rows by (id1, id2) before comparing.
  sort!(dtsmall1fDesired, cols=(:id1, :id2))
  # Run test.
  @time manualIterResSmall1 = manual_iter1(dtsmall11, dtsmall12)
  @show manualIterResSmall1
  # Test results are the desired ones.
  # The outcome is only printed; a mismatch does not stop the script.
  @show isequal(manualIterResSmall1, dtsmall1fDesired)
  println("Running small test 1 done.")
end

# Small correctness check for intervals that share the same start (and partly
# the same end): prints whether the join result equals a hand-built table.
if RUN_SMALL_TEST_2 # Small test to investigate specific conditions, e.g., duplicate intervals.
  println("Running small test 2 starting...")
  # Small Test data.
  # The tables are built inside @show, which prints the expression text along
  # with the resulting value, so the construction code is part of the output.
  @show dtsmall21 = DataFrame(id1 = [convert(IdType, 1):convert(IdType, 10)], D = 2 * [convert(DType, 1):convert(DType, 10)])
  # Intervals: [5, 9] twice (id2 21 and 22) and [5, 10] (id2 23).
  @show dtsmall22 = DataFrame(id2 = [convert(IdType, 21):convert(IdType, 23)]
                              , D1 = [convert(DType, 5), convert(DType, 5), convert(DType, 5)], D2 =[convert(DType, 9), convert(DType, 9), convert(DType, 10)])
  # Expected result, worked out by hand: all pairs with D1 <= D & D <= D2.
  # It is written grouped by interval (2, 2 and 3 matches respectively).
  @show dtsmall2fDesired = DataFrame(
      id1 = [convert(IdType, 3), convert(IdType, 4), convert(IdType, 3), convert(IdType, 4), convert(IdType, 3), convert(IdType, 4), convert(IdType, 5)]
    , id2 = [rep(convert(IdType, 21), 2), rep(convert(IdType, 22), 2), rep(convert(IdType, 23), 3)]
    , D1 = [rep(convert(DType, 5), 7)]
    , D = [convert(DType, 6), convert(DType, 8), convert(DType, 6), convert(DType, 8), convert(DType, 6), convert(DType, 8), convert(DType, 10)]
    , D2 = [rep(convert(DType, 9), 2), rep(convert(DType, 9), 2), rep(convert(DType, 10), 3)]
  )
  # Reorder the expected rows by (id1, id2) before comparing.
  sort!(dtsmall2fDesired, cols=(:id1, :id2))
  # Run test.
  @time manualIterResSmall2 = manual_iter1(dtsmall21, dtsmall22)
  @show manualIterResSmall2
  # Test results are the desired ones.
  # The outcome is only printed; a mismatch does not stop the script.
  @show isequal(manualIterResSmall2, dtsmall2fDesired)
  println("Running small test 2 done.")
end

if RUN_PERF_TEST_1 # Similar benchmark as R but with Julia data randomizer.
  println("Running perf test 1 starting...")
  # Seed the generator so the random interval starts are the same on every run.
  srand(1)
  # Table sizes; use e.g. n1 = 10 and n2 = 3 to get tables small enough to print.
  n1 = 10000
  n2 = 1000
  # dt1: points D = 1..n1, each with id1 equal to D.
  dtbig11 = DataFrame(
      id1 = vcat(convert(IdType, 1):convert(IdType, n1))
    , D = vcat(convert(DType, 1):convert(DType, n1))
  )
  # dt2: n2 intervals with sorted random starts in 1..n1; D2 starts as a zero
  # placeholder and is overwritten with D1 + 100 just below.
  dtbig12 = DataFrame(
      id2 = vcat(convert(IdType, 1):convert(IdType, n2))
    , D1 = sort(convert(Array{DType, 1}, rand(1:n1, n2)))
    , D2 = zeros(DType, n2)
  )
  dtbig12[:D2] = dtbig12[:D1] + convert(DType, 100)
  # Print the inputs only when they are small.
  nrow(dtbig11) < 20 && @show dtbig11
  nrow(dtbig12) < 20 && @show dtbig12
  # Timed join.
  @time manualIterResBig1 = manual_iter1(dtbig11, dtbig12)
  # Print the result only when it is small, then always its row count.
  nrow(manualIterResBig1) < 10 && @show manualIterResBig1
  println("# Rows manualIterResBig1 = ", nrow(manualIterResBig1))
  println("Running perf test 1 done.")
end

if RUN_PROFILE_TEST_1 # Profile the above test.
  println("Running profile test 1 starting...")
  # Rebuild the same seeded random data as perf test 1, so this section can be
  # run whether or not that one was enabled.
  srand(1)
  n1 = 10000
  n2 = 1000
  dtbig11 = DataFrame(
      id1 = vcat(convert(IdType, 1):convert(IdType, n1))
    , D = vcat(convert(DType, 1):convert(DType, n1))
  )
  dtbig12 = DataFrame(
      id2 = vcat(convert(IdType, 1):convert(IdType, n2))
    , D1 = sort(convert(Array{DType, 1}, rand(1:n1, n2)))
    , D2 = zeros(DType, n2)
  )
  dtbig12[:D2] = dtbig12[:D1] + convert(DType, 100)
  # Discard samples from earlier profiling, then profile 100 repeated joins;
  # each result is overwritten by the next and never inspected.
  Profile.clear()
  @profile begin
    for i = 1:100
      manualIterResBig2Prof = manual_iter1(dtbig11, dtbig12)
    end
  end
  Profile.print()
  println("Running profile test 1 done.")
end

if RUN_PERF_TEST_2 # Exact same data as the R benchmark.
  println("Running perf test 2 starting...")
  # Both inputs and the expected result are read from comma-separated files
  # with a header row, located under data/analysis/join in the home directory.
  dtbig21 = readtable(
      joinpath(homedir(), "data/analysis/join", "dtbig1.csv")
    , separator = ',', header = true
  )
  dtbig22 = readtable(
      joinpath(homedir(), "data/analysis/join", "dtbig2.csv")
    , separator = ',', header = true
  )
  dtbig2fDesired = readtable(
      joinpath(homedir(), "data/analysis/join", "dtbigfDesired.csv")
    , separator = ',', header = true
  )
#   println("# Rows dtbig2fDesired = ", nrow(dtbig2fDesired))
  # Collect garbage before the timed call.
  gc()
#   gc_disable()
  @time manualIterResBig2 = manual_iter1(dtbig21, dtbig22)
#   gc_enable()
  # Print the result only when it is small, then always its row count.
  nrow(manualIterResBig2) < 10 && @show manualIterResBig2
  println("# Rows manualIterResBig2 = ", nrow(manualIterResBig2))
  # The comparison with the expected table is only printed, not enforced.
  @show isequal(manualIterResBig2, dtbig2fDesired)
  println("Running perf test 2 done.")
end

if RUN_PROFILE_TEST_2 # Profile the above test.
  println("Running profile test 2 starting...")
  # Read the same two input files as perf test 2; the expected-result file is
  # not needed because nothing is compared here.
  dtbig21 = readtable(
      joinpath(homedir(), "data/analysis/join", "dtbig1.csv")
    , separator = ',', header = true
  )
  dtbig22 = readtable(
      joinpath(homedir(), "data/analysis/join", "dtbig2.csv")
    , separator = ',', header = true
  )
#   dtbig2fDesired = readtable(joinpath(homedir(), "data/analysis/join", "dtbigfDesired.csv"), separator = ',', header = true)
  # Discard earlier samples and collect garbage, then profile 100 repeated
  # joins; each result is overwritten by the next and never inspected.
  Profile.clear()
  gc()
#   gc_disable()
  @profile begin
    for i = 1:100
      manualIterResBig2Prof = manual_iter1(dtbig21, dtbig22)
    end
  end
#   gc_enable()
  Profile.print()
  println("Running profile test 2 done.")
end
