Hello,

I'm trying to develop with the new DataFrame API, but I'm running into an error.

I have an existing MySQL database and I want to insert rows into it. I create a DataFrame from a JavaRDD of beans, then call the insertIntoJDBC() method. It appears that the DataFrame reorders the columns: the schema inferred from my bean lists the fields alphabetically ("avalue" before "zvalue"), while my table defines "zvalue" first, and insertIntoJDBC seems to bind the values positionally rather than by column name. As a result, I get an error because the fields are not inserted in the proper order.

Is there a way to specify the names or the order of the columns when writing to the database?
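For now, the only workaround I can think of is to reorder the DataFrame columns with select() so they match the table definition. A minimal sketch (untested; df and the JDBC URL are the ones from my example below):
==========
// Hypothetical workaround: put the columns back in the order of the
// MySQL table ("zvalue" first, then "avalue") before writing.
org.apache.spark.sql.DataFrame ordered = df.select("zvalue", "avalue");
ordered.insertIntoJDBC(
        "jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass",
        "reference", false);
==========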
If this is a bug, here is an example to reproduce it:
My table:
==========
CREATE TABLE `reference` (
`zvalue` text,
`avalue` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
==========
My class:
==========
package org.mypackage.testspark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class MysqlInsert {

    // Simple JavaBean whose field names match the table columns.
    public static class RowStruct implements java.io.Serializable {
        private static final long serialVersionUID = 1L;

        public String zvalue;
        public Integer avalue;

        public RowStruct() {
        }

        public String getZvalue() {
            return this.zvalue;
        }

        public Integer getAvalue() {
            return this.avalue;
        }

        public void setZvalue(String zvalue) {
            this.zvalue = zvalue;
        }

        public void setAvalue(Integer avalue) {
            this.avalue = avalue;
        }
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("myApp");
        sparkConf.setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        RowStruct row = new RowStruct();
        row.setZvalue("test");
        row.setAvalue(1);
        JavaRDD<RowStruct> rdd = ctx.parallelize(Arrays.asList(row));

        SQLContext sqlCtx = new SQLContext(ctx);
        // The schema inferred from the bean puts "avalue" before "zvalue".
        DataFrame df = sqlCtx.createDataFrame(rdd, RowStruct.class);
        df.insertIntoJDBC(
                "jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass",
                "reference", false);
    }
}
==========
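Alternatively, I suppose I could build the DataFrame from Row objects with an explicit schema, so that the column order is fixed by me instead of by bean introspection. A sketch of what I have in mind (untested; ctx and sqlCtx as in the example above, with StructType/DataTypes imported from org.apache.spark.sql.types and Row/RowFactory from org.apache.spark.sql):
==========
// Declare the schema explicitly, in the same order as the table columns.
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("zvalue", DataTypes.StringType, true),
        DataTypes.createStructField("avalue", DataTypes.IntegerType, true)));

// Build the rows positionally, matching the schema above.
JavaRDD<Row> rows = ctx.parallelize(Arrays.asList(RowFactory.create("test", 1)));
DataFrame withSchema = sqlCtx.createDataFrame(rows, schema);
withSchema.insertIntoJDBC(
        "jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass",
        "reference", false);
==========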
My error log:
==========
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/27 18:31:19 INFO SparkContext: Running Spark version 1.3.0-SNAPSHOT
15/03/27 18:31:19 WARN Utils: Your hostname, Tlnd-pbailly resolves to a
loopback address: 127.0.1.1; using 10.42.20.124 instead (on interface wlan0)
15/03/27 18:31:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
15/03/27 18:31:20 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/03/27 18:31:20 INFO SecurityManager: Changing view acls to: pbailly
15/03/27 18:31:20 INFO SecurityManager: Changing modify acls to: pbailly
15/03/27 18:31:20 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(pbailly);
users with modify permissions: Set(pbailly)
15/03/27 18:31:20 INFO Slf4jLogger: Slf4jLogger started
15/03/27 18:31:20 INFO Remoting: Starting remoting
15/03/27 18:31:20 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@10.42.20.124:58185]
15/03/27 18:31:20 INFO Utils: Successfully started service 'sparkDriver'
on port 58185.
15/03/27 18:31:20 INFO SparkEnv: Registering MapOutputTracker
15/03/27 18:31:20 INFO SparkEnv: Registering BlockManagerMaster
15/03/27 18:31:20 INFO DiskBlockManager: Created local directory at
/tmp/spark-1baef5a9-8c70-4c88-aaa6-7462f473c5b6/blockmgr-20176350-a69c-4170-b704-6621ca393889
15/03/27 18:31:20 INFO MemoryStore: MemoryStore started with capacity
947.7 MB
15/03/27 18:31:20 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-1ff51d4d-6172-4231-98c0-5e69edc6e64e/httpd-4eb77dcf-da49-438e-b5db-ecbf07193245
15/03/27 18:31:20 INFO HttpServer: Starting HTTP Server
15/03/27 18:31:20 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/27 18:31:20 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:43576
15/03/27 18:31:20 INFO Utils: Successfully started service 'HTTP file
server' on port 43576.
15/03/27 18:31:20 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/27 18:31:20 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/27 18:31:20 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/03/27 18:31:20 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/03/27 18:31:20 INFO SparkUI: Started SparkUI at http://10.42.20.124:4040
15/03/27 18:31:20 INFO Executor: Starting executor ID <driver> on host
localhost
15/03/27 18:31:20 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@10.42.20.124:58185/user/HeartbeatReceiver
15/03/27 18:31:21 INFO NettyBlockTransferService: Server created on 43013
15/03/27 18:31:21 INFO BlockManagerMaster: Trying to register BlockManager
15/03/27 18:31:21 INFO BlockManagerMasterActor: Registering block
manager localhost:43013 with 947.7 MB RAM, BlockManagerId(<driver>,
localhost, 43013)
15/03/27 18:31:21 INFO BlockManagerMaster: Registered BlockManager
15/03/27 18:31:21 INFO SparkContext: Starting job: foreachPartition at
DataFrame.scala:778
15/03/27 18:31:21 INFO DAGScheduler: Got job 0 (foreachPartition at
DataFrame.scala:778) with 2 output partitions (allowLocal=false)
15/03/27 18:31:21 INFO DAGScheduler: Final stage: Stage
0(foreachPartition at DataFrame.scala:778)
15/03/27 18:31:21 INFO DAGScheduler: Parents of final stage: List()
15/03/27 18:31:21 INFO DAGScheduler: Missing parents: List()
15/03/27 18:31:21 INFO DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[2] at map at DataFrame.scala:853), which has no
missing parents
15/03/27 18:31:21 INFO MemoryStore: ensureFreeSpace(3968) called with
curMem=0, maxMem=993735475
15/03/27 18:31:21 INFO MemoryStore: Block broadcast_0 stored as values
in memory (estimated size 3.9 KB, free 947.7 MB)
15/03/27 18:31:21 INFO MemoryStore: ensureFreeSpace(2836) called with
curMem=3968, maxMem=993735475
15/03/27 18:31:21 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 2.8 KB, free 947.7 MB)
15/03/27 18:31:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in
memory on localhost:43013 (size: 2.8 KB, free: 947.7 MB)
15/03/27 18:31:21 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/03/27 18:31:21 INFO SparkContext: Created broadcast 0 from broadcast
at DAGScheduler.scala:839
15/03/27 18:31:21 INFO DAGScheduler: Submitting 2 missing tasks from
Stage 0 (MapPartitionsRDD[2] at map at DataFrame.scala:853)
15/03/27 18:31:21 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/27 18:31:21 INFO TaskSetManager: Starting task 0.0 in stage 0.0
(TID 0, localhost, PROCESS_LOCAL, 1262 bytes)
15/03/27 18:31:22 INFO TaskSetManager: Starting task 1.0 in stage 0.0
(TID 1, localhost, PROCESS_LOCAL, 1450 bytes)
15/03/27 18:31:22 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/27 18:31:22 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/27 18:31:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0).
620 bytes result sent to driver
15/03/27 18:31:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0
(TID 0) in 228 ms on localhost (1/2)
15/03/27 18:31:22 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.sql.SQLException: Incorrect integer value: 'test' for column 'avalue' at row 1
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:95)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/03/27 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.sql.SQLException: Incorrect integer value: 'test' for column 'avalue' at row 1
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:95)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/03/27 18:31:22 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1
times; aborting job
15/03/27 18:31:22 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool
15/03/27 18:31:22 INFO TaskSchedulerImpl: Cancelling stage 0
15/03/27 18:31:22 INFO DAGScheduler: Job 0 failed: foreachPartition at
DataFrame.scala:778, took 0.367799 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.sql.SQLException: Incorrect integer value: 'test' for column 'avalue' at row 1
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:95)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
        at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
==========
My jar versions:
spark-sql_2.10:1.3.0
mysql-connector-java:5.1.34
Thank you for your advice,
--
Pierre Bailly
Talend