spark reshape hive table and save to parquet
Hello,

I wonder if there is a way (preferably efficient) in Spark to reshape a Hive table and save it to Parquet.

Here is a minimal example. Input Hive table:

col1 col2 col3
1    2    3
4    5    6

Output Parquet:

col1 newcol2
1    [2 3]
4    [5 6]

p.s. The real input Hive table has ~1000 columns.

Thank you,
Anton
Re: spark reshape hive table and save to parquet
I am looking for something like:

// prepare input data
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val input_schema = StructType(Seq(
  StructField("col1", IntegerType),
  StructField("col2", IntegerType),
  StructField("col3", IntegerType)))
val input_data = spark.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 2, 3),
    Row(4, 5, 6))),
  input_schema)

// reshape input dataframe according to the output_schema and save to parquet
val output_schema = StructType(Seq(
  StructField("col1", IntegerType),
  StructField("newcol2", StructType(Seq(
    StructField("col2", IntegerType),
    StructField("col3", IntegerType))))))

val output_data = spark.createDataFrame(input_data, output_schema) // does not work
output_data.write.parquet("output_data.parquet")
Re: spark reshape hive table and save to parquet
Hi Divya,

Thanks, it is exactly what I am looking for!

Anton

On Wed, Dec 14, 2016 at 6:01 PM, Divya Gehlot wrote:
> you can use udfs to do it
> http://stackoverflow.com/questions/31615657/how-to-add-a-new-struct-column-to-a-dataframe
>
> Hope it will help.
>
> Thanks,
> Divya
>
> On 9 December 2016 at 00:53, Anton Kravchenko <kravchenko.anto...@gmail.com> wrote:
>> Hello,
>>
>> I wonder if there is a way (preferably efficient) in Spark to reshape
>> a Hive table and save it to Parquet.
>>
>> Here is a minimal example. Input Hive table:
>> col1 col2 col3
>> 1    2    3
>> 4    5    6
>>
>> Output Parquet:
>> col1 newcol2
>> 1    [2 3]
>> 4    [5 6]
>>
>> p.s. The real input Hive table has ~1000 columns.
>>
>> Thank you,
>> Anton
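For completeness: the same reshape can also be written with the built-in struct function rather than a UDF. A minimal sketch against the example schema above (column names as in the original question; for the real ~1000-column table the struct members could be assembled programmatically from df.columns):

import org.apache.spark.sql.functions.struct

// pack col2 and col3 into a single struct column named newcol2
val output_data = input_data.select(
  input_data("col1"),
  struct(input_data("col2"), input_data("col3")).as("newcol2"))

output_data.write.parquet("output_data.parquet")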
spark rdd map error: too many arguments for unapply pattern, maximum = 22
Hi there,

When I do an rdd map with more than 22 columns, I get "error: too many arguments for unapply pattern, maximum = 22":

scala> val rddRes = rows.map{case Row(col1,..col23) => Row(...)}

Is there a known way to get around this issue?

p.s. Here is a full traceback:

C:\spark-2.0.1-bin-hadoop2.7>bin\spark-shell.cmd
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val hive_extract = spark.read.parquet("hdfs:/user/akravchenko/extract_week49_ip.parquet")
hive_extract: org.apache.spark.sql.DataFrame = [rec_lngth_cnt: int, nch_near_line_rec_vrsn_cd: string ... 3593 more fields]

scala> hive_extract.createOrReplaceTempView("hive_extract_table")

scala> val df0 = spark.sql("SELECT rec_lngth_cnt,nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13,nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13 FROM hive_extract_table limit 10")
df0: org.apache.spark.sql.DataFrame = [rec_lngth_cnt: int, nch_edit_trlr_ind_cd_oc1: string ... 25 more fields]

scala> val rows: RDD[Row] = df0.rdd
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[82] at rdd at <console>:35

scala> val rddRes = rows.map{case Row(rec_lngth_cnt,nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13,nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13)
     | => Row(rec_lngth_cnt,Row(nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13),Row(nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13))}
<console>:37: error: too many arguments for unapply pattern, maximum = 22
       val rddRes = rows.map{case Row(rec_lngth_cnt,nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13,nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13)
                                  ^

Thank you,
Anton
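One way around the 22-argument unapply limit is to skip pattern matching entirely and index into the Row instead. A sketch, assuming the 27-column layout from the query above (rec_lngth_cnt followed by 13 + 13 edit columns):

import org.apache.spark.sql.Row

val rddRes = rows.map { r =>
  val f = r.toSeq                  // all 27 values as Seq[Any], no arity limit
  Row(f(0),                        // rec_lngth_cnt
      Row(f.slice(1, 14): _*),     // nch_edit_trlr_ind_cd_oc1..13
      Row(f.slice(14, 27): _*))    // nch_edit_cd_oc1..13
}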
[no subject]
import org.apache.spark.sql.Row

def convert_to_sas_single_partition(ipartition: Iterator[Row]): Unit = {
  for (irow <- ipartition) {
    // per-row conversion logic goes here
  }
}

df.rdd.foreachPartition(convert_to_sas_single_partition)
foreachPartition in Spark Java API
What would be a Java equivalent of the Scala code below?

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
  var df_rows = ArrayBuffer[String]()
  for (irow <- ipartition) {
    df_rows += irow.toString
  }
}

val df = spark.read.csv("file:///C:/input_data/*.csv")
df.foreachPartition(void_function_in_scala);

Thank you,
Anton
Re: foreachPartition in Spark Java API
Ok, there are at least two ways to do it:

Dataset<Row> df = spark.read().csv("file:///C:/input_data/*.csv");

df.foreachPartition(new ForEachPartFunction());
df.toJavaRDD().foreachPartition(new Void_java_func());

where ForEachPartFunction and Void_java_func are defined below:

// ForEachPartFunction.java:
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
    public void call(Iterator<Row> it) throws Exception {
        List<String> rows = new ArrayList<String>();
        while (it.hasNext()) {
            Row irow = it.next();
            rows.add(irow.toString());
        }
    }
}

// Void_java_func.java:
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Void_java_func implements VoidFunction<Iterator<Row>> {
    public void call(Iterator<Row> it) {
        List<String> rows = new ArrayList<String>();
        while (it.hasNext()) {
            Row irow = it.next();
            rows.add(irow.toString());
        }
    }
}

Anton

On Tue, May 30, 2017 at 10:58 AM, Anton Kravchenko <kravchenko.anto...@gmail.com> wrote:
> What would be a Java equivalent of the Scala code below?
>
> def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
>   var df_rows = ArrayBuffer[String]()
>   for (irow <- ipartition) {
>     df_rows += irow.toString
>   }
> }
>
> val df = spark.read.csv("file:///C:/input_data/*.csv")
> df.foreachPartition(void_function_in_scala);
>
> Thank you,
> Anton
Re: foreachPartition in Spark Java API
// ForEachPartFunction.java:
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
    public void call(Iterator<Row> t) throws Exception {
        List<String> rows = new ArrayList<String>();
        while (t.hasNext()) {
            Row irow = t.next();
            rows.add(irow.toString());
        }
        System.out.println(rows.toString());
    }
}

On Tue, May 30, 2017 at 2:01 PM, Anton Kravchenko <kravchenko.anto...@gmail.com> wrote:
> Ok, there are at least two ways to do it:
>
> Dataset<Row> df = spark.read().csv("file:///C:/input_data/*.csv");
>
> df.foreachPartition(new ForEachPartFunction());
> df.toJavaRDD().foreachPartition(new Void_java_func());
>
> where ForEachPartFunction and Void_java_func are defined below:
>
> // ForEachPartFunction.java:
> import org.apache.spark.api.java.function.ForeachPartitionFunction;
> import org.apache.spark.sql.Row;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
>     public void call(Iterator<Row> it) throws Exception {
>         List<String> rows = new ArrayList<String>();
>         while (it.hasNext()) {
>             Row irow = it.next();
>             rows.add(irow.toString());
>         }
>     }
> }
>
> // Void_java_func.java:
> import org.apache.spark.api.java.function.VoidFunction;
> import org.apache.spark.sql.Row;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class Void_java_func implements VoidFunction<Iterator<Row>> {
>     public void call(Iterator<Row> it) {
>         List<String> rows = new ArrayList<String>();
>         while (it.hasNext()) {
>             Row irow = it.next();
>             rows.add(irow.toString());
>         }
>     }
> }
>
> Anton
>
> On Tue, May 30, 2017 at 10:58 AM, Anton Kravchenko <kravchenko.anto...@gmail.com> wrote:
>> What would be a Java equivalent of the Scala code below?
>>
>> def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
>>   var df_rows = ArrayBuffer[String]()
>>   for (irow <- ipartition) {
>>     df_rows += irow.toString
>>   }
>> }
>>
>> val df = spark.read.csv("file:///C:/input_data/*.csv")
>> df.foreachPartition(void_function_in_scala);
>>
>> Thank you,
>> Anton
Java access to internal representation of DataTypes.DateType
How would one access the internal representation of DataTypes.DateType from the Spark (2.0.1) Java API?

From https://github.com/apache/spark/blob/51b1c1551d3a7147403b9e821fcc7c8f57b4824c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala :

"Internally, this is represented as the number of days from 1970-01-01."
Re: Java access to internal representation of DataTypes.DateType
I switched to java.sql.Date and converted milliseconds to days:

while (it.hasNext()) {
    Row irow = it.next();
    java.sql.Date t_date = irow.getAs("time_col");
    long t_long = t_date.getTime() / (24L * 60 * 60 * 1000); // milliseconds since epoch -> days since 1970-01-01
    int t_int = Math.toIntExact(t_long);
}

Though if there is a more efficient way to do it I would be happy to see that.

Anton

On Wed, Jun 14, 2017 at 12:42 AM, Kazuaki Ishizaki wrote:
> Does this code help you?
> https://github.com/apache/spark/blob/master/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java#L156-L194
>
> Kazuaki Ishizaki
>
> From: Anton Kravchenko
> To: "user @spark"
> Date: 2017/06/14 01:16
> Subject: Java access to internal representation of DataTypes.DateType
> --
>
> How would one access the internal representation of DataTypes.DateType from the Spark (2.0.1) Java API?
>
> From https://github.com/apache/spark/blob/51b1c1551d3a7147403b9e821fcc7c8f57b4824c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala :
>
> "Internally, this is represented as the number of days from 1970-01-01."
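A possibly more efficient alternative is to let Spark compute the days directly with the built-in datediff function, which for a 1970-01-01 base date yields exactly the internal days-since-epoch representation of DateType. A minimal sketch, written in Scala (the same functions exist in the Java API); the DataFrame name df and the column name time_col are assumptions carried over from the snippet above:

import org.apache.spark.sql.functions.{col, datediff, lit}

// days between 1970-01-01 and time_col, computed inside Spark SQL
val days = df.select(
  datediff(col("time_col"), lit(java.sql.Date.valueOf("1970-01-01"))).as("days_since_epoch"))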
access a broadcasted variable from within ForeachPartitionFunction Java API
How would one access a broadcast variable from within a ForeachPartitionFunction in the Spark (2.0.1) Java API?

Integer _bcv = 123;
Broadcast<Integer> bcv = spark.sparkContext().broadcast(_bcv);
Dataset<Row> df_sql = spark.sql("select * from atable");

df_sql.foreachPartition(new ForeachPartitionFunction<Row>() {
    public void call(Iterator<Row> t) throws Exception {
        System.out.println(bcv.value());
    }
});
Re: access a broadcasted variable from within ForeachPartitionFunction Java API
Ok, this one does what I want:

SparkConf conf = new SparkConf()
    .set("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
    .setMaster("local[*]")
    .setAppName("TestApp");
JavaSparkContext sc = new JavaSparkContext(conf);
SparkSession session = SparkSession
    .builder()
    .appName("TestApp").master("local[*]")
    .getOrCreate();

Integer _bcv = 123;
Broadcast<Integer> bcv = sc.broadcast(_bcv);
WrapBCV.setBCV(bcv); // implemented in WrapBCV.java

df_sql.foreachPartition(new ProcessSinglePartition()); // implemented in ProcessSinglePartition.java

Where:

// ProcessSinglePartition.java:
public class ProcessSinglePartition implements ForeachPartitionFunction<Row> {
    public void call(Iterator<Row> it) throws Exception {
        System.out.println(WrapBCV.getBCV());
    }
}

// WrapBCV.java:
public class WrapBCV {
    private static Broadcast<Integer> bcv;
    public static void setBCV(Broadcast<Integer> setbcv) {
        bcv = setbcv;
    }
    public static Integer getBCV() {
        return bcv.value();
    }
}

On Fri, Jun 16, 2017 at 3:35 AM, Ryan wrote:
> I don't think Broadcast itself can be serialized. you can get the value
> out on the driver side and refer to it in foreach, then the value would be
> serialized with the lambda expr and sent to workers.
>
> On Fri, Jun 16, 2017 at 2:29 AM, Anton Kravchenko <kravchenko.anto...@gmail.com> wrote:
>> How would one access a broadcast variable from within a
>> ForeachPartitionFunction in the Spark (2.0.1) Java API?
>>
>> Integer _bcv = 123;
>> Broadcast<Integer> bcv = spark.sparkContext().broadcast(_bcv);
>> Dataset<Row> df_sql = spark.sql("select * from atable");
>>
>> df_sql.foreachPartition(new ForeachPartitionFunction<Row>() {
>>     public void call(Iterator<Row> t) throws Exception {
>>         System.out.println(bcv.value());
>>     }
>> });