using DataFrames
# using DataArrays
# using DataStructures
# include(joinpath(pwd(), "lib/julia/macro_util.jl"))
# using Debug

# @allconst begin
RUN_WARMUP_TEST = true
RUN_SMALL_TEST_1 = false
RUN_SMALL_TEST_2 = false
RUN_PERF_TEST_1 = true
RUN_PROFILE_TEST_1 = true
RUN_PERF_TEST_2 = false
RUN_PROFILE_TEST_2 = false

NUM_PROFILE_RUNS_TEST1 = 100
NUM_PROFILE_RUNS_TEST2 = 100
# end

# Cross-join D1 <= D <= D2 matching function looping manually.
# dt1 contains records for D and dt2 contains records for the intervals [D1, D2].
# Assumptions:
#  1. dt1 is sorted in increasing values of D.
#  2. dt2 is sorted in increasing values of D1 then D2.
# @debug
function manual_iter1{IdType, DType}(dt1, dt2, id::IdType, d::DType)
  const dt1id1, dt1D = dt1[:id1].data::Array{IdType}, dt1[:D].data::Array{IdType}
  const dt2id2, dt2D1, dt2D2 = dt2[:id2].data::Array{DType}, dt2[:D1].data::Array{DType}, dt2[:D2].data::Array{DType}
#   const IdType = eltype(dt1id1) # Assume the same as id2.
#   const DType = eltype(dt1D) # Assume the same as D1 & D2 in dt2.
  const IterType = Int # Array index iteration type.
  const MAX_ID = typemax(IdType)
  const MAX_D = typemax(DType)
  const nrow1::IterType = nrow(dt1); const nrow2::IterType = nrow(dt2);
  i1::IterType = 0; i1id1 = zero(IdType); i1D = zero(IdType); # Iterator over dt1 -- represents current dt1 record.
  i2Lower::IterType = 0; i2LowerD2 = zero(IdType);
  i2Upper::IterType = 1; i2UpperD1 = MAX_D; i2UpperD2 = MAX_D; # Iterator over dt2 -- represents next dt2 interval to be consider "in scope" of current dt1 iterator.
  if i2Upper <= nrow2
    @inbounds i2UpperD1 = dt2D1[i2Upper]; @inbounds i2UpperD2 = dt2D2[i2Upper];
  end
  i2Match::IterType = 0 # Index iterator of matching dt2 intervals.
  i::IterType = zero(IdType) # Index iterator of resulting matches in resulting arrays & DataFrame.
  const nrowMax = convert(IdType, 10 * (nrow1 + nrow2)) # Rough upper bound of returned DataFrame -- no guarantee of being an upper bound, but it is large enough for my tests, resizing the returned DataFrame is not the point of this performance test.
#   resid1 = rep(zero(IdType), nrowMax); resid2 = rep(zero(IdType), nrowMax);
#   resD1 = rep(zero(DType), nrowMax); resD = rep(zero(DType), nrowMax); resD2 = rep(zero(DType), nrowMax) # Result arrays/columns.
  resid1 = zeros(IdType, nrowMax); resid2 = zeros(IdType, nrowMax);
  resD1 = zeros(DType, nrowMax); resD = zeros(DType, nrowMax); resD2 = zeros(DType, nrowMax) # Result arrays/columns.
#   dtf = DataFrame(id1 = zeros(IdType, nrowMax), id2 = zeros(IdType, nrowMax)
#  , D1 = zeros(DType, nrowMax), D = zeros(DType, nrowMax), D2 = zeros(DType, nrowMax))
  while i1 < nrow1 # Iterator over dt1.
    i1 += one(IterType)
    @inbounds i1id1 = dt1id1[i1]; @inbounds i1D = dt1D[i1];
    while i2LowerD2 < i1D # Remove lowest dt2 in scope.
      i2Lower += one(IterType)
      @inbounds i2LowerD2 = i2Lower <= nrow2 ? dt2D2[i2Lower] : MAX_ID
    end
    if i2Lower == zero(IterType); continue; end;
    if nrow2 < i2Lower; break; end;
    while i2UpperD1 <= i1D # Add to scope this next dt2 as need be and increment to the next one.
      i2Upper += one(IterType)
      if i2Upper <= nrow2
        @inbounds i2UpperD1 = dt2D1[i2Upper]; @inbounds i2UpperD2 = dt2D2[i2Upper];
      else
        i2UpperD1 = MAX_D; i2UpperD2 = MAX_D;
      end
    end
    # Find matches in dt2 of this dt1 row.
    i2Match = i2Lower
    while i2Match < i2Upper
      @inbounds i2MatchD2 = dt2D2[i2Match]
      if i1D <= i2MatchD2 # ~ to i2MatchD1 <= i1D && i1D <= i2MatchD2 since i2MatchD1 <= i1D is guaranteed from the definition of i2Lower and dt2 ordered in increasing values of D1 first.
        i += one(IterType)
        @inbounds i2MatchD1 = dt2D1[i2Match]
        @inbounds resid1[i] = i1id1
        @inbounds resid2[i] = dt2id2[i2Match]
        @inbounds resD1[i] = i2MatchD1
        @inbounds resD[i] = i1D
        @inbounds resD2[i] = i2MatchD2
#           dtf[i, :id1] = i1id1
#           dtf[i, :id2] = dt2id2[i2Match]
#           dtf[i, :D1] = i2MatchD1
#           dtf[i, :D] = i1D
#           dtf[i, :D2] = i2MatchD2
      end
      i2Match += one(IterType)
    end
  end
  return DataFrame(id1 = resize!(resid1, i), id2 = resize!(resid2, i), D1 = resize!(resD1, i), D = resize!(resD, i), D2 = resize!(resD2, i))
end


# Global data types so we can test performance with various types.
GlobalIdType = Int # Assume typeof(id1) == typeof(id2).
GlobalDType = Int # Assume typeof(D1) == typeof(D) == typeof(D2).
# GlobalIdType = Uint32 # Assume typeof(id1) == typeof(id2).
# GlobalDType = Uint32 # Assume typeof(D1) == typeof(D) == typeof(D2).

if RUN_WARMUP_TEST
  println("Running warmup starting...")
  # Warmup Test data.
  dtwarmup1 = DataFrame(
      id1 = convert(Array{GlobalIdType}, [1:5])
    , D = convert(Array{GlobalDType}, 2 * [1:5])
    )
  dtwarmup2 = DataFrame(
      id2 = convert(Array{GlobalIdType}, [21:22])
    , D1 = convert(Array{GlobalDType}, [2, 7])
    , D2 = convert(Array{GlobalDType}, [4, 12])
    )
  # Warmup Test data Desired filtered cross-join data table by hand: D1 <= D & D <= D2.
  dtwarmupfDesired = DataFrame(
      id1 = convert(Array{GlobalIdType}, [1, 2, 4, 5])
    , id2 = convert(Array{GlobalIdType}, [rep(21, 2), rep(22, 2)])
    , D1 = convert(Array{GlobalDType}, [rep(2, 2), rep(7, 2)])
    , D = convert(Array{GlobalDType}, [2, 4, 8, 10])
    , D2 = convert(Array{GlobalDType}, [rep(4, 2), rep(12, 2)])
    )
  sort!(dtwarmupfDesired, cols=(:id1, :id2))
  @show dtwarmupfDesired
  # Run test.
  @time manualIterResWarmup = manual_iter1(dtwarmup1, dtwarmup2, zero(GlobalIdType), zero(GlobalDType))
  @show manualIterResWarmup
  # Test results are the desired ones.
  @show isequal(manualIterResWarmup, dtwarmupfDesired)
  println("Running warmup done.")
end

if RUN_SMALL_TEST_1 # Same small test as the R benchmark.
  println("Running small test 1 starting...")
  # Small Test data.
  @show dtsmall11 = DataFrame(
      id1 = convert(Array{GlobalIdType}, [1:10])
    , D = convert(Array{GlobalDType}, 2 * [1:10])
    )
  @show dtsmall12 = DataFrame(
      id2 = convert(Array{GlobalIdType}, [21:23])
    , D1 = convert(Array{GlobalDType}, [5, 7, 10])
    , D2 = convert(Array{GlobalDType}, [9, 12, 16])
    )
  #  Small Test data Desired filtered cross-join data table by hand: D1 <= D & D <= D2.
  @show dtsmall1fDesired = DataFrame(
      id1 = convert(Array{GlobalIdType}, [3, 4, 4, 5, 6, 5, 6, 7, 8])
    , id2 = convert(Array{GlobalIdType}, [rep(21, 2), rep(22, 3), rep(23, 4)])
    , D1 = convert(Array{GlobalDType}, [rep(5, 2), rep(7, 3), rep(10, 4)])
    , D = convert(Array{GlobalDType}, [6, 8, 8, 10, 12, 10, 12, 14, 16])
    , D2 = convert(Array{GlobalDType}, [rep(9, 2), rep(12, 3), rep(16, 4)])
  )
  sort!(dtsmall1fDesired, cols=(:id1, :id2))
  # Run test.
  @time manualIterResSmall1 = manual_iter1(dtsmall11, dtsmall12, zero(GlobalIdType), zero(GlobalDType))
  @show manualIterResSmall1
  # Test results are the desired ones.
  @show isequal(manualIterResSmall1, dtsmall1fDesired)
  println("Running small test 1 done.")
end

if RUN_SMALL_TEST_2 # Small test to investigate specific conditions, e.g., duplicate intervals.
  println("Running small test 2 starting...")
  # Small Test data.
  @show dtsmall21 = DataFrame(
      id1 = convert(Array{GlobalIdType}, [1:10])
    , D = convert(Array{GlobalDType}, 2 * [1:10])
    )
  @show dtsmall22 = DataFrame(
      id2 = convert(Array{GlobalIdType}, [21:23])
    , D1 = convert(Array{GlobalDType}, [5, 5, 5])
    , D2 = convert(Array{GlobalDType}, [9, 9, 10])
    )
  #  Small Test data Desired filtered cross-join data table by hand: D1 <= D & D <= D2.
  @show dtsmall2fDesired = DataFrame(
      id1 = convert(Array{GlobalIdType}, [3, 4, 3, 4, 3, 4, 5])
    , id2 = convert(Array{GlobalIdType}, [rep(21, 2), rep(22, 2), rep(23, 3)])
    , D1 = convert(Array{GlobalDType}, [rep(5, 7)])
    , D = convert(Array{GlobalDType}, [6, 8, 6, 8, 6, 8, 10])
    , D2 = convert(Array{GlobalDType}, [rep(9, 2), rep(9, 2), rep(10, 3)])
  )
  sort!(dtsmall2fDesired, cols=(:id1, :id2))
  # Run test.
  @time manualIterResSmall2 = manual_iter1(dtsmall21, dtsmall22, zero(GlobalIdType), zero(GlobalDType))
  @show manualIterResSmall2
  # Test results are the desired ones.
  @show isequal(manualIterResSmall2, dtsmall2fDesired)
  println("Running small test 2 done.")
end

if RUN_PERF_TEST_1 # Similar benchmark as R but with Julia data randomizer.
  println("Running perf test 1 starting...")
  # Performance benchmark test data.
  srand(1)
  # n1 = 10
  # n2 = 3
  n1 = 10000
  n2 = 1000
  dtbig11 = DataFrame(
      id1 = convert(Array{GlobalIdType}, [1:n1])
    , D = convert(Array{GlobalDType}, [1:n1])
    )
  dtbig12 = DataFrame(
      id2 = convert(Array{GlobalIdType}, [1:n2])
    , D1 = convert(Array{GlobalDType}, sort(rand(1:n1, n2)))
    , D2 = convert(Array{GlobalDType}, rep(0, n2)))
  dtbig12[:D2] = dtbig12[:D1] + convert(GlobalDType, 100)
  if (nrow(dtbig11) < 20)
    @show dtbig11
  end
  if (nrow(dtbig12) < 20)
    @show dtbig12
  end
  # Run test.
  gc()
  @time manualIterResBig1 = manual_iter1(dtbig11, dtbig12, zero(GlobalIdType), zero(GlobalDType))
  # Show number of matching rows.
  if (nrow(manualIterResBig1) < 10)
    @show manualIterResBig1
  end
  println("# Rows manualIterResBig1 = ", nrow(manualIterResBig1))
  println("Running perf test 1 done.")
end

if RUN_PROFILE_TEST_1 # Profile the above test.
  println("Running profile test 1 starting...")
  # Performance benchmark test data.
  srand(1)
  n1 = 10000
  n2 = 1000
  dtbig11 = DataFrame(
      id1 = convert(Array{GlobalIdType}, [1:n1])
    , D = convert(Array{GlobalDType}, [1:n1]))
  dtbig12 = DataFrame(
      id2 = convert(Array{GlobalIdType}, [1:n2])
    , D1 = convert(Array{GlobalDType}, sort(rand(1:n1, n2)))
    , D2 = convert(Array{GlobalDType}, rep(0, n2))
    )
  dtbig12[:D2] = dtbig12[:D1] + convert(GlobalDType, 100)
  Profile.clear()
  gc()
  @profile (for i = 1:NUM_PROFILE_RUNS_TEST1; manualIterResBig2Prof = manual_iter1(dtbig11, dtbig12, zero(GlobalIdType), zero(GlobalDType)); end)
  Profile.print()
  println("Running profile test 1 done.")
end

if RUN_PERF_TEST_2 # Exact same data as the R benchmark.
  println("Running perf test 2 starting...")
  # Performance benchmark test data.
  dtbig21 = readtable(joinpath(homedir(), "data/analysis/join", "dtbig1.csv"), separator = ',', header = true)
  dtbig22 = readtable(joinpath(homedir(), "data/analysis/join", "dtbig2.csv"), separator = ',', header = true)
  dtbig2fDesired = readtable(joinpath(homedir(), "data/analysis/join", "dtbigfDesired.csv"), separator = ',', header = true)
#   println("# Rows dtbig2fDesired = ", nrow(dtbig2fDesired))
  # Run test.
  gc()
#   gc_disable()
  @time manualIterResBig2 = manual_iter1(dtbig21, dtbig22, zero(Int), zero(Int))
#   gc_enable()
  # Show number of matching rows.
  if (nrow(manualIterResBig2) < 10)
    @show manualIterResBig2
  end
  println("# Rows manualIterResBig2 = ", nrow(manualIterResBig2))
  # Test results are the desired ones.
  @show isequal(manualIterResBig2, dtbig2fDesired)
  println("Running perf test 2 done.")
end

if RUN_PROFILE_TEST_2 # Profile the above test.
  println("Running profile test 2 starting...")
  # Performance benchmark test data.
  dtbig21 = readtable(joinpath(homedir(), "data/analysis/join", "dtbig1.csv"), separator = ',', header = true)
  dtbig22 = readtable(joinpath(homedir(), "data/analysis/join", "dtbig2.csv"), separator = ',', header = true)
  Profile.clear()
  gc()
  @profile (for i = 1:NUM_PROFILE_RUNS_TEST2; manualIterResBig2Prof = manual_iter1(dtbig21, dtbig22, zero(Int), zero(Int)); end)
  Profile.print()
  println("Running profile test 2 done.")
end
