[ 
https://issues.apache.org/jira/browse/SPARK-24869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24869:
----------------------------
    Description: 
{code}
    withTable("t") {
      withTempPath { path =>

        var numTotalCachedHit = 0
        val listener = new QueryExecutionListener {
          override def onFailure(f: String, qe: QueryExecution, e: 
Exception):Unit = {}

          override def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
            qe.withCachedData match {
              case c: SaveIntoDataSourceCommand
                  if c.query.isInstanceOf[InMemoryRelation] =>
                numTotalCachedHit += 1
              case _ =>
                println(qe.withCachedData)
            }
          }
        }
        spark.listenerManager.register(listener)

        val udf1 = udf({ (x: Int, y: Int) => x + y })
        val df = spark.range(0, 3).toDF("a")
          .withColumn("b", udf1(col("a"), lit(10)))
        df.cache()
        df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", 
properties)
        assert(numTotalCachedHit == 1, "expected to be cached in jdbc")
      }
    }
{code}

  was:
{code}
    withTable("t") {
      withTempPath { path =>

        var numTotalCachedHit = 0
        val listener = new QueryExecutionListener {
          override def onFailure(f: String, qe: QueryExecution, e: 
Exception):Unit = {}

          override def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
            qe.withCachedData match {
              case c: SaveIntoDataSourceCommand
                  if c.query.isInstanceOf[InMemoryRelation] =>
                numTotalCachedHit += 1
              case _ =>
                println(qe.withCachedData)
            }
          }
        }
        spark.listenerManager.register(listener)

        val udf1 = udf({ (x: Int, y: Int) => x + y })
        val df = spark.range(0, 3).toDF("a")
          .withColumn("b", udf1(col("a"), lit(10)))
        df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", 
properties)
        assert(numTotalCachedHit == 1, "expected to be cached in jdbc")
      }
    }
{code}


> SaveIntoDataSourceCommand's input Dataset does not use Cached Data
> ------------------------------------------------------------------
>
>                 Key: SPARK-24869
>                 URL: https://issues.apache.org/jira/browse/SPARK-24869
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Xiao Li
>            Priority: Major
>
> {code}
>     withTable("t") {
>       withTempPath { path =>
>         var numTotalCachedHit = 0
>         val listener = new QueryExecutionListener {
>           override def onFailure(f: String, qe: QueryExecution, e: 
> Exception):Unit = {}
>           override def onSuccess(funcName: String, qe: QueryExecution, 
> duration: Long): Unit = {
>             qe.withCachedData match {
>               case c: SaveIntoDataSourceCommand
>                   if c.query.isInstanceOf[InMemoryRelation] =>
>                 numTotalCachedHit += 1
>               case _ =>
>                 println(qe.withCachedData)
>             }
>           }
>         }
>         spark.listenerManager.register(listener)
>         val udf1 = udf({ (x: Int, y: Int) => x + y })
>         val df = spark.range(0, 3).toDF("a")
>           .withColumn("b", udf1(col("a"), lit(10)))
>         df.cache()
>         df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", 
> properties)
>         assert(numTotalCachedHit == 1, "expected to be cached in jdbc")
>       }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to