Which version of spark are you using? insertIntoJDBC is deprecated (from
1.4.0), you may use write.jdbc() instead.

Thanks
Best Regards

On Wed, Jul 15, 2015 at 2:43 PM, Manohar753 <[email protected]
> wrote:

> Hi All,
>
> Am trying to add few new rows for existing table in mysql using
> DataFrame.But it is adding new rows to the table in local environment but
> on
> spark cluster below is the runtime exception.
>
>
> Exception in thread "main" java.lang.RuntimeException: Table msusers_1
> already exists.
>         at scala.sys.package$.error(package.scala:27)
>         at
> org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:240)
>         at
> org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1481)
>         at com.sparkexpert.UserMigration.main(UserMigration.java:59)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>         at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>         at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 15/07/15 08:13:42 INFO spark.SparkContext: Invoking stop() from shutdown
> hook
> 15/07/15 08:13:42 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
> 15/07/15 08:13:
>
> code snippet is below:
>
> System.out.println(Query);
>         Map<String, String> options = new HashMap<>();
>         options.put("driver",
> PropertyLoader.getProperty(Constants.msSqlDriver));
>         options.put("url", PropertyLoader.getProperty(Constants.msSqlURL));
>         options.put("dbtable",Query);
>         options.put("numPartitions", "1");
>         DataFrame delatUsers = sqlContext.load("jdbc", options);
>
>
>         delatUsers.show();
>         //Load latest users DataFrame
>
>         String mysQuery="(SELECT * FROM msusers_1) as employees_name";
>         Map<String, String> msoptions = new HashMap<>();
>
> msoptions.put("driver",PropertyLoader.getProperty(Constants.mysqlDriver));
>         msoptions.put("url",
> PropertyLoader.getProperty(Constants.mysqlUrl));
>         msoptions.put("dbtable",mysQuery);
>         msoptions.put("numPartitions", "1");
>         DataFrame latestUsers = sqlContext.load("jdbc", msoptions);
>
> //Get Update users Data
>         DataFrame updatedUsers =
> delatUsers.as("ms").join(latestUsers.as("lat"),
> col("lat.uid").equalTo(col("ms.uid")),
>
> "inner").select("ms.revision","ms.uid","ms.UserType","ms.FirstName","ms.LastName","ms.Email","ms.smsuser_id","ms.dev_acct","ms.lastlogin","ms.username","ms.schoolAffiliation","ms.authsystem_id","ms.AdminStatus");
>  //Insert new users into Mysql DB
> *
>
> delatUsers.except(updatedUsers).insertIntoJDBC(PropertyLoader.getProperty(Constants.mysqlUrl),
> "msusers_1", false);
> *
>  the bold line is the Exception occur line.
> Team please give me some inputs if any one had come across this .
> but for the same override the table is working fine on cluster also.
>
> Thanks,
> manoar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-InsertIntoJdbc-Runtime-Exception-on-cluster-tp23851.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>

Reply via email to