[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits._


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringType

initframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute(){code}
 

 After this delete operation, partitions 0, 1 and 2 should have been deleted from the table.

Actual:

{color:#067d17}select * from target order by key;{color}

{color:#067d17}+---+-----+
|key|value|
+---+-----+
|a  |0    |
|b  |1    |
|c  |2    |
|d  |3    |
+---+-----+{color}

{color:#067d17}Expected:{color}

{color:#067d17}+---+-----+
|key|value|
+---+-----+
|d  |3    |
+---+-----+{color}

  was:
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringType

initframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute(){code}
 

 

abc


> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> 

[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringType

initframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute(){code}
 

 

abc

  was:
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute(){code}
 

 

abc


> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
>
> Steps to Reproduce Issue :
> {code:java}
> import scala.collection.JavaConverters.
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apach

[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute(){code}
 

 

abc

  was:
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute()
{code}


> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
>
> Steps to Reproduce Issue :
> {code:java}
> import scala.collection.JavaConverters.
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql

[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Attachment: (was: issue.scala)

> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
>
> Steps to Reproduce Issue :
> [^issue.scala]
> {code:java}
> // code placeholder
> import scala.collection.JavaConverters.
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import 
> org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
>  DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
> MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
> WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
> StringType, StructField, StructType}
> import spark.implicits.
> sql("drop table if exists target").show()
> val initframe = spark.createDataFrame(Seq(
> Row("a", "0"),
> Row("b", "1"),
> Row("c", "2"),
> Row("d", "3")
> ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
> StringTypeinitframe.write
> .format("carbondata")
> .option("tableName", "target")
> .option("partitionColumns", "value")
> .mode(SaveMode.Overwrite)
> .save()
> val target = spark.read.format("carbondata").option("tableName", 
> "target").load()
> var ccd =
> spark.createDataFrame(Seq(
> Row("a", "10", false, 0),
> Row("a", null, true, 1),
> Row("b", null, true, 2),
> Row("c", null, true, 3),
> Row("c", "20", false, 4),
> Row("c", "200", false, 5),
> Row("e", "100", false, 6)
> ).asJava,
> StructType(Seq(StructField("key", StringType),
> StructField("newValue", StringType),
> StructField("deleted", BooleanType), StructField("time", IntegerType
> ccd.createOrReplaceTempView("changes")
> ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
> FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM 
> changes GROUP BY key)")
> val updateMap = Map("key" -> "B.key", "value" -> 
> "B.newValue").asInstanceOf[Map[Any, Any]]
> val insertMap = Map("key" -> "B.key", "value" -> 
> "B.newValue").asInstanceOf[Map[Any, Any]]
> target.as("A").merge(ccd.as("B"), "A.key=B.key").
> whenMatched("B.deleted=true").
> delete().execute()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :
{code:java}
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute()
{code}

  was:
Steps to Reproduce Issue :

[^issue.scala]
{code:java}
// code placeholder
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute()
{code}


> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
>
> Steps to Reproduce Issue :
> {code:java}
> import scala.collection.JavaConverters.
> import java.sql.Date
> import org.apache.spark.sql._
> 

[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :

[^issue.scala]
{code:java}
// code placeholder
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
import spark.implicits.


sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()
val target = spark.read.format("carbondata").option("tableName", 
"target").load()

var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute()
{code}

  was:
Steps to Reproduce Issue :

[^issue.scala]
{code:java}
// code placeholder
import scala.collection.JavaConverters.
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}import 
org.apache.spark.sql.functions._import 
org.apache.spark.sql.test.util.QueryTestimport 
org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, 
StructField, StructType}
import spark.implicits.sql("drop table if exists target").show()val initframe = 
spark.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
StringTypeinitframe.write
.format("carbondata")
.option("tableName", "target")
.option("partitionColumns", "value")
.mode(SaveMode.Overwrite)
.save()val target = spark.read.format("carbondata").option("tableName", 
"target").load()var ccd =
spark.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1),
Row("b", null, true, 2),
Row("c", null, true, 3),
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6)
).asJava,
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", 
IntegerTypeccd.createOrReplaceTempView("changes")ccd = sql("SELECT key, 
latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, 
max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")val 
updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]val insertMap = Map("key" -> "B.key", 
"value" -> "B.newValue").asInstanceOf[Map[Any, 
Any]]target.as("A").merge(ccd.as("B"), "A.key=B.key").
whenMatched("B.deleted=true").
delete().execute()
{code}


> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
> Attachments: issue.scala
>
>
> Steps to Reproduce Issue :
> [^issue.scala]
> {code:java}
> // code placeholder

[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Attachment: issue.scala

> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
> Attachments: issue.scala
>
>
> Steps to Reproduce Issue :
> import scala.collection.JavaConverters._import java.sql.Dateimport 
> org.apache.spark.sql._import org.apache.spark.sql.CarbonSession._import 
> org.apache.spark.sql.catalyst.TableIdentifierimport 
> org.apache.spark.sql.execution.command.mutation.merge.\{CarbonMergeDataSetCommand,
>  DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
> MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
> WhenNotMatchedAndExistsOnlyOnTarget}import 
> org.apache.spark.sql.functions._import 
> org.apache.spark.sql.test.util.QueryTestimport 
> org.apache.spark.sql.types.\{BooleanType, DateType, IntegerType, StringType, 
> StructField, StructType}import spark.implicits._
>   
> sql("drop table if exists target").show()
> val initframe = spark.createDataFrame(Seq(
>   Row("a", "0"),
>   Row("b", "1"),
>   Row("c", "2"),
>   Row("d", "3")
> ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", 
> StringType
> initframe.write
>   .format("carbondata")
>   .option("tableName", "target")
>   .option("partitionColumns", "value")
>   .mode(SaveMode.Overwrite)
>   .save()
>   
> val target = spark.read.format("carbondata").option("tableName", 
> "target").load()var ccd =
>   spark.createDataFrame(Seq(
> Row("a", "10", false,  0),
> Row("a", null, true, 1),   
> Row("b", null, true, 2),   
> Row("c", null, true, 3),   
> Row("c", "20", false, 4),
> Row("c", "200", false, 5),
> Row("e", "100", false, 6) 
>   ).asJava,
> StructType(Seq(StructField("key", StringType),
>   StructField("newValue", StringType),
>   StructField("deleted", BooleanType), StructField("time", IntegerType
> 
> ccd.createOrReplaceTempView("changes")
> ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
> FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM 
> changes GROUP BY key)")
> val updateMap = Map("key" -> "B.key", "value" -> 
> "B.newValue").asInstanceOf[Map[Any, Any]]
> val insertMap = Map("key" -> "B.key", "value" -> 
> "B.newValue").asInstanceOf[Map[Any, Any]]
> target.as("A").merge(ccd.as("B"), "A.key=B.key").
>   whenMatched("B.deleted=true").
>   delete().execute()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :

[^issue.scala]
{code:java}
// code placeholder
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))

initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()

val target = spark.read.format("carbondata").option("tableName", "target").load()

var ccd = spark.createDataFrame(Seq(
  Row("a", "10", false, 0),
  Row("a", null, true, 1),
  Row("b", null, true, 2),
  Row("c", null, true, 3),
  Row("c", "20", false, 4),
  Row("c", "200", false, 5),
  Row("e", "100", false, 6)
).asJava,
  StructType(Seq(StructField("key", StringType),
    StructField("newValue", StringType),
    StructField("deleted", BooleanType), StructField("time", IntegerType))))

ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=true").
  delete().execute()
{code}

  was:
Steps to Reproduce Issue :
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))

initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()
  
val target = spark.read.format("carbondata").option("tableName", "target").load()

var ccd =
  spark.createDataFrame(Seq(
Row("a", "10", false,  0),
Row("a", null, true, 1),   
Row("b", null, true, 2),   
Row("c", null, true, 3),   
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6) 
  ).asJava,
StructType(Seq(StructField("key", StringType),
  StructField("newValue", StringType),
  StructField("deleted", BooleanType), StructField("time", IntegerType))))
  
ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=true").
  delete().execute()


> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
> Attachments: issue.scala
>
>
> Steps to Reproduce Issue :
> [^issue.scala]

[jira] [Updated] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

2020-07-15 Thread Indhumathi Muthumurugesh (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Indhumathi Muthumurugesh updated CARBONDATA-3902:
-
Description: 
Steps to Reproduce Issue :
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))

initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()
  
val target = spark.read.format("carbondata").option("tableName", "target").load()

var ccd =
  spark.createDataFrame(Seq(
Row("a", "10", false,  0),
Row("a", null, true, 1),   
Row("b", null, true, 2),   
Row("c", null, true, 3),   
Row("c", "20", false, 4),
Row("c", "200", false, 5),
Row("e", "100", false, 6) 
  ).asJava,
StructType(Seq(StructField("key", StringType),
  StructField("newValue", StringType),
  StructField("deleted", BooleanType), StructField("time", IntegerType))))
  
ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes 
GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=true").
  delete().execute()

> Query on partition table gives incorrect results after Delete records using 
> CDC
> ---
>
> Key: CARBONDATA-3902
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
> Project: CarbonData
>  Issue Type: Bug
>Reporter: Indhumathi Muthumurugesh
>Priority: Major
>
> Steps to Reproduce Issue :
> import scala.collection.JavaConverters._
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
> import spark.implicits._
>   
> sql("drop table if exists target").show()
> val initframe = spark.createDataFrame(Seq(
>   Row("a", "0"),
>   Row("b", "1"),
>   Row("c", "2"),
>   Row("d", "3")
> ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
> initframe.write
>   .format("carbondata")
>   .option("tableName", "target")
>   .option("partitionColumns", "value")
>   .mode(SaveMode.Overwrite)
>   .save()
>   
> val target = spark.read.format("carbondata").option("tableName", "target").load()
>
> var ccd =
>   spark.createDataFrame(Seq(
> Row("a", "10", false,  0),
> Row("a", null, true, 1),   
> Row("b", null, true, 2),   
> Row("c", null, true, 3),   
> Row("c", "20", false, 4),
> Row("c", "200", false, 5),
> Row("e", "100", false, 6) 
>   ).asJava,
> StructType(Seq(StructField("key", StringType),
>   StructField("newValue", StringType),
>   StructField("deleted", BooleanType), StructField("time", IntegerType))))
> 
> ccd.createOrReplaceTempView("changes")
> ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted 
> FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM 
> changes GROUP BY key)")
> val updateMap = Map("key" -> "B.key", "value" -> 
> "B.newValue").asInstanceOf[Map[Any, Any]]
> val insertMap = Map("key" -> "B.key", "value" -> 
> "B.newValue").asInstanceOf[Map[Any, Any]]
> target.as("A").merge(ccd.as("B"