hotienvu commented on a change in pull request #29661:
URL: https://github.com/apache/spark/pull/29661#discussion_r487359185



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
##########
@@ -238,4 +238,34 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("OptimizedIn test: optimize range compare") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(4, 1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a >= 1 && 'a <= 5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("OptimizedIn test: do not optimize range compare if non-continuous") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 5))

Review comment:
       Agreed, though I think something like (a >= 1 && a <= 3) || a == 5 would
be simpler to implement.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -249,6 +280,10 @@ object OptimizeIn extends Rule[LogicalPlan] {
           && !v.isInstanceOf[CreateNamedStruct]
           && !newList.head.isInstanceOf[CreateNamedStruct]) {
           EqualTo(v, newList.head)
+        } else if (isInteger(v) && isContinousIntegers(newList.toSet)) {

Review comment:
       I've added OPTIMIZER_INSET_RANGE_CHECK_THRESHOLD (defaults to 20) for
this purpose.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is 
interesting, what would be the expected outcome? Filter(null)? 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])
+        (Literal(values.min, ShortType), Literal(values.max, ShortType))
+      case IntegerType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Integer])
+        (Literal(values.min, IntegerType), Literal(values.max, IntegerType))
+      case LongType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Long])
+        (Literal(values.min, LongType), Literal(values.max, LongType))
+    }

Review comment:
       I have fixed this to use ordering as suggested 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =

Review comment:
       updated to use IntegralType which covers byte, short, int, long

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is
interesting: what would be the expected outcome? Filter(null)?
   Edit: I've added a non-null check, so this optimization is skipped when the
list contains a null element.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
##########
@@ -238,4 +238,34 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("OptimizedIn test: optimize range compare") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(4, 1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a >= 1 && 'a <= 5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("OptimizedIn test: do not optimize range compare if non-continuous") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 5))

Review comment:
       Agreed, though I think something like (a >= 1 && a <= 3) || a == 5 would
be simpler to implement.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -249,6 +280,10 @@ object OptimizeIn extends Rule[LogicalPlan] {
           && !v.isInstanceOf[CreateNamedStruct]
           && !newList.head.isInstanceOf[CreateNamedStruct]) {
           EqualTo(v, newList.head)
+        } else if (isInteger(v) && isContinousIntegers(newList.toSet)) {

Review comment:
       I've added OPTIMIZER_INSET_RANGE_CHECK_THRESHOLD (defaults to 20) for
this purpose.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is 
interesting, what would be the expected outcome? Filter(null)? 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])
+        (Literal(values.min, ShortType), Literal(values.max, ShortType))
+      case IntegerType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Integer])
+        (Literal(values.min, IntegerType), Literal(values.max, IntegerType))
+      case LongType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Long])
+        (Literal(values.min, LongType), Literal(values.max, LongType))
+    }

Review comment:
       I have fixed this to use ordering as suggested 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =

Review comment:
       updated to use IntegralType which covers byte, short, int, long

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is
interesting: what would be the expected outcome? Filter(null)?
   Edit: I've added a non-null check, so this optimization is skipped when the
list contains a null element.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
##########
@@ -238,4 +238,34 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("OptimizedIn test: optimize range compare") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(4, 1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a >= 1 && 'a <= 5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("OptimizedIn test: do not optimize range compare if non-continuous") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 5))

Review comment:
       Agreed, though I think something like (a >= 1 && a <= 3) || a == 5 would
be simpler to implement.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -249,6 +280,10 @@ object OptimizeIn extends Rule[LogicalPlan] {
           && !v.isInstanceOf[CreateNamedStruct]
           && !newList.head.isInstanceOf[CreateNamedStruct]) {
           EqualTo(v, newList.head)
+        } else if (isInteger(v) && isContinousIntegers(newList.toSet)) {

Review comment:
       I've added OPTIMIZER_INSET_RANGE_CHECK_THRESHOLD (defaults to 20) for
this purpose.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is 
interesting, what would be the expected outcome? Filter(null)? 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])
+        (Literal(values.min, ShortType), Literal(values.max, ShortType))
+      case IntegerType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Integer])
+        (Literal(values.min, IntegerType), Literal(values.max, IntegerType))
+      case LongType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Long])
+        (Literal(values.min, LongType), Literal(values.max, LongType))
+    }

Review comment:
       I have fixed this to use ordering as suggested 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =

Review comment:
       updated to use IntegralType which covers byte, short, int, long

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is
interesting: what would be the expected outcome? Filter(null)?
   Edit: I've added a non-null check, so this optimization is skipped when the
list contains a null element.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
##########
@@ -238,4 +238,34 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("OptimizedIn test: optimize range compare") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(4, 1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a >= 1 && 'a <= 5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("OptimizedIn test: do not optimize range compare if non-continuous") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 5))

Review comment:
       Agreed, though I think something like (a >= 1 && a <= 3) || a == 5 would
be simpler to implement.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -249,6 +280,10 @@ object OptimizeIn extends Rule[LogicalPlan] {
           && !v.isInstanceOf[CreateNamedStruct]
           && !newList.head.isInstanceOf[CreateNamedStruct]) {
           EqualTo(v, newList.head)
+        } else if (isInteger(v) && isContinousIntegers(newList.toSet)) {

Review comment:
       I've added OPTIMIZER_INSET_RANGE_CHECK_THRESHOLD (defaults to 20) for
this purpose.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is 
interesting, what would be the expected outcome? Filter(null)? 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])
+        (Literal(values.min, ShortType), Literal(values.max, ShortType))
+      case IntegerType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Integer])
+        (Literal(values.min, IntegerType), Literal(values.max, IntegerType))
+      case LongType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Long])
+        (Literal(values.min, LongType), Literal(values.max, LongType))
+    }

Review comment:
       I have fixed this to use ordering as suggested 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =

Review comment:
       updated to use IntegralType which covers byte, short, int, long

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is
interesting: what would be the expected outcome? Filter(null)?
   Edit: I've added a non-null check, so this optimization is skipped when
the list contains a null element.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
##########
@@ -238,4 +238,34 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("OptimizedIn test: optimize range compare") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(4, 1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a >= 1 && 'a <= 5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("OptimizedIn test: do not optimize range compare if non-continuous") {
+    val originalQuery = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 3, 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val expected = testRelation
+      .select('a)
+      .where('a in(1, 2, 3, 5))

Review comment:
       Agreed, though I think something like (a >= 1 && a <= 3) || a == 5 would
be simpler to implement.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -249,6 +280,10 @@ object OptimizeIn extends Rule[LogicalPlan] {
           && !v.isInstanceOf[CreateNamedStruct]
           && !newList.head.isInstanceOf[CreateNamedStruct]) {
           EqualTo(v, newList.head)
+        } else if (isInteger(v) && isContinousIntegers(newList.toSet)) {

Review comment:
       I've added OPTIMIZER_INSET_RANGE_CHECK_THRESHOLD (defaults to 20) for
this purpose.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is 
interesting, what would be the expected outcome? Filter(null)? 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {

Review comment:
       fixed

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])
+        (Literal(values.min, ShortType), Literal(values.max, ShortType))
+      case IntegerType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Integer])
+        (Literal(values.min, IntegerType), Literal(values.max, IntegerType))
+      case LongType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Long])
+        (Literal(values.min, LongType), Literal(values.max, LongType))
+    }

Review comment:
       I have fixed this to use ordering as suggested 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =

Review comment:
       updated to use IntegralType which covers byte, short, int, long

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -231,10 +231,41 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces value IN (x,x+1,x+2..x+n) with x <= value AND value <= x + n
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  private def isContinousIntegers(nums: Set[Expression]): Boolean = {
+    if (nums.nonEmpty && isInteger(nums.head)) {
+      val (min, max) = getBound(nums)
+      val minL = min.eval(EmptyRow).asInstanceOf[Number].longValue()
+      val maxL = max.eval(EmptyRow).asInstanceOf[Number].longValue()
+       minL + (nums.size - 1) == maxL
+    } else {
+      false
+    }
+  }
+
+  private def isInteger(v: Expression): Boolean =
+    v.dataType.isInstanceOf[ShortType] ||
+      v.dataType.isInstanceOf[IntegerType] ||
+      v.dataType.isInstanceOf[LongType]
+
+  def getBound(nums: Set[Expression]): (Expression, Expression) = {
+    nums.head.dataType match {
+      case ShortType =>
+        val values = nums.map(e => e.eval(EmptyRow).asInstanceOf[Short])

Review comment:
       Thanks for your suggestions. I've fixed the ordering. The null case is
interesting: what would be the expected outcome? Filter(null)?
   Edit: I've added a non-null check, so this optimization is skipped when
the list contains a null element.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to