Thank you, Richard, for responding. I am able to run it successfully using row.getMap, but since I have to update the map I wanted to use the HashMap API. Is there a way I can do that? I am also surprised that it works in the first case, where I create the Dataset from a list of rows, but fails in the map function.
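For context, this is roughly what I would like to write inside the map function: copy the map out of the incoming Row into a java.util.HashMap so I can update it with the normal HashMap API, then convert it back to a scala.collection.Map before building the new Row, since that seems to be the external type the encoder expects. This is only an untested sketch (the updateFreq name and the "someKey" update are placeholders, and the JavaConverters conversion at the end is my guess at how to hand the map back to the encoder):

// assumes: import java.util.HashMap; import java.util.Map;
// import scala.collection.JavaConverters; plus the Spark classes already used above
MapFunction<Row, Row> updateFreq = row -> {
    // copy into a HashMap so it can be modified (getJavaMap may return a read-only view)
    Map<String, Integer> current = row.getJavaMap(2);
    Map<String, Integer> freq = new HashMap<>(current);

    // update the copy with the plain HashMap API (hypothetical key)
    freq.merge("someKey", 1, Integer::sum);

    // convert back to a scala.collection.Map before handing it to RowFactory,
    // on the assumption that this is what the RowEncoder accepts for map<string,int>
    scala.collection.Map<String, Integer> scalaFreq =
        JavaConverters.mapAsScalaMapConverter(freq).asScala();

    return RowFactory.create(row.getString(0), row.getString(1), scalaFreq);
};

If the JavaConverters step is not needed, or there is a cleaner way to get a mutable map out of the Row, please let me know.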
Thanks
Ankur

On Fri, Jan 27, 2017 at 12:15 PM, Richard Xin <richardxin...@yahoo.com> wrote:

> try
> Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getMap(2));
>
>
> On Friday, January 27, 2017 10:52 AM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
>
> + DEV Mailing List
>
> On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
>
> Hi,
>
> I am trying to map a Dataset whose rows have a map attribute. When I try to
> create a Row with the map attribute I get cast errors. I am able to reproduce
> the issue with the sample code below. The surprising thing is that with the
> same schema I am able to create a Dataset from a list of rows.
>
> I am on Spark 2.0 and Scala 2.11.
>
> public static void main(String[] args) {
>     StructType schema = new StructType().add("src", DataTypes.StringType)
>         .add("dst", DataTypes.StringType)
>         .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
>     List<Row> inputData = new ArrayList<>();
>     inputData.add(RowFactory.create("1", "2", new HashMap<>()));
>     SparkSession sparkSession = SparkSession
>         .builder()
>         .appName("IPCountFilterTest")
>         .master("local")
>         .getOrCreate();
>
>     Dataset<Row> out = sparkSession.createDataFrame(inputData, schema);
>     out.show();
>
>     Encoder<Row> rowEncoder = RowEncoder.apply(schema);
>     out.map((MapFunction<Row, Row>) row -> {
>         Row newRow = RowFactory.create(row.getString(0), row.getString(1),
>             new HashMap<String, Integer>());
>
>         //Row newRow = RowFactory.create(row.getString(0), row.getString(1),
>         //    row.getJavaMap(2));
>
>         return newRow;
>     }, rowEncoder).show();
> }
>
> Below is the error:
>
> 17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
>     [same stack trace as above]
>
> Thanks
> Ankur