Re: Implementing TableProvider in Spark 3.0

2020-07-08 Thread Richard Xin
 Saw




On Wednesday, July 8, 2020, 9:26 PM, Sricheta Ruj 
 wrote:

 
Hello Spark Team,

I am trying to use the DataSourceV2 API from Spark 3.0. I wanted to ask: in the case of a write, how do I get the user-specified schema?

This is what I am trying to achieve:
val data = Seq(
  Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
  Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)

val schema = new StructType()
   .add(StructField("name", StringType,true))
   .add(StructField("id", IntegerType,true))
   .add(StructField("flag", BooleanType,true))
   .add(StructField("salary", DoubleType,true))
   .add(StructField("phone", LongType,true))
   .add(StructField("dob", DateType,true))
   .add(StructField("weight",  DecimalType(Constants.DECIMAL_PRECISION,7),true))
   .add(StructField("time", TimestampType,true))
   .add(StructField("float", FloatType,true))
   .add(StructField("byte", ByteType,true))
   .add(StructField("short", ShortType,true))


val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
   schema)

//Create a new manifest and add the entity to it
df.write.format("com.microsoft.cdm")
   .option("storage",storageAccountName)
   .option("container",outputContainer)
   .option("manifest","/root/default.manifest.cdm.json")
   .option("entity","TestEntity")
   .option("format","parquet")
   .option("compression","gzip")
   .option("appId",appid).option("appKey",appkey).option("tenantId",tenantid)
   .mode(SaveMode.Append)
   .save()
 
  
 
My custom DataSource implementation is as follows:
 class DefaultSource extends DataSourceRegister with TableProvider  {

  override def shortName(): String = "spark-cdm"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): 
Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata = true

  override def getTable(schema: StructType, partitioning: Array[Transform], 
properties: util.Map[String, String]): Table = {
    println(schema)
    new MysqlTable(schema, properties)
  }
} 
  
 
I get null here. I am not sure how I should get the StructType that I set when building the DataFrame for df.write. Any help would be appreciated.
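
For what it's worth, in the Spark 3.0 DataSourceV2 API the schema of the DataFrame being written is also handed to the table through the write builder, so one place to pick it up (rather than TableProvider.getTable) may be LogicalWriteInfo. A minimal Java sketch, with a hypothetical table class standing in for MysqlTable and a stub write builder; this is an illustration, not a definitive answer:

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;

// Hypothetical table that captures the schema of the DataFrame being written.
public class ExampleTable implements SupportsWrite {

    private final StructType schema;                  // may be null when inferSchema returned null
    private final Map<String, String> properties;     // connection options etc. (unused in this sketch)

    public ExampleTable(StructType schema, Map<String, String> properties) {
        this.schema = schema;
        this.properties = properties;
    }

    @Override
    public String name() {
        return "example-table";
    }

    @Override
    public StructType schema() {
        // Whatever the table reports as its own schema; empty if nothing is known yet.
        return schema != null ? schema : new StructType();
    }

    @Override
    public Set<TableCapability> capabilities() {
        return Collections.singleton(TableCapability.BATCH_WRITE);
    }

    @Override
    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
        // info.schema() carries the schema of the DataFrame passed to df.write...save().
        StructType writeSchema = info.schema();
        System.out.println("Schema of the incoming DataFrame: " + writeSchema);
        // A real implementation would return a builder that produces a BatchWrite
        // for writeSchema; the empty builder here only keeps the sketch compilable.
        return new WriteBuilder() {};
    }
}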
 
  
 
Thanks and regards,
Sricheta Ruj




Re: tcps oracle connection from spark

2019-06-18 Thread Richard Xin
And by the way, the same connection string works fine when used in SQL Developer.

tcps oracle connection from spark

2019-06-18 Thread Richard Xin
Hi, I need help with a TCPS Oracle connection from Spark (version: spark-2.4.0-bin-hadoop2.7).

Properties prop = new Properties();
prop.putAll(sparkOracle);  // username/password
prop.put("javax.net.ssl.trustStore", "path to root.jks");
prop.put("javax.net.ssl.trustStorePassword", "password_here");

df.write().mode(SaveMode.Append)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .jdbc("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcps)(HOST=host.mycomapny.com)(PORT=1234)))(CONNECT_DATA=(SERVICE_NAME=service_name)))", "tableName", prop);

Note "PROTOCOL=tcps" in the connection string.
The code worked fine for "tcp" hosts, but some of our servers use "tcps" only, and I get the following errors when hitting Oracle tcps hosts. Can someone shed some light? Thanks a lot!
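
Not a definitive fix, but one thing that may be worth checking: df.write().jdbc() opens JDBC connections on the executors as well as on the driver, so JVM-level SSL settings may need to reach both sides rather than only the JDBC Properties object. A hedged Java sketch of wiring the trust store in as JVM options (paths and password are placeholders):

import org.apache.spark.sql.SparkSession;

public class TcpsTrustStoreExample {
    public static void main(String[] args) {
        String trustStoreOpts =
            "-Djavax.net.ssl.trustStore=/path/to/root.jks "
                + "-Djavax.net.ssl.trustStorePassword=password_here";

        // Driver side: the driver JVM is already running, so set the properties directly
        // (or pass the same options via spark.driver.extraJavaOptions at submit time).
        System.setProperty("javax.net.ssl.trustStore", "/path/to/root.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "password_here");

        SparkSession spark = SparkSession.builder()
            .appName("tcps-jdbc-write")
            // Executor side: executor JVMs start after the session is created,
            // so they pick this option string up at launch.
            .config("spark.executor.extraJavaOptions", trustStoreOpts)
            .getOrCreate();

        // ... build the DataFrame and call df.write().jdbc(...) exactly as in the code above.
    }
}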
Exception in thread "main" java.sql.SQLRecoverableException: IO Error: Remote host terminated the handshake
  at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:682)
  at oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:715)
  at oracle.jdbc.driver.T4CConnection.<init>(T4CConnection.java:385)
  at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:30)
  at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:564)
  at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506)
  at com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:103)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: javax.net.ssl.SSLHandshakeException: Remote host terminated the handshake
  at java.base/sun.security.ssl.SSLSocketImpl.handleEOF(SSLSocketImpl.java:1321)
  at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1160)
  at java.base/sun.security.ssl.SSLSocketImpl.readHand

Re: spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?

2019-05-06 Thread Richard Xin
Thanks for the reply. Unfortunately this is the highest version available for the Cassandra connector.
One thing I don't quite understand is that it worked perfectly under Spark 2.4.0. I thought support for Scala 2.11 only became deprecated starting with Spark 2.4.1 and will be removed after Spark 3.0.




On Monday, May 6, 2019, 18:34, Russell Spitzer  
wrote:

Scala version mismatch: Spark is shown at 2.12, and the connector only has a 2.11 release.




spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?

2019-05-06 Thread Richard Xin

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>2.4.0</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
  <version>2.4.1</version>
</dependency>


When I run spark-submit on Spark 2.4.2 I get the following exception; it works fine when running spark-submit under Spark 2.4.0 with exactly the same command-line call. Any idea how I can fix this? Thanks a lot!
Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class
  at com.datastax.spark.connector.util.ConfigParameter.<init>(ConfigParameter.scala:7)
  at com.datastax.spark.connector.rdd.ReadConf$.<init>(ReadConf.scala:33)
  at com.datastax.spark.connector.rdd.ReadConf$.<clinit>(ReadConf.scala)
  at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:134)
  at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala)
  at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
  at com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:76)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


how sequence of chained jars in spark.(driver/executor).extraClassPath matters

2017-09-13 Thread Richard Xin
Let's say I have a chained path in spark.driver.extraClassPath/spark.executor.extraClassPath such as /path1/*:/path2/*, and I have different versions of the same jar under those two directories. How does Spark pick which version of the jar to use? Does it come from /path1/*?

Thanks.

can I do spark-submit --jars [s3://bucket/folder/jar_file]? or --jars

2017-07-28 Thread Richard Xin
Can we add an extra library (jars on S3) to spark-submit? If yes, how? For example via --jars, extraClassPath, or extraLibPath. Thanks, Richard

Re: Flatten JSON to multiple columns in Spark

2017-07-17 Thread Richard Xin
I believe you could use JOLT (bazaarvoice/jolt) to flatten it to a JSON string and then to a dataframe or dataset.
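
Alternatively, staying inside Spark SQL, parsing the Info string with from_json and then exploding the resulting array may work (Spark 2.2 or later, where from_json accepts an array schema). A rough Java sketch with a deliberately simplified, hypothetical schema and input path; the real schema would declare every field of the cert payload:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FlattenJsonExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("flatten-json").master("local[*]").getOrCreate();

        // Hypothetical input holding Title, ISBN and the raw JSON in an "Info" column.
        Dataset<Row> books = spark.read().json("books.json");

        // Simplified, hypothetical schema for the Info column.
        StructType infoSchema = new StructType()
            .add("expdCnt", DataTypes.StringType)
            .add("mfgAcctNum", DataTypes.StringType)
            .add("pgmUUID", DataTypes.StringType);
        ArrayType infoArray = DataTypes.createArrayType(infoSchema);

        // explode() fails on a plain string column, so parse it with from_json first.
        Dataset<Row> flattened = books
            .withColumn("info", explode(from_json(col("Info"), infoArray)))
            .select(col("Title"), col("ISBN"),
                    col("info.expdCnt"), col("info.mfgAcctNum"), col("info.pgmUUID"));

        flattened.show(false);
    }
}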


| 
| 
| 
|  |  |

 |

 |
| 
|  | 
bazaarvoice/jolt

jolt - JSON to JSON transformation library written in Java.
 |

 |

 |





On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri 
 wrote:

Explode is not working in this scenario; the error is that a string cannot be used in explode, which expects an array or map.
On Tue, Jul 18, 2017 at 11:39 AM, 刘虓 wrote:

Hi, have you tried to use explode?
Chetan Khatri wrote on Tuesday, July 18, 2017 at 2:06 PM:

Hello Spark Dev's,
Can you please guide me, how to flatten JSON to multiple columns in Spark.
Example:

| Sr No | Title | ISBN | Info |
| 1 | Calculus Theory | 1234567890 | [{"cert":[{"authSbmtr":"009415da-c8cd-418d-869e-0a19601d79fa","certUUID":"03ea5a1a-5530-4fa3-8871-9d1ebac627c4","effDt":"2016-05-06T15:04:56.279Z","fileFmt":"rjrCsv","status":"live"}],"expdCnt":"15","mfgAcctNum":"531093","oUUID":"23d07397-4fbe-4897-8a18-b79c9f64726c","pgmRole":["RETAILER"],"pgmUUID":"1cb5dd63-817a-45bc-a15c-5660e4accd63","regUUID":"cc1bd898-657d-40dc-af5d-4bf1569a1cc4","rtlrsSbmtd":["009415da-c8cd-418d-869e-0a19601d79fa"]}] |


I want to get a single row with 11 columns.
Thanks.




Re: apache-spark: Converting List of Rows into Dataset Java

2017-03-28 Thread Richard Xin
Maybe you could try something like this:

        SparkSession sparkSession = SparkSession
            .builder()
            .appName("Rows2DataSet")
            .master("local")
            .getOrCreate();
        List<Row> results = new LinkedList<>();
        JavaRDD<Row> jsonRDD =
            new JavaSparkContext(sparkSession.sparkContext()).parallelize(results);

        Dataset<Row> peopleDF = sparkSession.createDataFrame(jsonRDD, Row.class);
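
If the list holds generic Row objects rather than a Java bean, the createDataFrame overload that takes an explicit StructType may be the more reliable route, since Row.class carries no column information. A small self-contained sketch with made-up column names:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RowsToDatasetExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("Rows2DataSet").master("local").getOrCreate();

        // Rows produced by the calculation, matching the schema below.
        List<Row> results = Arrays.asList(
            RowFactory.create("a", 1),
            RowFactory.create("b", 2));

        // Explicit schema instead of Row.class, so Spark knows the names and types.
        StructType schema = new StructType()
            .add("someString", DataTypes.StringType)
            .add("someInteger", DataTypes.IntegerType);

        Dataset<Row> df = spark.createDataFrame(results, schema);
        df.groupBy("someString").count().show();
    }
}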

Richard Xin 

On Tuesday, March 28, 2017 7:51 AM, Karin Valisova  
wrote:
 

 Hello!
I am running Spark on Java and bumped into a problem I can't solve or find 
anything helpful among answered questions, so I would really appreciate your 
help. 
I am running some calculations, creating rows for each result:
List<Row> results = new LinkedList<>();

for (something) {
    results.add(RowFactory.create(someStringVariable, someIntegerVariable));
}
Now I ended up with a list of rows I need to turn into a dataframe to perform some Spark SQL operations on them, like groupings and sorting. I would like to keep the data types.
I tried: 
Dataset<Row> toShow = spark.createDataFrame(results, Row.class);

but it throws a NullPointerException (spark being the SparkSession). Is my logic wrong somewhere? Should this operation be possible, resulting in what I want? Or do I have to create a custom class which extends Serializable and create a list of those objects rather than Rows? Will I be able to perform SQL queries on a dataset consisting of custom class objects rather than rows?
I'm sorry if this is a duplicate question. Thank you for your help! Karin

   

Re: Issue creating row with java.util.Map type

2017-01-27 Thread Richard Xin
try
Row newRow = RowFactory.create(row.getString(0), row.getString(1), 
row.getMap(2)); 
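
Expanded into a self-contained sketch (schema and sample row mirror the quoted message further down; the key change is passing row.getMap(2) through instead of building a new java.util.HashMap):

import java.util.Collections;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class MapColumnExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("IPCountFilterTest").master("local").getOrCreate();

        StructType schema = new StructType()
            .add("src", DataTypes.StringType)
            .add("dst", DataTypes.StringType)
            .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));

        Dataset<Row> out = spark.createDataFrame(
            Collections.singletonList(
                RowFactory.create("1", "2", Collections.singletonMap("a", 1))),
            schema);

        Encoder<Row> rowEncoder = RowEncoder.apply(schema);

        // Reuse the row's own map value; constructing a fresh java.util.HashMap here
        // is what triggers the "not a valid external type" error.
        out.map((MapFunction<Row, Row>) row ->
                RowFactory.create(row.getString(0), row.getString(1), row.getMap(2)),
            rowEncoder)
           .show();
    }
}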

On Friday, January 27, 2017 10:52 AM, Ankur Srivastava 
 wrote:
 

 + DEV Mailing List
On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava  
wrote:

Hi,
I am trying to map a Dataset with rows which have a map attribute. When I try to create a Row with the map attribute I get cast errors. I am able to reproduce the issue with the sample code below. The surprising thing is that with the same schema I am able to create a dataset from the list of rows.
I am on Spark 2.0 and Scala 2.11.

public static void main(String[] args) {
    StructType schema = new StructType().add("src", DataTypes.StringType)
        .add("dst", DataTypes.StringType)
        .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
    List<Row> inputData = new ArrayList<>();
    inputData.add(RowFactory.create("1", "2", new HashMap<>()));
    SparkSession sparkSession = SparkSession
        .builder()
        .appName("IPCountFilterTest")
        .master("local")
        .getOrCreate();

    Dataset<Row> out = sparkSession.createDataFrame(inputData, schema);
    out.show();

    Encoder<Row> rowEncoder = RowEncoder.apply(schema);
    out.map((MapFunction<Row, Row>) row -> {
        Row newRow = RowFactory.create(row.getString(0), row.getString(1), new HashMap<String, Integer>());
        // Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));
        return newRow;
    }, rowEncoder).show();
}
Below is the error:

17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>

Thanks, Ankur



   

is partitionBy of DataFrameWriter supported in 1.6.x?

2017-01-18 Thread Richard Xin
I found a contradiction between the 1.6.0 and 2.1.x documentation.
In http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter it says: "This is only applicable for Parquet at the moment."
In http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter it says: "This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well."
And I got a warning when trying to save in Scala:
> df.write.mode("overwrite").format("orc").partitionBy("date").saveAsTable("test.my_test")
17/01/19 13:34:43 WARN hive.HiveContext$$anon$2: Persisting partitioned data 
source relation `test`.`my_test` into Hive metastore in Spark SQL specific 
format, which is NOT compatible with Hive. Input path(s): 
hdfs://nameservice1/user/hive/warehouse/test.db/my_test
Looking at the HDFS directories, the folders are there, but the table is not selectable in either Presto or Hive.

Any comments?
Thanks.


Re: DataFrame to read json and include raw Json in DataFrame

2016-12-29 Thread Richard Xin
Thanks, I have seen this, but it doesn't cover my question. What I need is to read JSON and include the raw JSON as part of my dataframe.

On Friday, December 30, 2016 10:23 AM, Annabel Melongo 
 wrote:
 

 Richard,
Below documentation will show you how to create a sparkSession and how to 
programmatically load data:
Spark SQL and DataFrames - Spark 2.1.0 Documentation

  

On Thursday, December 29, 2016 5:16 PM, Richard Xin 
 wrote:
 

Say I have the following data in a file:
{"id":1234,"ln":"Doe","fn":"John","age":25}
{"id":1235,"ln":"Doe","fn":"Jane","age":22}

Java code snippet:
        final SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("json_test");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        HiveContext hc = new HiveContext(ctx.sc());
        DataFrame df = hc.read().json("files/json/example2.json");

What I need is a DataFrame with columns id, ln, fn, age as well as a raw_json string column.
Any advice on the best practice in Java? Thanks,
Richard


   

   

DataFrame to read json and include raw Json in DataFrame

2016-12-29 Thread Richard Xin
Say I have the following data in a file:
{"id":1234,"ln":"Doe","fn":"John","age":25}
{"id":1235,"ln":"Doe","fn":"Jane","age":22}

Java code snippet:
        final SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("json_test");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        HiveContext hc = new HiveContext(ctx.sc());
        DataFrame df = hc.read().json("files/json/example2.json");

What I need is a DataFrame with columns id, ln, fn, age as well as a raw_json string column.
Any advice on the best practice in Java? Thanks,
Richard
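
One hedged way to get both the parsed columns and the raw JSON is to read the file as text, keep each line as raw_json, and extract fields with get_json_object (the Spark 2.x SparkSession API is used here for brevity; get_json_object returns strings, so numeric columns would still need a cast). A sketch, not necessarily the best practice being asked about:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.get_json_object;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RawJsonColumnExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("json_test").master("local[2]").getOrCreate();

        // Each line of the file is one JSON record; keep it verbatim as raw_json.
        Dataset<Row> raw = spark.read().text("files/json/example2.json")
            .withColumnRenamed("value", "raw_json");

        Dataset<Row> df = raw
            .withColumn("id", get_json_object(col("raw_json"), "$.id"))
            .withColumn("ln", get_json_object(col("raw_json"), "$.ln"))
            .withColumn("fn", get_json_object(col("raw_json"), "$.fn"))
            .withColumn("age", get_json_object(col("raw_json"), "$.age"));

        df.show(false);
    }
}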


Re: access Broadcast Variables in Spark java

2016-12-20 Thread Richard Xin
Try this:
JavaRDD<String> mapr = listrdd.map(x -> broadcastVar.value().get(x));
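
For completeness, a self-contained version of that lookup (same sample data as the quoted message below; types spelled out):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastLookupExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BroadcastVariable").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        Map<Integer, String> map = new HashMap<>();
        map.put(1, "aa");
        map.put(2, "bb");
        map.put(9, "ccc");
        Broadcast<Map<Integer, String>> broadcastVar = ctx.broadcast(map);

        List<Integer> list = Arrays.asList(1, 2, 9);
        JavaRDD<Integer> listrdd = ctx.parallelize(list);

        // Look up each element in the broadcast map instead of returning the whole map.
        JavaRDD<String> mapr = listrdd.map(x -> broadcastVar.value().get(x));
        System.out.println(mapr.collect());   // [aa, bb, ccc]

        ctx.stop();
    }
}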
 

On Wednesday, December 21, 2016 2:25 PM, Sateesh Karuturi 
 wrote:
 

I need to process Spark broadcast variables using the Java RDD API. This is the code I have tried so far. It is only sample code to check whether it works or not; in my case I need to work on two CSV files.

SparkConf conf = new SparkConf().setAppName("BroadcastVariable").setMaster("local");
JavaSparkContext ctx = new JavaSparkContext(conf);
Map<Integer, String> map = new HashMap<>();
map.put(1, "aa");
map.put(2, "bb");
map.put(9, "ccc");
Broadcast<Map<Integer, String>> broadcastVar = ctx.broadcast(map);
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(9);
JavaRDD<Integer> listrdd = ctx.parallelize(list);
JavaRDD<Map<Integer, String>> mapr = listrdd.map(x -> broadcastVar.value());
System.out.println(mapr.collect());

and it prints output like this: [{1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}]
and my requirement is: [{aa, bb, ccc}]
Is it possible to do it in my required way? Please help me out.


   

Re: How to get recent value in spark dataframe

2016-12-18 Thread Richard Xin
I am not sure I understood your logic, but it seems to me that you could take a look at Hive's lead/lag functions.
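
Since window functions were mentioned, here is one hedged sketch of that approach, in Java for consistency with the other snippets in this archive. It assumes "recent" means the latest earlier date with flag = 1, as in the example further down, and would need checking against the real data:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.last;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class RecentFlagValue {
    // df has columns id, flag, price, date as in the quoted example.
    static Dataset<Row> withRecentFlagPrice(Dataset<Row> df) {
        // All earlier rows for the same id, ordered by date.
        WindowSpec w = Window.partitionBy("id")
            .orderBy("date")
            .rowsBetween(Window.unboundedPreceding(), -1);

        // Latest earlier price where flag = 1; null when there is none.
        Column lastFlag1Price = last(when(col("flag").equalTo(1), col("price")), true).over(w);

        // Only flag = 0 rows get the value; flag = 1 rows stay null, matching the example.
        return df.withColumn("new_column",
            when(col("flag").equalTo(0), lastFlag1Price));
    }
}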

On Monday, December 19, 2016 1:41 AM, Milin korath 
 wrote:
 

Thanks, I tried with a left outer join. My dataset has around 400M records and a lot of shuffling is happening. Is there any other workaround apart from a join? I tried to use a window function but I am not getting a proper solution.
Thanks
On Sat, Dec 17, 2016 at 4:55 AM, Michael Armbrust  
wrote:

Oh and to get the null for missing years, you'd need to do an outer join with a 
table containing all of the years you are interested in.
On Fri, Dec 16, 2016 at 3:24 PM, Michael Armbrust  
wrote:

Are you looking for argmax? Here is an example.
On Wed, Dec 14, 2016 at 8:49 PM, Milin korath  wrote:

Hi 


I have a spark data frame with the following structure:

id  flag  price  date
a   0     100    2015
a   0     50     2015
a   1     200    2014
a   1     300    2013
a   0     400    2012

I need to create a data frame with the recent value of flag 1 filled into the flag 0 rows:

id  flag  price  date  new_column
a   0     100    2015  200
a   0     50     2015  200
a   1     200    2014  null
a   1     300    2013  null
a   0     400    2012  null

We have 2 rows having flag=0. Consider the first row (flag=0): I will have 2 values (200 and 300) and I am taking the recent one, 200 (2014). For the last row I don't have any recent value for flag 1, so it is updated with null. Looking for a solution using Scala. Any help would be appreciated. Thanks


Thanks Milin







   

Re: Java to show struct field from a Dataframe

2016-12-17 Thread Richard Xin
Super, that works! Thanks




On Sunday, December 18, 2016, 11:28 AM, Yong Zhang  wrote:

Why don't you just return the struct you defined, instead of an array?




            @Override
            public Row call(Double x, Double y) throws Exception {
                Row row = RowFactory.create(x, y);
                return row;
            }
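
For reference, a hedged sketch of the full registration with that fix applied (the struct fields come from the original code, which appears further down in this archive; the SQLContext is assumed to be the one from that code):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ToLocationUdf {
    // Registers a UDF whose Java return value (a Row) matches the declared struct type.
    public static void register(SQLContext sqlContext) {
        StructType locationType = DataTypes.createStructType(new StructField[] {
            new StructField("longitude", DataTypes.DoubleType, true, Metadata.empty()),
            new StructField("latitude", DataTypes.DoubleType, true, Metadata.empty())
        });

        sqlContext.udf().register("toLocation", new UDF2<Double, Double, Row>() {
            @Override
            public Row call(Double x, Double y) {
                // Return a Row, not a double[]; the double[] caused the ClassCastException.
                return RowFactory.create(x, y);
            }
        }, locationType);
    }
}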




Re: Java to show struct field from a Dataframe

2016-12-17 Thread Richard Xin
I tried to transform

root
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- name: string (nullable = true)

to:

root
 |-- name: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- latitude: double (nullable = true)

The code snippet is as follows:

        sqlContext.udf().register("toLocation", new UDF2<Double, Double, Row>() {
            @Override
            public Row call(Double x, Double y) throws Exception {
                Row row = RowFactory.create(new double[] { x, y });
                return row;
            }
        }, DataTypes.createStructType(new StructField[] {
                new StructField("longitude", DataTypes.DoubleType, true, Metadata.empty()),
                new StructField("latitude", DataTypes.DoubleType, true, Metadata.empty())
            }));
        DataFrame transformedDf1 = citiesDF.withColumn("location",
                callUDF("toLocation", col("longitude"), col("latitude")));

        transformedDf1.drop("latitude").drop("longitude").schema().printTreeString();  // prints the schema tree OK, as expected
        transformedDf1.show();  // java.lang.ClassCastException: [D cannot be cast to java.lang.Double

It seems to me that the return type of the UDF2 might be the root cause, but I am not sure how to correct it.
Thanks, Richard


 

On Sunday, December 18, 2016 7:15 AM, Yong Zhang  
wrote:
 

"[D" type means a double array type. So this error simply means you have double[] data, but Spark needs to cast it to Double, as your schema defined.
The error message clearly indicates the data doesn't match the type specified in the schema.
I wonder how you are so sure about your data? Do you check it under another tool?
Yong


Re: Java to show struct field from a Dataframe

2016-12-17 Thread Richard Xin
data is good
 

On Saturday, December 17, 2016 11:50 PM, "zjp_j...@163.com" 
 wrote:
 

I think the causation is your invalid Double data, have you checked your data?
zjp_j...@163.com

Java to show struct field from a Dataframe

2016-12-17 Thread Richard Xin
Let's say I have a DataFrame with a schema like the following:

root
 |-- name: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- latitude: double (nullable = true)

df.show() throws the following exception:

java.lang.ClassCastException: [D cannot be cast to java.lang.Double
    at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:221)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)

Any advice? Thanks in advance.
Richard


Re: need help to have a Java version of this scala script

2016-12-17 Thread Richard Xin
Thanks for pointing me in the right direction, I have figured out the way.
 

On Saturday, December 17, 2016 5:23 PM, Igor Berman  
wrote:
 

Do you mind showing what you have in Java? In general, $"bla" is col("bla") as soon as you import the appropriate functions:
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
udf should be callUDF, e.g.
ds.withColumn("localMonth", callUDF("toLocalMonth", col("unixTs"), col("tz")))
On 17 December 2016 at 09:54, Richard Xin  
wrote:

What I am trying to do: I need to add a column (which could be a complicated transformation based on the value of a column) to a given dataframe.

Scala script:
val hContext = new HiveContext(sc)
import hContext.implicits._
val df = hContext.sql("select x,y,cluster_no from test.dc")
val len = udf((str: String) => str.length)
val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
val df1 = df.withColumn("name-len", len($"x"))
val df2 = df1.withColumn("twice", twice($"cluster_no"))
val df3 = df2.withColumn("triple", triple($"cluster_no"))

The Scala script above seems to work OK, but I am having trouble doing it the Java way (note that the transformation based on the value of a column could be complicated, not limited to simple add/minus etc.). Is there a way in Java? Thanks.




   

need help to have a Java version of this scala script

2016-12-16 Thread Richard Xin
What I am trying to do: I need to add a column (which could be a complicated transformation based on the value of a column) to a given dataframe.

Scala script:
val hContext = new HiveContext(sc)
import hContext.implicits._
val df = hContext.sql("select x,y,cluster_no from test.dc")
val len = udf((str: String) => str.length)
val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
val df1 = df.withColumn("name-len", len($"x"))
val df2 = df1.withColumn("twice", twice($"cluster_no"))
val df3 = df2.withColumn("triple", triple($"cluster_no"))

The Scala script above seems to work OK, but I am having trouble doing it the Java way (note that the transformation based on the value of a column could be complicated, not limited to simple add/minus etc.). Is there a way in Java? Thanks.