At the moment my approach is to fetch all the data from tables located in
different databases into separate RDDs, union them, and then query the
result. I would like to know whether I can run the query directly while
creating the RDD, i.e. instead of creating two RDDs, fire one query against
both tables in a single JdbcRDD (roughly along the lines of the sketch that
follows). Any alternative approaches are also welcome.
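
For illustration, a rough sketch of what I mean by a single JdbcRDD: it
assumes the dblink extension is available in the postgres database (so the
table in postgres1 can be reached over the same connection) and that both
tables have an integer id and a double precision intensity column. Z and Z1
are the connection factory and row mapper from the full program below.

        // Single query over both tables: the local "spark" table plus the same
        // table in the postgres1 database, pulled in via dblink and combined
        // with UNION ALL. JdbcRDD expects exactly two '?' placeholders for the
        // partition bounds, so they are applied to the combined result.
        String singleQuery =
              "SELECT id, intensity FROM ("
            + " SELECT id, intensity FROM spark"
            + " UNION ALL"
            + " SELECT id, intensity FROM dblink("
            + "   'dbname=postgres1 user=postgres password=postgres',"
            + "   'SELECT id, intensity FROM spark')"
            + "   AS t(id integer, intensity double precision)"
            + ") combined WHERE ? <= id AND id <= ?";
        JdbcRDD singleRdd = new JdbcRDD(sctx, new Z(), singleQuery,
                0L, 1000L, 10, new Z1(), scala.reflect.ClassTag$.MODULE$.AnyRef());

This only moves the union into PostgreSQL, though; if that is not acceptable
I would still need something like the two-RDD approach below.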

The way of doing it below is complex and involves fetching all the data
first; I want to reduce the time this takes. Any help is appreciated.
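
For example, since the Spark SQL query at the end only selects INTENSITY, one
way to cut down on the data fetched might be to push that projection into the
JDBC query itself and skip the schema/registration step entirely. A minimal
sketch (assuming the column really is named intensity and the rest of the
program below is unchanged):

        // Fetch only the intensity column over JDBC, so the full rows never
        // have to be pulled, parsed into beans, and projected again in Spark SQL.
        String projected = "SELECT intensity FROM spark WHERE ? <= id AND id <= ?";
        JdbcRDD irdd  = new JdbcRDD(sctx, new Z(),  projected, 0L, 1000L, 10,
                new Z1(), scala.reflect.ClassTag$.MODULE$.AnyRef());
        JdbcRDD irdd1 = new JdbcRDD(sctx, new Z2(), projected, 0L, 1000L, 10,
                new Z1(), scala.reflect.ClassTag$.MODULE$.AnyRef());
        irdd.union(irdd1).saveAsTextFile("/home/hduser/psqlrdd");

Whether this actually helps depends on how much of the time is spent in the
JDBC fetch versus in Spark itself.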




import java.io.Serializable;
import java.sql.*;

import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;

public class Spark_Postgresql {
    // Bean describing one row (ID, INTENSITY); applySchema uses it to infer the schema.
    public static class tableSchema implements Serializable {
        private String ID;
        private String INTENSITY;

        public String getID() {
            return ID;
        }
        public void setID(String iD) {
            ID = iD;
        }
        public String getINTENSITY() {
            return INTENSITY;
        }
        public void setINTENSITY(String iNTENSITY) {
            INTENSITY = iNTENSITY;
        }
    }
    // Connection factory for the first database (postgres), passed to JdbcRDD.
    static class Z extends AbstractFunction0<java.sql.Connection> implements Serializable {
        static String URL = "jdbc:postgresql://localhost:5432/postgres?user=postgres&password=postgres";
        java.sql.Connection con;

        public java.sql.Connection apply() {
            try {
                con = DriverManager.getConnection(URL);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return con;
        }
    }
    // Connection factory for the second database (postgres1).
    static class Z2 extends AbstractFunction0<java.sql.Connection> implements Serializable {
        static String URL = "jdbc:postgresql://localhost:5432/postgres1?user=postgres&password=postgres";
        java.sql.Connection con;

        public java.sql.Connection apply() {
            try {
                con = DriverManager.getConnection(URL);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return con;
        }
    }
    // Row mapper passed to JdbcRDD: turns each ResultSet row into a
    // space-separated string of its column values.
    public static class Z1 extends AbstractFunction1<ResultSet, String> implements Serializable {
        public String apply(ResultSet i) {
            StringBuilder ret = new StringBuilder();
            try {
                ResultSetMetaData meta = i.getMetaData();
                int colcnt = meta.getColumnCount();
                for (int k = 1; k <= colcnt; k++) {
                    if (k > 1)
                        ret.append(" ");
                    int type = meta.getColumnType(k);
                    if (type == Types.VARCHAR || type == Types.CHAR)
                        ret.append(i.getString(k));
                    else if (type == Types.FLOAT)
                        ret.append(i.getFloat(k));
                    else if (type == Types.INTEGER)
                        ret.append(i.getInt(k));
                    else if (type == Types.BIGINT)
                        ret.append(i.getLong(k));
                    else if (type == Types.DOUBLE)
                        ret.append(i.getDouble(k));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return ret.toString();
        }
    }
    public static void main(String[] args) throws Exception {
        // Ship the PostgreSQL JDBC driver to the executors.
        String[] arr = new String[1];
        arr[0] = "/home/hduser/Documents/Credentials/Newest_Credentials_AX/spark-1.1.0-bin-hadoop1/lib/postgresql-9.3-1102.jdbc41.jar";
        SparkContext sctx = new SparkContext(new SparkConf().setAppName("JavaSparkSQL").setJars(arr));
        JavaSQLContext sqlCtx = new JavaSQLContext(new JavaSparkContext(sctx));

        try {
            Class.forName("org.postgresql.Driver");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.exit(1);
        }

        // One JdbcRDD per database; each fetches the whole table in 10 partitions.
        JdbcRDD rdd = new JdbcRDD(sctx, new Z(), "SELECT * FROM spark WHERE ? <= id AND id <= ?",
                0L, 1000L, 10, new Z1(), scala.reflect.ClassTag$.MODULE$.AnyRef());
        JdbcRDD rdd1 = new JdbcRDD(sctx, new Z2(), "SELECT * FROM spark WHERE ? <= id AND id <= ?",
                0L, 1000L, 10, new Z1(), scala.reflect.ClassTag$.MODULE$.AnyRef());

        // rdd.saveAsTextFile("hdfs://127.0.0.1:9000/user/hduser/psqlrdd1");

        // Union the two result sets and parse each space-separated line into a bean.
        RDD rd = rdd.union(rdd1);
        JavaRDD<tableSchema> jrd = rd.toJavaRDD().map(new Function<String, tableSchema>() {
            public tableSchema call(String line) throws Exception {
                String[] parts = line.split(" ");
                tableSchema tS = new tableSchema();
                tS.setID(parts[0]);
                tS.setINTENSITY(parts[1]);
                return tS;
            }
        });

        // Register the combined data as a table and run the query with Spark SQL.
        JavaSchemaRDD schemardd = sqlCtx.applySchema(jrd, tableSchema.class);
        schemardd.registerAsTable("jrd");
        JavaSchemaRDD newrdd = sqlCtx.sql("SELECT jrd.INTENSITY FROM jrd");
        JavaRDD<String> result = newrdd.map(new Function<Row, String>() {
            public String call(Row row) {
                return row.getString(0);
            }
        });
        result.saveAsTextFile("/home/hduser/psqlrdd");
    }
}


