I'm running the following code to import some tables from our Oracle DB into Spark (2.0.2) and then save them as Parquet tables in S3 (using S3A). The code runs and does create queryable tables in our Hive metastore, but it only opens one connection to Oracle (I was expecting a number of connections equal to "numPartitions", or the number of tasks the workers can run simultaneously, whichever is less). It also fails when importing a large table; the limit seems to be roughly the driver memory size, so I'm guessing it is pulling the whole table into driver memory and failing when it doesn't fit.
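For context, here is a simplified model of how I understand the JDBC source to turn partitionColumn/lowerBound/upperBound/numPartitions into one range predicate per partition (i.e. why I expected numPartitions connections). This is just an illustration of the idea, not Spark's actual implementation:

```python
# Simplified model of how I understand Spark's JDBC source to split a read
# into numPartitions range predicates, one per task/connection.
# Illustration only, not Spark's actual code.
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    stride = max((upper - lower) // num_partitions, 1)
    preds = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lo + stride
        if i == 0:
            # first partition also picks up NULLs and values below lowerBound
            preds.append("{0} < {1} OR {0} IS NULL".format(column, hi))
        elif i == num_partitions - 1:
            # last partition is open-ended above
            preds.append("{0} >= {1}".format(column, lo))
        else:
            preds.append("{0} >= {1} AND {0} < {2}".format(column, lo, hi))
    return preds

# e.g. 4 partitions over a hypothetical MYTABLE_ID column:
for p in jdbc_partition_predicates("MYTABLE_ID", 1, 4001, 4):
    print(p)
```

Each predicate becomes the WHERE clause of a separate query, so I expected up to numPartitions simultaneous Oracle sessions, bounded by available worker task slots.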
This is obviously both slow and inefficient. Am I doing (or not doing) something wrong that is forcing this program to run entirely on the driver? Or is there something else going on here? The one Oracle query does seem to come from one of the workers, and the workers do show a small amount of memory usage, but they don't seem to be doing any significant work. I appreciate any insight I can get here, as I was under the impression that this would use worker resources and run in parallel more or less automagically. Thanks!

```python
from pyspark import SparkConf, sql
import sys

def main(argv):
    conf = SparkConf()
    conf.setMaster("spark://<spark-master>")
    spark = (sql.SparkSession.builder
             .config(conf=conf)
             .appName('Writing tables to S3')
             .enableHiveSupport()
             .getOrCreate())
    sc = spark.sparkContext

    # Dump the effective Spark configuration for debugging.
    for x in sorted(sc.getConf().getAll(), key=lambda x: x[0]):
        print(x)

    # One table name per line in tables.txt.
    lines = []
    with open("tables.txt") as tablenames:
        for line in tablenames:
            lines.append(line.rstrip('\n'))

    for line in lines:
        print("Reading " + line)

        # Fetch the partition-column bounds with two small JDBC queries.
        query_min = (spark.read.format("jdbc")
                     .option("user", "user")
                     .option("password", "pass")
                     .option("dbtable", "(select min(" + line + "_ID) from <dbname>." + line + ")")
                     .option("url", '<Oracle-url>')
                     .load())
        query_min.show()
        query_max = (spark.read.format("jdbc")
                     .option("user", "user")
                     .option("password", "pass")
                     .option("dbtable", "(select max(" + line + "_ID) from <dbname>." + line + ")")
                     .option("url", '<Oracle-url>')
                     .load())
        query_max.show()

        # Partitioned read: I expect up to 40 parallel Oracle connections here.
        data = (spark.read.format("jdbc")
                .option("user", "user")
                .option("password", "pass")
                .option("dbtable", '<dbname>.' + line)
                .option("url", '<Oracle-url>')
                .option("partitionColumn", line + '_ID')
                .option("lowerBound", int(query_min.head()[0]))
                .option("upperBound", int(query_max.head()[0]))
                .option("numPartitions", "40")
                .load())

        print("reading " + line)
        data.write.mode("overwrite").format("parquet").saveAsTable("s3." + line)
        print("done writing " + line)

if __name__ == '__main__':
    main(sys.argv[1:])
```

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-read-from-OracleDB-slow-fails-on-large-tables-tp28204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.