Re: Implementing TableProvider in Spark 3.0
Saw. On Wednesday, July 8, 2020, 9:26 PM, Sricheta Ruj wrote:

Hello Spark Team,

I am trying to use the DataSourceV2 API from Spark 3.0. In the case of a write, how do I get the user-specified schema? This is what I am trying to achieve:

    val data = Seq(
      Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
      Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
    )

    val schema = new StructType()
      .add(StructField("name", StringType, true))
      .add(StructField("id", IntegerType, true))
      .add(StructField("flag", BooleanType, true))
      .add(StructField("salary", DoubleType, true))
      .add(StructField("phone", LongType, true))
      .add(StructField("dob", DateType, true))
      .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
      .add(StructField("time", TimestampType, true))
      .add(StructField("float", FloatType, true))
      .add(StructField("byte", ByteType, true))
      .add(StructField("short", ShortType, true))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    // Create a new manifest and add the entity to it
    df.write.format("com.microsoft.cdm")
      .option("storage", storageAccountName)
      .option("container", outputContainer)
      .option("manifest", "/root/default.manifest.cdm.json")
      .option("entity", "TestEntity")
      .option("format", "parquet")
      .option("compression", "gzip")
      .option("appId", appid).option("appKey", appkey).option("tenantId", tenantid)
      .mode(SaveMode.Append)
      .save()

I have my custom DataSource implementation as follows:

    class DefaultSource extends DataSourceRegister with TableProvider {

      override def shortName(): String = "spark-cdm"

      override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

      override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
        getTable(null, null, options).partitioning
      }

      override def supportsExternalMetadata = true

      override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
        println(schema)
        new MysqlTable(schema, properties)
      }
    }

The schema I get here is null. I am not sure how I should get the StructType I created on df.write. Any help would be appreciated.

Thanks and Regards,
Sricheta Ruj
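For context, a minimal hedged sketch of the provider contract in Java (MyTable and the fixed schema are placeholders, not from the post; verify against your Spark 3.0 build): as far as I can tell the write path still goes through inferSchema, so returning null there is what surfaces as a null schema in getTable.

    import java.util.Map;
    import org.apache.spark.sql.connector.catalog.Table;
    import org.apache.spark.sql.connector.catalog.TableProvider;
    import org.apache.spark.sql.connector.expressions.Transform;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    public class DefaultSource implements TableProvider {

      @Override
      public StructType inferSchema(CaseInsensitiveStringMap options) {
        // Derive the schema from the source's own metadata (manifest, files, ...)
        // instead of returning null; shown as a fixed schema for illustration only.
        return new StructType()
            .add("name", DataTypes.StringType)
            .add("id", DataTypes.IntegerType);
      }

      @Override
      public boolean supportsExternalMetadata() {
        // Declares that a caller-supplied schema may be handed to getTable().
        return true;
      }

      @Override
      public Table getTable(StructType schema, Transform[] partitioning,
                            Map<String, String> properties) {
        return new MyTable(schema, properties); // MyTable: hypothetical Table impl
      }
    }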
Re: tcps oracle connection from spark
And btw, the same connection string works fine when used in SQL Developer. (Replying to the original message of Tuesday, June 18, 2019, which follows below.)
tcps oracle connection from spark
Hi, I need help with a tcps Oracle connection from Spark (version: spark-2.4.0-bin-hadoop2.7).

    Properties prop = new Properties();
    prop.putAll(sparkOracle); // username/password
    prop.put("javax.net.ssl.trustStore", "path to root.jks");
    prop.put("javax.net.ssl.trustStorePassword", "password_here");

    df.write().mode(SaveMode.Append)
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .jdbc("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcps)(HOST=host.mycomapny.com)(PORT=1234)))(CONNECT_DATA=(SERVICE_NAME=service_name)))",
              "tableName", prop);

Note "PROTOCOL=tcps" in the connection string. The code works fine for "tcp" hosts, but some of our servers use "tcps" only, and I get the following errors when hitting Oracle tcps hosts. Can someone shed some light? Thanks a lot!

Exception in thread "main" java.sql.SQLRecoverableException: IO Error: Remote host terminated the handshake
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:682)
    at oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:715)
    at oracle.jdbc.driver.T4CConnection.<init>(T4CConnection.java:385)
    at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:30)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:564)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506)
    at com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:103)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: javax.net.ssl.SSLHandshakeException: Remote host terminated the handshake
    at java.base/sun.security.ssl.SSLSocketImpl.handleEOF(SSLSocketImpl.java:1321)
    at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1160)
    at java.base/sun.security.ssl.SSLSocketImpl.readHand
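One hedged thing to try (an assumption from the symptoms, not a confirmed fix): the javax.net.ssl.* trust store settings are JVM-level system properties, and putting them only in the JDBC Properties map may never reach the JVM's TLS stack. A sketch:

    import org.apache.spark.SparkConf;

    // Driver JVM: JSSE reads these as system properties.
    System.setProperty("javax.net.ssl.trustStore", "path to root.jks");
    System.setProperty("javax.net.ssl.trustStorePassword", "password_here");

    // On a cluster the executor JVMs open the JDBC connections, so they need
    // the same settings too, e.g. via extraJavaOptions (the trust store file
    // must exist at that path on every node):
    SparkConf conf = new SparkConf().set("spark.executor.extraJavaOptions",
        "-Djavax.net.ssl.trustStore=path_to_root.jks"
            + " -Djavax.net.ssl.trustStorePassword=password_here");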
Re: spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?
Thanks for the reply. Unfortunately this is the highest version available for the Cassandra connector. One thing I don't quite understand is that it worked perfectly under Spark 2.4.0; I thought support for Scala 2.11 only became deprecated starting with Spark 2.4.1, to be removed after Spark 3.0.

On Monday, May 6, 2019, 18:34, Russell Spitzer wrote:

Scala version mismatch: Spark is shown at 2.12, but the connector only has a 2.11 release. (Richard Xin's original message of Mon, May 6, 2019, 7:59 PM follows below.)
spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?
Dependencies:

    org.apache.spark   : spark-core_2.12                : 2.4.0 (compile)
    org.apache.spark   : spark-sql_2.12                 : 2.4.0
    com.datastax.spark : spark-cassandra-connector_2.11 : 2.4.1

When I run spark-submit on Spark 2.4.2 I get the following exception; it works fine when running spark-submit under Spark 2.4.0 with exactly the same command-line call. Any idea how I can fix this? Thanks a lot!

Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class
    at com.datastax.spark.connector.util.ConfigParameter.<init>(ConfigParameter.scala:7)
    at com.datastax.spark.connector.rdd.ReadConf$.<init>(ReadConf.scala:33)
    at com.datastax.spark.connector.rdd.ReadConf$.<clinit>(ReadConf.scala)
    at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:134)
    at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:76)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
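A hedged note on the version matrix (from the Spark release notes, not this thread): the Spark 2.4.2 convenience binaries were built with Scala 2.12, while 2.4.0, 2.4.1, and 2.4.3 were built with Scala 2.11, which matches the symptoms here. One consistent all-2.11 combination would be (versions illustrative), run against a Spark 2.4.3 (Scala 2.11) installation:

    org.apache.spark   : spark-core_2.11                : 2.4.3
    org.apache.spark   : spark-sql_2.11                 : 2.4.3
    com.datastax.spark : spark-cassandra-connector_2.11 : 2.4.1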
how sequence of chained jars in spark.(driver/executor).extraClassPath matters
Say I have a chained path in spark.driver.extraClassPath / spark.executor.extraClassPath such as /path1/*:/path2/*, and different versions of the same jar under those two directories. How does Spark pick which version of the jar to use: the one from /path1/*? Thanks.
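Not an authoritative answer, but the JVM resolves a class from the first matching classpath entry, so with /path1/*:/path2/* the copy under /path1 should win (ordering among jars inside a single wildcard directory is not guaranteed). A small sketch to verify which jar actually won from inside the job (SomeClassInBothJars is a placeholder for any class present in both jar versions):

    // Standard JVM API, nothing Spark-specific: ask where the class came from.
    String loadedFrom = SomeClassInBothJars.class.getProtectionDomain()
        .getCodeSource().getLocation().toString();
    System.out.println("Loaded from: " + loadedFrom);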
can I do spark-submit --jars [s3://bucket/folder/jar_file]? or --jars
Can we add an extra library (jars on S3) to spark-submit? If yes, how? For example via --jars, extraClassPath, or extraLibraryPath? Thanks, Richard
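A hedged sketch of the --jars form (--jars takes a comma-separated list; whether s3:// or s3a:// URLs resolve depends on the Hadoop S3 connector and credentials being available on the cluster, so the scheme, bucket, and class names below are assumptions to verify):

    spark-submit \
      --class com.example.Main \
      --master yarn \
      --jars s3a://my-bucket/folder/dep1.jar,s3a://my-bucket/folder/dep2.jar \
      my-app.jar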
Re: Flatten JSON to multiple columns in Spark
I believe you could use JOLT (bazaarvoice/jolt, a JSON-to-JSON transformation library written in Java) to flatten it to a JSON string and then go to a DataFrame or Dataset.

On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri wrote:

Explode is not working in this scenario; the error is that a string cannot be used in explode, which expects either an array or a map.

On Tue, Jul 18, 2017 at 11:39 AM, 刘虓 wrote:

Hi, have you tried to use explode?

Chetan Khatri wrote on Tuesday, July 18, 2017 at 2:06 PM:

Hello Spark Devs, can you please guide me on how to flatten JSON to multiple columns in Spark? Example row:

    Sr No: 1
    Title: Calculus Theory
    ISBN:  1234567890
    Info:  [{"cert":[{"authSbmtr":"009415da-c8cd-418d-869e-0a19601d79fa",
             "certUUID":"03ea5a1a-5530-4fa3-8871-9d1ebac627c4",
             "effDt":"2016-05-06T15:04:56.279Z",
             "fileFmt":"rjrCsv","status":"live"}],
             "expdCnt":"15",
             "mfgAcctNum":"531093",
             "oUUID":"23d07397-4fbe-4897-8a18-b79c9f64726c",
             "pgmRole":["RETAILER"],
             "pgmUUID":"1cb5dd63-817a-45bc-a15c-5660e4accd63",
             "regUUID":"cc1bd898-657d-40dc-af5d-4bf1569a1cc4",
             "rtlrsSbmtd":["009415da-c8cd-418d-869e-0a19601d79fa"]}]

I want to get a single row with 11 columns. Thanks.
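For completeness, a hedged sketch of a non-JOLT route (the JsonPath expressions, and in particular whether Spark's get_json_object accepts a root-level array index, are assumptions to verify; df and Info refer to the example above):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.get_json_object;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Pull selected fields of the Info JSON string out into their own columns.
    Dataset<Row> flat = df
        .withColumn("expdCnt", get_json_object(col("Info"), "$[0].expdCnt"))
        .withColumn("mfgAcctNum", get_json_object(col("Info"), "$[0].mfgAcctNum"))
        .withColumn("authSbmtr", get_json_object(col("Info"), "$[0].cert[0].authSbmtr"));
    flat.show();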
Re: apache-spark: Converting List of Rows into Dataset Java
Maybe you could try something like this:

    SparkSession sparkSession = SparkSession
        .builder()
        .appName("Rows2DataSet")
        .master("local")
        .getOrCreate();
    List<Row> results = new LinkedList<>();
    JavaRDD<Row> jsonRDD =
        new JavaSparkContext(sparkSession.sparkContext()).parallelize(results);
    Dataset<Row> peopleDF = sparkSession.createDataFrame(jsonRDD, Row.class);

Richard Xin

On Tuesday, March 28, 2017 7:51 AM, Karin Valisova wrote:

Hello! I am running Spark on Java and bumped into a problem I can't solve or find anything helpful among answered questions, so I would really appreciate your help. I am running some calculations, creating rows for each result:

    List<Row> results = new LinkedList<>();
    for (something) {
        results.add(RowFactory.create(someStringVariable, someIntegerVariable));
    }

Now I ended up with a list of rows I need to turn into a DataFrame to perform some Spark SQL operations on them, like groupings and sorting, and I would like to keep the data types. I tried:

    Dataset<Row> toShow = spark.createDataFrame(results, Row.class);

but it throws a NullPointerException (spark being the SparkSession). Is my logic wrong somewhere; should this operation be possible, resulting in what I want? Or do I have to create a custom class which extends Serializable and create a list of those objects rather than Rows? Will I be able to perform SQL queries on a Dataset consisting of custom class objects rather than Rows?

I'm sorry if this is a duplicate question. Thank you for your help!
Karin
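A hedged variation that usually sidesteps the NPE (the schema and column names are assumptions, not from Karin's code): createDataFrame(List<Row>, StructType) takes an explicit schema, so Spark does not have to introspect Row.class as a bean.

    import java.util.LinkedList;
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Explicit schema matching the Rows built in the loop above.
    StructType schema = new StructType()
        .add("someString", DataTypes.StringType)
        .add("someInteger", DataTypes.IntegerType);

    List<Row> results = new LinkedList<>();
    results.add(RowFactory.create("a", 1));

    Dataset<Row> df = spark.createDataFrame(results, schema);
    df.groupBy("someString").count().show();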
Re: Issue creating row with java.util.Map type
Try:

    Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getMap(2));

On Friday, January 27, 2017 10:52 AM, Ankur Srivastava wrote:

+ DEV Mailing List

On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava wrote:

Hi, I am trying to map a Dataset with rows which have a map attribute. When I try to create a Row with the map attribute I get cast errors. I am able to reproduce the issue with the sample code below. The surprising thing is that with the same schema I am able to create a Dataset from the List of Rows. I am on Spark 2.0 and Scala 2.11.

    public static void main(String[] args) {
        StructType schema = new StructType().add("src", DataTypes.StringType)
                .add("dst", DataTypes.StringType)
                .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));

        List<Row> inputData = new ArrayList<>();
        inputData.add(RowFactory.create("1", "2", new HashMap<>()));

        SparkSession sparkSession = SparkSession
            .builder()
            .appName("IPCountFilterTest")
            .master("local")
            .getOrCreate();

        Dataset<Row> out = sparkSession.createDataFrame(inputData, schema);
        out.show();

        Encoder<Row> rowEncoder = RowEncoder.apply(schema);
        out.map((MapFunction<Row, Row>) row -> {
            Row newRow = RowFactory.create(row.getString(0), row.getString(1), new HashMap<String, Integer>());
            // Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));
            return newRow;
        }, rowEncoder).show();
    }

Below is the error:

17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
    (same stack trace as above)

Thanks,
Ankur
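A hedged variation on the row.getMap(2) suggestion (the JavaConverters conversion is an assumption based on the error text, which rejects java.util.HashMap as the external type of a map column; the encoder appears to want a scala.collection.Map):

    import scala.collection.JavaConverters;

    // Build the java.util.Map as before, then hand the encoder a Scala Map.
    java.util.Map<String, Integer> freq = new java.util.HashMap<>();
    freq.put("a", 1);
    scala.collection.Map<String, Integer> scalaFreq =
        JavaConverters.mapAsScalaMapConverter(freq).asScala();
    Row newRow = RowFactory.create(row.getString(0), row.getString(1), scalaFreq);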
is partitionBy of DataFrameWriter supported in 1.6.x?
I found a contradiction between the 1.6.0 and 2.1.x documentation for DataFrameWriter.partitionBy.

In http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter it says: "This is only applicable for Parquet at the moment."

In http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter it says: "This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well."

And I got a warning when trying to save in Scala:

    > df.write.mode("overwrite").format("orc").partitionBy("date").saveAsTable("test.my_test")
    17/01/19 13:34:43 WARN hive.HiveContext$$anon$2: Persisting partitioned data source relation `test`.`my_test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input path(s): hdfs://nameservice1/user/hive/warehouse/test.db/my_test

Looking at the HDFS directories, the folders are there, but the table is not selectable in either Presto or Hive. Any comments? Thanks.
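A hedged workaround sketch for 1.6 (the approach is an assumption, not a verified fix: write the partitioned files directly and leave the table definition to Hive, so Hive and Presto see a plain partitioned ORC table rather than Spark's metastore-specific format):

    // Write partitioned ORC files to an explicit path instead of saveAsTable.
    df.write().mode("overwrite").format("orc")
        .partitionBy("date")
        .save("hdfs://nameservice1/user/hive/warehouse/test.db/my_test");

    // Then define the table on the Hive side, e.g.:
    //   CREATE EXTERNAL TABLE test.my_test (...) PARTITIONED BY (`date` string)
    //     STORED AS ORC LOCATION '/user/hive/warehouse/test.db/my_test';
    //   MSCK REPAIR TABLE test.my_test;   -- registers the written partitions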
Re: DataFrame to read json and include raw Json in DataFrame
Thanks, I have seen this, but it doesn't cover my question. What I need is to read JSON and include the raw JSON as part of my DataFrame.

On Friday, December 30, 2016 10:23 AM, Annabel Melongo wrote:

Richard, the documentation below will show you how to create a SparkSession and how to programmatically load data: Spark SQL and DataFrames - Spark 2.1.0 Documentation. (Richard Xin's original message of Thursday, December 29, 2016 follows below.)
DataFrame to read json and include raw Json in DataFrame
Say I have the following data in a file:

    {"id":1234,"ln":"Doe","fn":"John","age":25}
    {"id":1235,"ln":"Doe","fn":"Jane","age":22}

Java code snippet:

    final SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("json_test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    HiveContext hc = new HiveContext(ctx.sc());
    DataFrame df = hc.read().json("files/json/example2.json");

What I need is a DataFrame with columns id, ln, fn, age as well as a raw_json string column. Any advice on the best practice in Java?

Thanks, Richard
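A hedged sketch of one way to get both (an assumption, not established best practice; get_json_object returns strings, so cast id/age if typed columns are needed):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.get_json_object;

    // Read each line as plain text so the raw string survives as a column,
    // then project typed columns out of it.
    DataFrame raw = hc.read().text("files/json/example2.json")
        .withColumnRenamed("value", "raw_json");
    DataFrame df2 = raw.select(col("raw_json"),
        get_json_object(col("raw_json"), "$.id").alias("id"),
        get_json_object(col("raw_json"), "$.ln").alias("ln"),
        get_json_object(col("raw_json"), "$.fn").alias("fn"),
        get_json_object(col("raw_json"), "$.age").alias("age"));
    df2.show();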
Re: access Broadcast Variables in Spark java
Try this:

    JavaRDD<String> mapr = listrdd.map(x -> broadcastVar.value().get(x));

On Wednesday, December 21, 2016 2:25 PM, Sateesh Karuturi wrote:

I need to process Spark broadcast variables using the Java RDD API. This is the code I have tried so far; it is only sample code to check whether it works. In my real case I need to work on two csv files.

    SparkConf conf = new SparkConf().setAppName("BroadcastVariable").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(conf);

    Map<Integer, String> map = new HashMap<>();
    map.put(1, "aa");
    map.put(2, "bb");
    map.put(9, "ccc");
    Broadcast<Map<Integer, String>> broadcastVar = ctx.broadcast(map);

    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(9);

    JavaRDD<Integer> listrdd = ctx.parallelize(list);
    JavaRDD<Map<Integer, String>> mapr = listrdd.map(x -> broadcastVar.value());
    System.out.println(mapr.collect());

And it prints output like this:

    [{1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}]

My requirement is: [{aa, bb, ccc}]. Is it possible to do it in my required way? Please help me out.
Re: How to get recent value in spark dataframe
I am not sure I understood your logic, but it seems to me that you could take a look at Hive's lead/lag functions.

On Monday, December 19, 2016 1:41 AM, Milin korath wrote:

Thanks, I tried with a left outer join. My dataset has around 400M records and a lot of shuffling is happening. Is there any other workaround apart from a join? I tried to use a window function but I am not getting a proper solution. Thanks.

On Sat, Dec 17, 2016 at 4:55 AM, Michael Armbrust wrote:

Oh, and to get the null for missing years, you'd need to do an outer join with a table containing all of the years you are interested in.

On Fri, Dec 16, 2016 at 3:24 PM, Michael Armbrust wrote:

Are you looking for argmax? Here is an example.

On Wed, Dec 14, 2016 at 8:49 PM, Milin korath wrote:

Hi, I have a Spark data frame with the following structure:

    id  flag  price  date
    a   0     100    2015
    a   0     50     2015
    a   1     200    2014
    a   1     300    2013
    a   0     400    2012

I need to create a data frame with the most recent flag 1 value filled into the flag 0 rows:

    id  flag  price  date  new_column
    a   0     100    2015  200
    a   0     50     2015  200
    a   1     200    2014  null
    a   1     300    2013  null
    a   0     400    2012  null

We have 2 rows having flag=0. Consider the first row (flag=0): I have 2 candidate values (200 and 300) and I take the most recent one, 200 (2014). The last row has no recent value for flag 1, so it is updated with null. Looking for a solution using Scala. Any help would be appreciated. Thanks.

Milin
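For reference, a hedged illustration of the window-function machinery mentioned above (this shows lag itself, not the thread's exact most-recent-flag-1 logic; column names follow the table above):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.lag;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.Window;
    import org.apache.spark.sql.expressions.WindowSpec;

    // Previous price per id when ordered by date; a similar spec restricted
    // to flag = 1 rows could feed the "most recent flag 1 price" column.
    WindowSpec w = Window.partitionBy("id").orderBy(col("date"));
    Dataset<Row> withPrev = df.withColumn("prev_price", lag(col("price"), 1).over(w));
    withPrev.show();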
Re: Java to show struct field from a Dataframe
Super, that works! Thanks.

On Sunday, December 18, 2016, 11:28 AM, Yong Zhang wrote:

Why don't you just return the struct you defined, instead of an array?

    @Override
    public Row call(Double x, Double y) throws Exception {
        Row row = RowFactory.create(x, y);
        return row;
    }

(Richard Xin's message of Saturday, December 17, 2016 8:53 PM, with the full code snippet, follows below.)
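Spelled out with Yong's fix applied, the registration from this thread would look like this (the UDF2 type parameters are an assumption; everything else is from the exchange):

    sqlContext.udf().register("toLocation", new UDF2<Double, Double, Row>() {
        @Override
        public Row call(Double x, Double y) throws Exception {
            // Two Double fields matching the declared struct, not a double[].
            return RowFactory.create(x, y);
        }
    }, DataTypes.createStructType(new StructField[] {
        new StructField("longitude", DataTypes.DoubleType, true, Metadata.empty()),
        new StructField("latitude", DataTypes.DoubleType, true, Metadata.empty())
    }));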
Re: Java to show struct field from a Dataframe
I tried to transform

    root
     |-- latitude: double (nullable = false)
     |-- longitude: double (nullable = false)
     |-- name: string (nullable = true)

to:

    root
     |-- name: string (nullable = true)
     |-- location: struct (nullable = true)
     |    |-- longitude: double (nullable = true)
     |    |-- latitude: double (nullable = true)

The code snippet is as follows:

    sqlContext.udf().register("toLocation", new UDF2<Double, Double, Row>() {
        @Override
        public Row call(Double x, Double y) throws Exception {
            Row row = RowFactory.create(new double[] { x, y });
            return row;
        }
    }, DataTypes.createStructType(new StructField[] {
        new StructField("longitude", DataTypes.DoubleType, true, Metadata.empty()),
        new StructField("latitude", DataTypes.DoubleType, true, Metadata.empty())
    }));

    DataFrame transformedDf1 = citiesDF.withColumn("location",
        callUDF("toLocation", col("longitude"), col("latitude")));
    transformedDf1.drop("latitude").drop("longitude").schema().printTreeString(); // prints schema tree OK as expected
    transformedDf.show(); // java.lang.ClassCastException: [D cannot be cast to java.lang.Double

It seems to me that the return type of the UDF2 might be the root cause, but I am not sure how to correct it.

Thanks, Richard

On Sunday, December 18, 2016 7:15 AM, Yong Zhang wrote:

The "[D" type means a double array type. So this error simply means you have double[] data, but Spark needs to cast it to Double, as your schema defined. The error message clearly indicates that the data doesn't match the type specified in the schema. I wonder why you are so sure about your data; did you check it under another tool?

Yong
Re: Java to show struct field from a Dataframe
Data is good.

On Saturday, December 17, 2016 11:50 PM, "zjp_j...@163.com" wrote:

I think the cause is your invalid Double data; have you checked your data?

zjp_j...@163.com

(Richard Xin's original question of 2016-12-17 follows below.)
Java to show struct field from a Dataframe
Let's say I have a DataFrame with the following schema:

    root
     |-- name: string (nullable = true)
     |-- location: struct (nullable = true)
     |    |-- longitude: double (nullable = true)
     |    |-- latitude: double (nullable = true)

df.show() throws the following exception:

    java.lang.ClassCastException: [D cannot be cast to java.lang.Double
        at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:221)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)

Any advice? Thanks in advance.
Richard
Re: need help to have a Java version of this scala script
Thanks for pointing me in the right direction; I have figured out the way.

On Saturday, December 17, 2016 5:23 PM, Igor Berman wrote:

Do you mind showing what you have in Java? In general, $"bla" is col("bla") as soon as you import the appropriate functions:

    import static org.apache.spark.sql.functions.callUDF;
    import static org.apache.spark.sql.functions.col;

udf should be callUDF, e.g.

    ds.withColumn("localMonth", callUDF("toLocalMonth", col("unixTs"), col("tz")))

(Richard Xin's original message of 17 December 2016 follows below.)
need help to have a Java version of this scala script
What I am trying to do: I need to add a column (which could be a complicated transformation based on the value of another column) to a given DataFrame.

Scala script:

    val hContext = new HiveContext(sc)
    import hContext.implicits._
    val df = hContext.sql("select x, y, cluster_no from test.dc")
    val len = udf((str: String) => str.length)
    val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
    val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
    val df1 = df.withColumn("name-len", len($"x"))
    val df2 = df1.withColumn("twice", twice($"cluster_no"))
    val df3 = df2.withColumn("triple", triple($"cluster_no"))

The Scala script above seems to work OK, but I am having trouble doing it the Java way (note that the transformation based on the value of a column could be complicated, not limited to simple add/minus etc.). Is there a way in Java? Thanks.
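For reference, a hedged Java sketch of the same script following Igor's callUDF/col hints (the UDF1 casts are needed to disambiguate the overloaded register; the wiring is an assumption, the names mirror the Scala above):

    import static org.apache.spark.sql.functions.callUDF;
    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.types.DataTypes;

    HiveContext hContext = new HiveContext(sc);
    DataFrame df = hContext.sql("select x, y, cluster_no from test.dc");

    // Register the transformations as named UDFs, then apply with callUDF.
    hContext.udf().register("len",
        (UDF1<String, Integer>) str -> str.length(), DataTypes.IntegerType);
    hContext.udf().register("twice",
        (UDF1<Integer, Integer>) x -> x * 2, DataTypes.IntegerType);
    hContext.udf().register("triple",
        (UDF1<Integer, Integer>) x -> x * 3, DataTypes.IntegerType);

    DataFrame df1 = df.withColumn("name-len", callUDF("len", col("x")));
    DataFrame df2 = df1.withColumn("twice", callUDF("twice", col("cluster_no")));
    DataFrame df3 = df2.withColumn("triple", callUDF("triple", col("cluster_no")));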