Re: How to query the Cube via API and use the dataset for other purpose

2024-04-03 Thread Nam Đỗ Duy via user
Thank you very much for your response. I asked an expert for help, and
below is sample code for the sample SSB project, which I would like to
contribute to help anyone who has the same issue as me:

==


import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.json4s.jackson.JsonMethods
import org.json4s.{DefaultFormats, Formats}

import java.io.{BufferedReader, DataOutputStream, InputStreamReader}
import java.net.{HttpURLConnection, URL}
import java.util.Base64

/**
 * Demo client that runs SQL statements through Kylin's REST query API and
 * shows every result set as a Spark DataFrame.
 */
object APIKylinRunSQL {

  import java.nio.charset.StandardCharsets

  import org.json4s.{JObject, JString}

  /** Kylin REST endpoint that executes a SQL statement. */
  val KYLIN_QUERY_URL = "http://localhost:7070/kylin/api/query"
  // NOTE(review): demo credentials are hard-coded; load them from configuration for real use.
  val USER_NAME = "x"
  val PASSWORD = "y"
  val KYLIN_PROJECT = "learn_kylin"

  /** Local Spark session used to build a DataFrame from each query result. */
  val spark: SparkSession = SparkSession.builder
    .master("local")
    .appName("Convert JSON to DataFrame")
    .getOrCreate()

  /**
   * Runs every demo query `numberOfExecutions` times, pausing one second
   * between requests, and stops the Spark session when done or on failure.
   */
  def main(args: Array[String]): Unit = {
    // (label, SQL) pairs, executed in declaration order. A Seq rather than a Map:
    // P_LINEORDER and PART appear twice, and duplicate Map keys would silently
    // drop the first query for each of those tables.
    val tablesAndQueries: Seq[(String, String)] = Seq(
      "CUSTOMER" -> "select * from SSB.CUSTOMER",
      "DATES" -> "SELECT * FROM SSB.DATES",
      "PART" -> "SELECT * FROM SSB.PART",
      "P_LINEORDER" -> "SELECT * FROM SSB.P_LINEORDER",
      "SUPPLIER" -> "SELECT * FROM SSB.SUPPLIER",
      "P_LINEORDER" -> "SELECT lo_orderdate, count(1) FROM SSB.P_LINEORDER GROUP BY lo_orderdate",
      "PART" -> "SELECT P_COLOR, count(1) FROM SSB.PART group by P_COLOR"
    )

    // query times
    val numberOfExecutions = 15

    try {
      for (i <- 1 to numberOfExecutions) {
        println(s"Executing query $i")
        for ((table, query) <- tablesAndQueries) {
          println(s"Executing queries for table $table")
          println(query)
          executeQuery(query)
          // wait a second between requests
          Thread.sleep(1000)
        }
      }
    } finally {
      spark.stop()
    }
  }

  /**
   * POSTs `sqlQuery` to Kylin with basic authentication and prints the raw
   * JSON response. On HTTP 200 it also prints the result as a DataFrame.
   * On any other status it prints the status code and any error body.
   *
   * @param sqlQuery SQL text to run in the KYLIN_PROJECT project
   */
  def executeQuery(sqlQuery: String): Unit = {
    // Build the body with json4s so quotes, backslashes or newlines in the SQL are escaped.
    val queryJson = JsonMethods.compact(JsonMethods.render(
      JObject("project" -> JString(KYLIN_PROJECT), "sql" -> JString(sqlQuery))))

    // Encode the username and password for basic authentication
    val encodedAuth = Base64.getEncoder.encodeToString(
      s"$USER_NAME:$PASSWORD".getBytes(StandardCharsets.UTF_8))

    val connection = new URL(KYLIN_QUERY_URL).openConnection.asInstanceOf[HttpURLConnection]
    try {
      connection.setRequestMethod("POST")
      connection.setRequestProperty("Authorization", s"Basic $encodedAuth")
      connection.setRequestProperty("Content-Type", "application/json")
      connection.setRequestProperty("Accept", "application/json")
      connection.setDoOutput(true)

      val writer = new DataOutputStream(connection.getOutputStream)
      try writer.write(queryJson.getBytes(StandardCharsets.UTF_8))
      finally writer.close()

      val responseCode = connection.getResponseCode
      if (responseCode == HttpURLConnection.HTTP_OK) {
        val response = readBody(connection.getInputStream)
        println("Result:")
        println(response)
        showResultAsDataFrame(response)
      } else {
        println(s"Error: $responseCode")
        // getErrorStream returns null when the server sent no error body.
        Option(connection.getErrorStream).map(readBody).foreach(println)
      }
    } finally {
      // Release the connection even if writing, reading or parsing throws.
      connection.disconnect()
    }
  }

  /** Reads a response stream as UTF-8 text (lines joined without separators) and closes it. */
  private def readBody(in: java.io.InputStream): String = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString
    finally reader.close()
  }

  /**
   * Parses a Kylin query response and shows it as a DataFrame.
   * The frame has one nullable StringType column per `columnMetas` entry,
   * named by that entry's "name" field, and one row per `results` entry.
   */
  private def showResultAsDataFrame(response: String): Unit = {
    implicit val formats: Formats = DefaultFormats
    val parsedJson = JsonMethods.parse(response)

    // dynamically build the schema based on column name information in JSON
    val columnNames = (parsedJson \ "columnMetas").children
      .map(meta => (meta \ "name").extract[String])
    val schema = StructType(columnNames.map(name => StructField(name, StringType, nullable = true)))
    schema.printTreeString()

    // Stringify every cell (keeping nulls) so its runtime type matches the all-StringType schema;
    // a raw cast would fail at runtime for any non-string JSON value (e.g. a number).
    val rows = (parsedJson \ "results").extract[List[List[Any]]].map { row =>
      Row.fromSeq(row.map(cell => if (cell == null) null else cell.toString))
    }

    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
    df.show(20, truncate = false)
  }
}


On Sun, Mar 31, 2024 at 8:57 PM Lionel CL  wrote:

> Hi Nam,
> You can refer to the spark docs
> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>
> Regards,
> Lu Cao
>
> From: Nam Đỗ Duy 
> Date: Sunday, March 31, 2024 at 08:53
> To: dev , user@kylin.apache.org <
> user@kylin.apache.org>
> Subject: Re: How to query the Cube via API and use the dataset for other
> purpose
> Dear Sirs/Madams
>
> Could anyone here help me figure out how to use Scala to run a
> SELECT query against a Kylin cube via the API and then turn the
> resulting table into a Scala DataFrame for other purposes?
>
> Thank you so much for your time!
>
> Best regards
>
> On Fri, 29 Mar 2024 at 17:52 Nam Đỗ Duy  wrote:
>
> > Hi Xiaoxiang,
> > Sirs & Madams,
> >
> > I use the following code to query the cube via API but I cannot use the
> > result as a 

Re: How to query the Cube via API and use the dataset for other purpose

2024-03-30 Thread Nam Đỗ Duy via user
Dear Sirs/Madams

Could anyone here help me figure out how to use Scala to run a
SELECT query against a Kylin cube via the API and then turn the
resulting table into a Scala DataFrame for other purposes?

Thank you so much for your time!

Best regards

On Fri, 29 Mar 2024 at 17:52 Nam Đỗ Duy  wrote:

> Hi Xiaoxiang,
> Sirs & Madams,
>
> I use the following code to query the cube via the API, but I cannot use
> the result as a DataFrame. Could you suggest a way to do that? It is very
> important for our project.
>
> Thanks and best regards
>
> ===
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
>
> object APICaller {
>   def main(args: Array[String]): Unit = {
> val spark = SparkSession.builder()
>   .appName("APICaller")
>   .master("local[*]")
>   .getOrCreate()
>
> import spark.implicits._
>
> val username = "namdd"
> val password = "eer123"
> val urlString = "http://localhost:7070/kylin/api/query;
> val project = "learn_kylin"
> val query = "select count(*) from HIVE_DWH_STANDARD.factuserEvent"
>
> val response: String = callAPI(urlString, username, password, project,
> query)
>
> // Convert response to DataFrame
> val df = spark.read.json(Seq(response).toDS())
>
> // Show DataFrame
> df.show()
>
> // Stop Spark session
> spark.stop()
>   }
>
>   def callAPI(url: String, username: String, password: String, project:
> String, query: String): String = {
> val encodedAuth =
> java.util.Base64.getEncoder.encodeToString(s"$username:$password".getBytes)
>
> val connection = scalaj.http.Http(url)
>   .postData(s"""{"project": "$project", "sql": "$query"}""")
>   .header("Content-Type", "application/json")
>   .header("Accept", "application/json")
>   .auth(username, password)
>   .asString
>
> if (connection.isError)
>   throw new RuntimeException(s"Error calling API: ${connection.body}")
>
> connection.body
>   }
> }
>
>


How to query the Cube via API and use the dataset for other purpose

2024-03-29 Thread Nam Đỗ Duy via user
Hi Xiaoxiang,
Sirs & Madams,

I use the following code to query the cube via the API, but I cannot use
the result as a DataFrame. Could you suggest a way to do that? It is very
important for our project.

Thanks and best regards

===

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
 * Demo: runs one SQL query through Kylin's REST API and shows the result
 * as a tabular Spark DataFrame.
 */
object APICaller {

  /**
   * Queries Kylin and prints the result table.
   * The Spark session is stopped even if the API call fails.
   */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

    val spark = SparkSession.builder()
      .appName("APICaller")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // NOTE(review): credentials are hard-coded; load them from configuration for real use.
    val username = "namdd"
    val password = "eer123"
    val urlString = "http://localhost:7070/kylin/api/query"
    val project = "learn_kylin"
    val query = "select count(*) from HIVE_DWH_STANDARD.factuserEvent"

    // Only the parts of the response needed to build the table. Declaring the schema
    // (instead of inferring it) keeps "results" typed as array<array<string>> even when
    // the query returns no rows, and declares every cell as a string.
    val responseSchema = StructType(Seq(
      StructField("columnMetas", ArrayType(StructType(Seq(StructField("name", StringType))))),
      StructField("results", ArrayType(ArrayType(StringType)))
    ))

    try {
      val response: String = callAPI(urlString, username, password, project, query)

      // Reading the body directly yields ONE row holding the nested JSON, not the query's
      // table, so take the column names from columnMetas and explode results into rows.
      val raw = spark.read.schema(responseSchema).json(Seq(response).toDS())
      val columnNames: Seq[String] =
        Option(raw.select(col("columnMetas.name")).head().getSeq[String](0)).getOrElse(Seq.empty)
      val resultColumns = columnNames.zipWithIndex.map { case (name, i) =>
        col("row").getItem(i).as(name)
      }

      // Convert response to DataFrame
      val df = raw.select(explode(col("results")).as("row")).select(resultColumns: _*)

      // Show DataFrame
      df.show()
    } finally {
      // Stop Spark session
      spark.stop()
    }
  }

  /**
   * POSTs a query to Kylin's REST API with basic authentication.
   *
   * @param url      Kylin query endpoint
   * @param username Kylin user
   * @param password Kylin password
   * @param project  Kylin project the SQL runs in
   * @param query    SQL text
   * @return the raw JSON response body
   * @throws RuntimeException if the HTTP status is an error (4xx/5xx)
   */
  def callAPI(url: String, username: String, password: String, project: String, query: String): String = {
    // Escape the values so a quote, backslash or newline in the SQL cannot break the JSON body.
    val body = s"""{"project": ${jsonString(project)}, "sql": ${jsonString(query)}}"""

    val connection = scalaj.http.Http(url)
      .postData(body)
      .header("Content-Type", "application/json")
      .header("Accept", "application/json")
      .auth(username, password)
      .asString

    if (connection.isError)
      throw new RuntimeException(s"Error calling API: ${connection.body}")

    connection.body
  }

  /** Encodes `s` as a JSON string literal, including the surrounding quotes. */
  private def jsonString(s: String): String = {
    val sb = new StringBuilder
    sb.append('"')
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
      case c => sb.append(c)
    }
    sb.append('"').toString
  }
}