Hi,

1. Spark version: 2.3.0
2. JDK: Oracle JDK 1.8
3. OS version: CentOS 6.8
4. spark-env.sh: null
5. Spark session config:

    SparkSession.builder().appName("DBScale")
            .config("spark.sql.crossJoin.enabled", "true")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.scheduler.mode", "FAIR")
            .config("spark.executor.memory", "1g")
            .config("spark.executor.cores", 1)
            .config("spark.driver.memory", "20")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.executor.extraJavaOptions",
                    "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC "
                    + "-verbose:gc -XX:+PrintGCDetails "
                    + "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
            .master(this.spark_master)
            .getOrCreate();

6. Core code:

    for (SparksqlTableInfo tableInfo : this.dbtable) { // this loop reads data from MySQL
        String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
        String[] pred = new String[tableInfo.partition_num];
        if (tableInfo.partition_num > 0) {
            for (int j = 0; j < tableInfo.partition_num; j++) {
                String str = "some where clause to split mysql table into many partitions";
                pred[j] = str;
            }
            // this.url is the MySQL JDBC URL (mysql://XX.XX.XX.XX:XXXX)
            Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp);
            jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
        } else {
            logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
            Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
            jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
        }
    }

    // Then run a query and write the result set to MySQL
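The WHERE clauses that split the table are elided above ("some where clause..."). As a minimal sketch of one way such predicates could be generated, assuming a numeric split column with a known value range (the column name, range, and helper below are hypothetical, not from the original code):

```java
import java.util.ArrayList;
import java.util.List;

public class PredicateSketch {
    // Hypothetical helper: build non-overlapping WHERE clauses that split a
    // table on a numeric column into `partitions` ranges covering [min, max).
    // Each string becomes one JDBC partition when passed to ss.read().jdbc(...).
    static String[] buildPredicates(String column, long min, long max, int partitions) {
        long span = max - min;
        List<String> preds = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            long lo = min + span * i / partitions;
            long hi = min + span * (i + 1) / partitions;
            if (i == partitions - 1) {
                // last range is left open-ended so no row is dropped
                preds.add(column + " >= " + lo);
            } else {
                preds.add(column + " >= " + lo + " AND " + column + " < " + hi);
            }
        }
        return preds.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // e.g. 600 partitions over lineitem keyed by l_orderkey
        String[] pred = buildPredicates("l_orderkey", 0, 600, 600);
        System.out.println(pred.length);
        System.out.println(pred[0]);
    }
}
```

Each resulting predicate produces one Spark partition, so with 600 predicates the read of "lineitem" yields 600 tasks that can be scheduled across the cluster.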
    Dataset<Row> result = ss.sql(this.sql);
    result.explain(true);
    connProp.put("rewriteBatchedStatements", "true");
    connProp.put("sessionVariables", "sql_log_bin=off");
    result.write().jdbc(this.dst_url, this.dst_table, connProp);

------------------------------------------------------------------
From: Jhon Anderson Cardenas Diaz <jhonderson2...@gmail.com>
Sent: Wednesday, April 11, 2018, 22:42
To: 宋源栋 <yuandong.s...@greatopensource.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark is only using one worker machine when more are available

Hi, could you please share more details: the environment variable values you set when you run the jobs, the Spark version, etc. Btw, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES if you are using Spark 2.0.0.

Regards.

2018-04-11 4:10 GMT-05:00 宋源栋 <yuandong.s...@greatopensource.com>:

Hi all,

I have a standalone-mode Spark cluster, without HDFS, of 10 machines, each with 40 CPU cores and 128 GB RAM. My application is a Spark SQL application that reads data from the database "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL into Spark, I split the biggest table, "lineitem", into 600 partitions.

When my application runs, there are only 40 executors (spark.executor.memory = 1g, spark.executor.cores = 1) on the executors page of the Spark application web UI, and all of them are on the same machine. It is too slow because all tasks are running in parallel on only one machine.
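As a hedged sketch of what to check (not a confirmed diagnosis of this thread): in standalone mode the master places executors according to spark.deploy.spreadOut (true by default, which scatters executors across workers) and caps the application at spark.cores.max cores; when that cap is unset it falls back to spark.deploy.defaultCores. The host name, core cap, and class name below are placeholders:

```shell
# Sketch only: explicit caps plus the default spread-out placement.
# master-host, 400, and com.example.DBScale are placeholder values.
spark-submit \
  --master spark://master-host:7077 \
  --conf spark.cores.max=400 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=1g \
  --conf spark.deploy.spreadOut=true \
  --class com.example.DBScale app.jar
```

It is also worth confirming on the master web UI (port 8080) that all 10 workers are registered and show their expected cores and memory; an application can only be spread across workers the master actually knows about.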