[jira] [Commented] (SPARK-10357) DataFrames unable to drop unwanted columns
[ https://issues.apache.org/jira/browse/SPARK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14729563#comment-14729563 ]

Randy Gelhausen commented on SPARK-10357:
-----------------------------------------

Regardless of what the spark-csv package does, I'm using the DataFrame API itself to drop columns. SparkSQL's attempt to access those columns seems like a problem with SparkSQL's toDF method. Am I reading the API wrong? Could the docs be updated to indicate that this function depends on the underlying read format?

{code}
val raw = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("DROPMALFORMED", "true").load(input)
val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
raw.toDF(columns:_*).registerTempTable(table)
val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
{code}

> DataFrames unable to drop unwanted columns
> ------------------------------------------
>
>                 Key: SPARK-10357
>                 URL: https://issues.apache.org/jira/browse/SPARK-10357
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>            Reporter: Randy Gelhausen
>
> spark-csv seems to be exposing an issue with DataFrame's inability to drop unwanted columns.
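Since toDF only renames columns (it cannot drop them, and every new name must be usable downstream), one workaround is to sanitize the header before registering the table. Below is a minimal pure-Scala sketch of such a cleanup; the `_c$i` placeholder naming scheme is my own assumption for illustration, not something spark-csv generates:

```scala
// Hypothetical header cleanup: replace spaces with underscores and give the
// empty names (from the trailing ",,," in the CSV header) unique placeholder
// names so toDF(...) no longer receives duplicate/empty column names.
val rawColumns = Seq("MI_PRINX", "Avg Day", "UC2 Literal", "", "", "")
val cleaned = rawColumns.zipWithIndex.map {
  case ("", i)   => s"_c$i"                   // placeholder name (assumption)
  case (name, _) => name.replaceAll(" ", "_") // same rename as the snippet
}
println(cleaned.mkString(","))
```

The placeholder columns could then be dropped explicitly, e.g. with a select over only the wanted names, or with DataFrame.drop(columnName), which is available as of Spark 1.4.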
> Related GitHub issue: https://github.com/databricks/spark-csv/issues/61
> My data (with header) looks like:
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg Day,loc_type,UC2 Literal,neighborhood,npu,x,y,,,
> 934782,90360664,2/5/2009,2/3/2009,13:50:00,2/3/2009,15:00:00,305,NULL,NULL,55 MCDONOUGH BLVD SW,670,2308,NULL,1,Day,Tue,35,LARCENY-NON VEHICLE,South Atlanta,Y,-84.38654,33.72024,,,
> 934783,90370891,2/6/2009,2/6/2009,8:50:00,2/6/2009,10:45:00,502,NULL,NULL,464 ANSLEY WALK TER NW,640,2305,NULL,1,Day,Fri,18,LARCENY-FROM VEHICLE,Ansley Park,E,-84.37276,33.79685,,,
> Despite using sqlContext (also tried with the programmatic raw.select, same result) to remove columns from the dataframe, attempts to operate on it cause failures.
> Snippet:
> {code}
> // Read CSV file, clean field names
> val raw = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("DROPMALFORMED", "true").load(input)
> val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
> raw.toDF(columns:_*).registerTempTable(table)
> val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
> System.err.println(clean.schema)
> System.err.println(clean.columns.mkString(","))
> System.err.println(clean.take(1).mkString("|"))
> {code}
> StackTrace:
> {code}
> 15/08/30 18:23:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, docker.dev, NODE_LOCAL, 1482 bytes)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on docker.dev:58272 (size: 1811.0 B, free: 530.0 MB)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on docker.dev:58272 (size: 21.9 KB, free: 530.0 MB)
> 15/08/30 18:23:15 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1350 ms on docker.dev (1/1)
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: ResultStage 0 (take at CsvRelation.scala:174) finished in 1.354 s
> 15/08/30 18:23:15 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: Job 0 finished: take at CsvRelation.scala:174, took 1.413674 s
> StructType(StructField(MI_PRINX,StringType,true), StructField(offense_id,StringType,true), StructField(rpt_date,StringType,true), StructField(occur_date,StringType,true), StructField(occur_time,StringType,true), StructField(poss_date,StringType,true), StructField(poss_time,StringType,true), StructField(beat,StringType,true), StructField(apt_office_prefix,StringType,true), StructField(apt_office_num,StringType,true), StructField(location,StringType,true), StructField(MinOfucr,StringType,true), StructField(MinOfibr_code,StringType,true), StructField(dispo_code,StringType,true), StructField(MaxOfnum_victims,StringType,true), StructField(Shift,StringType,true), StructField(Avg_Day,StringType,true), StructField(loc_type,StringType,true), StructField(UC2_Literal,StringType,true), StructField(neighborhood,StringType,true), StructField(npu,StringType,true), StructField(x,StringType,true), StructField(y,StringType,true))
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg_Day,loc_type,UC2_Literal,neighborhood,npu,x,y
> 15/08/30 18:23:16
> {code}
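A side note on the filter in the quoted snippet: after replaceAll(" ", "_") no column name can still equal " ", so only the length check does any work, and it merely omits the empty names from the SELECT text. A small pure-Scala sketch, using names taken from the header shown above:

```scala
// After the space-to-underscore rename, the x != " " test can never fire;
// only length() > 0 removes the empty names produced by the trailing ",,,".
val columns = Seq("MI_PRINX", "Avg_Day", "UC2_Literal", "", "", "")
val kept = columns.filter(x => x.length() > 0 && x != " ")
println(kept.mkString(", "))
```

The empty-named columns are excluded from the query text, but they still exist in the underlying CSV relation, which is re-parsed when the query actually runs.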
[jira] [Created] (SPARK-10357) DataFrames unable to drop unwanted columns
Randy Gelhausen created SPARK-10357:
---------------------------------------

             Summary: DataFrames unable to drop unwanted columns
                 Key: SPARK-10357
                 URL: https://issues.apache.org/jira/browse/SPARK-10357
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.4.1
            Reporter: Randy Gelhausen


spark-csv seems to be exposing an issue with DataFrame's inability to drop unwanted columns.
Related GitHub issue: https://github.com/databricks/spark-csv/issues/61
My data (with header) looks like:
MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg Day,loc_type,UC2 Literal,neighborhood,npu,x,y,,,
934782,90360664,2/5/2009,2/3/2009,13:50:00,2/3/2009,15:00:00,305,NULL,NULL,55 MCDONOUGH BLVD SW,670,2308,NULL,1,Day,Tue,35,LARCENY-NON VEHICLE,South Atlanta,Y,-84.38654,33.72024,,,
934783,90370891,2/6/2009,2/6/2009,8:50:00,2/6/2009,10:45:00,502,NULL,NULL,464 ANSLEY WALK TER NW,640,2305,NULL,1,Day,Fri,18,LARCENY-FROM VEHICLE,Ansley Park,E,-84.37276,33.79685,,,
Despite using sqlContext (also tried with the programmatic raw.select, same result) to remove columns from the dataframe, attempts to operate on it cause failures.
Snippet:
{code}
// Read CSV file, clean field names
val raw = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("DROPMALFORMED", "true").load(input)
val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
raw.toDF(columns:_*).registerTempTable(table)
val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
System.err.println(clean.schema)
System.err.println(clean.columns.mkString(","))
System.err.println(clean.take(1).mkString("|"))
{code}
StackTrace:
{code}
15/08/30 18:23:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, docker.dev, NODE_LOCAL, 1482 bytes)
15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on docker.dev:58272 (size: 1811.0 B, free: 530.0 MB)
15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on docker.dev:58272 (size: 21.9 KB, free: 530.0 MB)
15/08/30 18:23:15 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1350 ms on docker.dev (1/1)
15/08/30 18:23:15 INFO scheduler.DAGScheduler: ResultStage 0 (take at CsvRelation.scala:174) finished in 1.354 s
15/08/30 18:23:15 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/08/30 18:23:15 INFO scheduler.DAGScheduler: Job 0 finished: take at CsvRelation.scala:174, took 1.413674 s
StructType(StructField(MI_PRINX,StringType,true), StructField(offense_id,StringType,true), StructField(rpt_date,StringType,true), StructField(occur_date,StringType,true), StructField(occur_time,StringType,true), StructField(poss_date,StringType,true), StructField(poss_time,StringType,true), StructField(beat,StringType,true), StructField(apt_office_prefix,StringType,true), StructField(apt_office_num,StringType,true), StructField(location,StringType,true), StructField(MinOfucr,StringType,true), StructField(MinOfibr_code,StringType,true), StructField(dispo_code,StringType,true), StructField(MaxOfnum_victims,StringType,true), StructField(Shift,StringType,true), StructField(Avg_Day,StringType,true), StructField(loc_type,StringType,true), StructField(UC2_Literal,StringType,true), StructField(neighborhood,StringType,true), StructField(npu,StringType,true), StructField(x,StringType,true), StructField(y,StringType,true))
MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg_Day,loc_type,UC2_Literal,neighborhood,npu,x,y
15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(232400) called with curMem=259660, maxMem=278019440
15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 227.0 KB, free 264.7 MB)
15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(22377) called with curMem=492060, maxMem=278019440
15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 21.9 KB, free 264.6 MB)
15/08/30 18:23:16 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.17.0.19:41088 (size: 21.9 KB, free: 265.1 MB)
15/08/30 18:23:16 INFO spark.SparkContext: Created broadcast 2 from textFile at TextFile.scala:30
Exception in thread "main" java.lang.IllegalArgumentException: The header contains a duplicate entry: '' in [MI_PRINX, offense_id, rpt_date, occur_date, occur_time, poss_date, poss_time, beat, apt_office_prefix, apt_office_num, location, MinOfucr, MinOfibr_code, dispo_code, MaxOfnum_victims, Shift, Avg Day, loc_type, UC2 Literal,
{code}
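The IllegalArgumentException above traces back to the three trailing commas in the header row: splitting the header with trailing fields kept yields three empty names, which collide with each other. A pure-Scala sketch reproducing the collision on a shortened header (the splitting details here are an assumption for illustration, not spark-csv's actual parsing code):

```scala
// The trailing ",,," in the header yields three empty column names; any
// duplicate check over the parsed names then trips on "".
val header = "MI_PRINX,offense_id,Avg Day,UC2 Literal,npu,x,y,,,"
val names = header.split(",", -1) // limit -1 keeps trailing empty fields
val dupes = names.groupBy(identity).collect { case (n, g) if g.length > 1 => n }
println(dupes.mkString("|"))
```

This is why the failure surfaces only at take(1): the SELECT list built earlier excludes the empty names, but the CSV relation re-reads the header when the job runs and rejects it then.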