Daniel Williams created SPARK-17670:
---------------------------------------

             Summary: Spark DataFrame/Dataset no longer supports Option[Map] in 
case classes
                 Key: SPARK-17670
                 URL: https://issues.apache.org/jira/browse/SPARK-17670
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.0
            Reporter: Daniel Williams


Upon upgrading to Spark 2.0 I discovered that previously supported case classes 
containing members of the type Option[Map], with any key/value binding, mutable 
or immutable, are no longer supported and produce an exception similar to the 
following.  Upon further testing I also noticed that Option is still supported 
for Seq, case classes, and primitives.  Unit tests that validate this, written 
using spark-testing-base, are included below.

{code}
org.apache.spark.sql.AnalysisException: cannot resolve 
'wrapoption(staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface 
scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map<string,string>).valueArray).array, true), ObjectType(interface 
scala.collection.immutable.Map))' due to data type mismatch: argument 1 
requires scala.collection.immutable.Map type, however, 'staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface 
scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map<string,string>).valueArray).array, true)' is of scala.collection.Map type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
{code}

Unit tests:

{code}
import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.scalatest.{Matchers, FunSuite}
import org.slf4j.LoggerFactory

import scala.util.{Failure, Try, Success}

/** Row type with a plain immutable `Map` field. */
case class ImmutableMapTest(
  data: Map[String, String]
)

/** Row type with a plain mutable `Map` field. */
case class MapTest(
  data: scala.collection.mutable.Map[String, String]
)

/** Row type with an optional immutable `Map` field. */
case class ImmtableWithOption(
  data: Option[Map[String, String]]
)

/** Row type with an optional mutable `Map` field. */
case class MutableWithOption(
  data: Option[scala.collection.mutable.Map[String, String]]
)

/** Row type with an optional `String` field. */
case class PrimWithOption(
  data: Option[String]
)

/** Row type with an optional `Seq` field. */
case class ArrayWithOption(
  data: Option[Seq[String]]
)

/**
 * Checks that `toDF()` succeeds on RDDs of case classes whose single field is a
 * map, an optional map, an optional string or an optional sequence.
 *
 * Each test builds a one-element RDD, attempts the conversion, and then asserts
 * that a result was produced and that it contains exactly one row.
 */
class TestOptionWithDataTypes
  extends FunSuite
    with Matchers
    with SharedSparkContext
    with DataFrameSuiteBase {

  val logger = LoggerFactory.getLogger(classOf[TestOptionWithDataTypes])

  /**
   * Evaluates `body`, turning a failure into `None` after logging it.
   *
   * @param body expression to evaluate; taken by name so that exceptions thrown
   *             while evaluating it are captured by the `Try`
   * @return the value wrapped in `Option` (so a `null` result also yields
   *         `None`), or `None` if evaluation threw a non-fatal exception
   */
  private def tryOrLog[A](body: => A): Option[A] =
    Try(body) match {
      case Success(value) => Option(value)
      case Failure(e) =>
        logger.error(e.getMessage, e)
        None
    }

  test("test immutable map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(ImmutableMapTest(Map("1"->"2"))))

    val result = tryOrLog(rdd.toDF())

    result should not be(None)
    // The assertion above already failed the test when result is None.
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }

  test("test mutable Map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(MapTest(scala.collection.mutable.Map("1"->"2"))))

    val result = tryOrLog(rdd.toDF())

    result should not be(None)
    // The assertion above already failed the test when result is None.
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }

  test("test immutable option Map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(ImmtableWithOption(Option(Map("1"->"2")))))

    val result = tryOrLog(rdd.toDF())

    result should not be(None)
    // The assertion above already failed the test when result is None.
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }

  test("test mutable option Map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(
      Seq(MutableWithOption(Option(scala.collection.mutable.Map("1"->"2")))))

    val result = tryOrLog(rdd.toDF())

    result should not be(None)
    // The assertion above already failed the test when result is None.
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }

  test("test option with prim") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(PrimWithOption(Option("foo"))))

    val result = tryOrLog(rdd.toDF())

    result should not be(None)
    // The assertion above already failed the test when result is None.
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }

  test("test option with array") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(ArrayWithOption(Option(Seq("foo")))))

    val result = tryOrLog(rdd.toDF())

    result should not be(None)
    // The assertion above already failed the test when result is None.
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }
}
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to