My current approach is to fetch all the data from tables located in different databases into separate RDDs, take the union of those RDDs, and then query them together. I want to know whether I can run the query directly while creating the RDD — i.e., instead of creating two RDDs, fire a single query over both tables in one JdbcRDD and create an RDD from its result. Any alternative approaches are also welcome.
The below way of doing it is complex involving fetching all data. I want to reduce time . Any help is appreciated .
import java.io.Serializable;
import scala.*;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.*;
import scala.collection.mutable.LinkedHashMap;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.StructType;
import org.apache.spark.sql.api.java.StructField;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import java.sql.*;
import java.util.*;
import org.postgresql.Driver;
import java.io.*;

/**
 * Reads table {@code spark} from two PostgreSQL databases through JdbcRDDs, unions the rows and
 * runs a Spark SQL query over the combined data.
 *
 * <p>A JdbcRDD obtains its rows through a single connection factory, so one JdbcRDD cannot read
 * tables that live in two different databases: one RDD per database plus a union is required.
 * To move less data, narrow {@link #QUERY} itself (select only the needed columns, add WHERE
 * filters) so each database discards rows before they reach Spark.
 */
public class Spark_Postgresql {

  /** Query run against each database; the two '?' receive JdbcRDD's per-partition id bounds. */
  private static final String QUERY = "SELECT * FROM spark WHERE ? <= id AND id <= ?";

  /** Inclusive id range split across {@link #PARTITIONS} JdbcRDD partitions. */
  private static final long LOWER_ID = 0L;

  private static final long UPPER_ID = 1000L;
  private static final int PARTITIONS = 10;

  /**
   * JavaBean passed to {@code JavaSQLContext.applySchema}; its bean properties {@code ID} and
   * {@code INTENSITY} become the columns of the registered table {@code jrd}.
   */
  public static class tableSchema implements Serializable {
    private String ID;
    private String INTENSITY;

    public String getID() {
      return ID;
    }

    public void setINTENSITY(String iNTENSITY) {
      INTENSITY = iNTENSITY;
    }

    public String getINTENSITY() {
      return INTENSITY;
    }

    public void setID(String iD) {
      ID = iD;
    }
  }

  /**
   * Opens a new JDBC connection for {@code url}.
   *
   * <p>The driver class is loaded here, not only in {@code main}, because JdbcRDD calls the
   * connection factory while computing partitions, which may happen in an executor JVM. Failures
   * are rethrown so the task fails with the real cause instead of receiving a null connection.
   *
   * @throws IllegalStateException if the driver is missing or the connection cannot be opened
   */
  private static java.sql.Connection openConnection(String url) {
    try {
      Class.forName("org.postgresql.Driver");
      return DriverManager.getConnection(url);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("PostgreSQL JDBC driver is not on the classpath", e);
    } catch (SQLException e) {
      // The URL embeds the password, so it is deliberately left out of the message.
      throw new IllegalStateException("Could not open PostgreSQL connection", e);
    }
  }

  /** Connection factory for the {@code postgres} database, handed to a JdbcRDD. */
  static class Z extends AbstractFunction0<java.sql.Connection> implements Serializable {
    // transient: a live Connection is not serializable and must never travel with this function.
    transient java.sql.Connection con;
    // NOTE(review): credentials are hard-coded in the URL; consider reading them from config.
    static String URL = "jdbc:postgresql://localhost:5432/postgres?user=postgres&password=postgres";

    @Override
    public java.sql.Connection apply() {
      con = openConnection(URL);
      return con;
    }
  }

  /** Connection factory for the {@code postgres1} database, handed to a JdbcRDD. */
  static class Z2 extends AbstractFunction0<java.sql.Connection> implements Serializable {
    // transient: a live Connection is not serializable and must never travel with this function.
    transient java.sql.Connection con;
    // NOTE(review): credentials are hard-coded in the URL; consider reading them from config.
    static String URL = "jdbc:postgresql://localhost:5432/postgres1?user=postgres&password=postgres";

    @Override
    public java.sql.Connection apply() {
      con = openConnection(URL);
      return con;
    }
  }

  /**
   * Renders the current row of a ResultSet as one string with the column values separated by a
   * single space, in column order. Every column contributes exactly one value, so positional
   * parsing of the result stays aligned with the table's column order.
   *
   * @throws IllegalStateException wrapping any SQLException raised while reading the row
   */
  public static class Z1 extends AbstractFunction1<ResultSet, String> implements Serializable {
    @Override
    public String apply(ResultSet i) {
      try {
        ResultSetMetaData meta = i.getMetaData();
        int columnCount = meta.getColumnCount();
        StringBuilder row = new StringBuilder();
        for (int k = 1; k <= columnCount; k++) {
          if (k > 1) {
            row.append(' ');
          }
          row.append(columnAsString(i, k, meta.getColumnType(k)));
        }
        return row.toString();
      } catch (SQLException e) {
        throw new IllegalStateException("Failed to read JDBC row", e);
      }
    }

    /** Reads one column with the getter matching its JDBC type; other types use getString. */
    private static String columnAsString(ResultSet rs, int column, int sqlType)
        throws SQLException {
      switch (sqlType) {
        case Types.FLOAT:
          return String.valueOf(rs.getFloat(column));
        case Types.INTEGER:
          return String.valueOf(rs.getInt(column));
        case Types.BIGINT:
          // getLong, not getInt: BIGINT values do not fit in an int.
          return String.valueOf(rs.getLong(column));
        case Types.DOUBLE:
          return String.valueOf(rs.getDouble(column));
        default:
          // VARCHAR, CHAR and any other type are read as text instead of being dropped.
          return rs.getString(column);
      }
    }
  }

  /** Creates a JdbcRDD that runs {@link #QUERY} through {@code connection}, one string per row. */
  private static JdbcRDD<String> jdbcRows(
      SparkContext sc, AbstractFunction0<java.sql.Connection> connection) {
    scala.reflect.ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
    return new JdbcRDD<String>(
        sc, connection, QUERY, LOWER_ID, UPPER_ID, PARTITIONS, new Z1(), tag);
  }

  /**
   * Builds a bean from a row string produced by {@link Z1}: the first value is the ID and the
   * second is the INTENSITY.
   *
   * @throws IllegalArgumentException if the row has fewer than two space-separated values
   */
  private static tableSchema parseRow(String line) {
    String[] parts = line.split(" ");
    if (parts.length < 2) {
      throw new IllegalArgumentException(
          "Expected at least two space-separated columns in row: " + line);
    }
    tableSchema row = new tableSchema();
    row.setID(parts[0]);
    row.setINTENSITY(parts[1]);
    return row;
  }

  /**
   * Unions table {@code spark} from both databases, registers the combined rows as table
   * {@code jrd}, and saves its INTENSITY column as text to {@code /home/hduser/psqlrdd}.
   */
  public static void main(String[] args) throws Exception {
    String[] jars = {
      "/home/hduser/Documents/Credentials/Newest_Credentials_AX/spark-1.1.0-bin-hadoop1/lib/postgresql-9.3-1102.jdbc41.jar"
    };
    try {
      Class.forName("org.postgresql.Driver");
    } catch (Exception ex) {
      ex.printStackTrace();
      System.exit(1);
    }

    // A single SparkContext for the whole application: the SQL context and both JdbcRDDs share
    // the one owned by ctx instead of creating additional contexts in the same JVM.
    JavaSparkContext ctx =
        new JavaSparkContext(new SparkConf().setAppName("JavaSparkSQL").setJars(jars));
    try {
      SparkContext sctx = ctx.sc();
      JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

      RDD<String> rows = jdbcRows(sctx, new Z()).union(jdbcRows(sctx, new Z2()));
      JavaRDD<tableSchema> jrd =
          rows.toJavaRDD()
              .map(
                  new Function<String, tableSchema>() {
                    @Override
                    public tableSchema call(String line) {
                      return parseRow(line);
                    }
                  });

      JavaSchemaRDD schemardd = sqlCtx.applySchema(jrd, tableSchema.class);
      schemardd.registerTempTable("jrd");
      JavaSchemaRDD newrdd = sqlCtx.sql("SELECT jrd.INTENSITY from jrd");
      JavaRDD<String> result =
          newrdd.map(
              new Function<Row, String>() {
                @Override
                public String call(Row row) {
                  return row.getString(0);
                }
              });
      result.saveAsTextFile("/home/hduser/psqlrdd");
    } finally {
      ctx.stop();
    }
  }
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Query-from-two-or-more-tables-Spark-Sql-I-have-done-this-Is-there-any-simpler-solution-tp18812.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org