RE: Spark code to write to MySQL and Hive

2018-08-29 Thread ryandam.9
Sorry, the last mail's format was not good. Resending.

println("Going to talk to mySql")

// Read table from mySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new DataFrame with column 'id' increased to avoid duplicate primary keys
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

Going to talk to mySql

I am back from mySql

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

 

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+
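The extra rows (IDs 21 to 24) are consistent with Spark's lazy evaluation: newDF is a plan, not materialized data, so each action re-runs the JDBC read, and the SaveMode.Append write grew the source table from 4 to 8 rows before saveAsTable ran. A minimal model of that behavior, sketched in plain Python with no Spark involved (`table` and `new_df` are stand-ins for the MySQL table and the lazy DataFrame, not Spark APIs):

```python
# Stand-in for the MySQL table: starts with the four original ids.
table = [1, 2, 3, 4]

def new_df():
    # Like a DataFrame, this is a recipe, not data: every call
    # (every "action") re-reads the current contents of `table`.
    return [row + 10 for row in table]

first_show = new_df()   # 4 rows: 11, 12, 13, 14
table += first_show     # the SaveMode.Append write grows the source to 8 rows
second_show = new_df()  # re-computed from the grown source: 8 rows, incl. 21..24

print(first_show)
print(second_show)
```

In real Spark the same recipe re-runs the JDBC read, which is why the Hive table ends up with 8 rows even though newDF "showed" only 4 earlier.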

 

Thanks,

Ravi

 

From: ryanda...@gmail.com  
Sent: Wednesday, August 29, 2018 8:19 PM
To: user@spark.apache.org
Subject: Spark code to write to MySQL and Hive

 

Hi,

 

Can anyone help me understand what is happening with my code?

 

I wrote a Spark application that reads from a MySQL table (which already has 4
records) and creates a new DF by adding 10 to the ID field. Then I write the
new DF to both MySQL and Hive.

I am surprised to see an additional set of records in Hive! I cannot
understand how newDF ends up with records whose IDs are 21 to 24. I know that
a DF is immutable; if so, how can it have 4 records at one point and 8 records
at a later point?

 


// Read table from mySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new DataFrame with column 'id' increased to avoid duplicate primary keys
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the MySQL table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

Records already existing in MySQL:

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

 

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

 

newDF.show()

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+
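One way to get the expected 4 rows into Hive is to materialize newDF once before the Append write — in Spark, cache()/persist() followed by an action, or a collect — so the later saveAsTable does not re-run the JDBC read against the already-grown table. A minimal model of that fix, sketched in plain Python (the names here are stand-ins, not Spark APIs):

```python
table = [1, 2, 3, 4]        # stand-in for the MySQL table

def plan():
    # the lazy newDF: re-reads the source on every action
    return [row + 10 for row in table]

snapshot = plan()           # materialize once, like newDF.cache(); newDF.count()
table += snapshot           # the Append write grows the source to 8 rows
hive_rows = snapshot        # writing the frozen snapshot to Hive sees 4 rows

print(hive_rows)
```

Note that in real Spark cache() is best-effort (cached partitions can be evicted and recomputed); collecting the rows or checkpointing is the strict version of this idea.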

 

 

Thanks for your time.

Ravi






repartition

2018-07-08 Thread ryandam.9
Hi, 

 

Can anyone clarify how repartition works, please?

 

*   I have a DataFrame df which has only one partition:

 

// Returns 1
df.rdd.getNumPartitions

*   I repartitioned it by passing "3" and assigned the result to a new
    DataFrame, newdf:

val newdf = df.repartition(3)



 

*   newdf shows 3 as the number of partitions:

// Returns 3
newdf.rdd.getNumPartitions



*   df still shows 1:

// Returns 1
df.rdd.getNumPartitions

 

My questions:

 

1.  How does repartition work? Does it copy the original DataFrame and
create X partitions, as specified by repartition? If that is the case,
aren't there two copies of the same data in memory, as shown in the diagram
below? Or is my understanding incorrect?

From the executions above, it looks like there are two copies, since after
repartition df still has 1 partition!

 

2.  Is repartition executed immediately, or does it wait for some trigger
(a kind of action)?
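A sketch of what repartition does, under the assumption (matching Spark's documented behavior) that it returns a new, lazily computed DataFrame and never mutates the original. The MiniDF class below is a hypothetical toy, not a Spark API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class MiniDF:
    rows: tuple          # the data (shared, not copied)
    num_partitions: int  # plan metadata only

    def repartition(self, n: int) -> "MiniDF":
        # Returns a NEW frame; self is untouched. In real Spark this is a
        # lazy transformation: no shuffle happens until an action runs.
        return MiniDF(self.rows, n)

df = MiniDF(tuple(range(6)), 1)
newdf = df.repartition(3)

print(df.num_partitions)      # still 1
print(newdf.num_partitions)   # 3
print(df.rows is newdf.rows)  # True: the data itself is not duplicated
```

So there are not two copies of the data in memory: getNumPartitions only reads plan metadata, and the actual shuffle into 3 partitions is deferred until an action (count, write, ...) runs on newdf — which also answers question 2.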

 

 



 

Thanks,

Ravi