My join_df is taking 14 seconds on the first run, and even after commenting
out the withColumn calls it is still taking more time.



On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim <kabh...@gmail.com> wrote:

> You may need to put some effort into triaging how much time is spent on each
> part. Without that information you can only get general tips and tricks.
> Please check the SQL tab and look at the DAG graph as well as the details
> (logical plan, physical plan) to see whether you're happy with these plans.
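>
> For example (a quick sketch using the variable names from the code quoted
> below), the same plans can also be printed from the application itself:
>
>   // logical and physical plans of the streaming Dataset before starting it
>   join_df.explain(true)
>
>   // physical plan of the most recent micro-batch, once the query is running
>   write_df.explain()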
>
> A general tip from a quick look at the query: avoid using withColumn
> repeatedly and try to put those expressions into one select statement. If
> I'm not mistaken, repeated withColumn is known to be a bit costly since each
> call produces a new Dataset. Defining a schema and using "from_json" will
> eliminate all of the withColumn calls and the extra calls to
> "get_json_object".
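>
> As a minimal sketch of that approach for the order stream (assuming
> spark.implicits._ is in scope; only a few fields are shown and the types are
> guesses, so adjust the schema to your data):
>
>   import org.apache.spark.sql.functions.{current_timestamp, from_json}
>   import org.apache.spark.sql.types._
>
>   // declared schema for the order JSON payload (extend with the remaining fields)
>   val orderSchema = StructType(Seq(
>     StructField("order_id", StringType),
>     StructField("customer_id", StringType),
>     StructField("unit_cost", IntegerType),
>     StructField("units_sold", IntegerType)))
>
>   // one select replaces the whole chain of withColumn/get_json_object calls
>   val order_details = order_df
>     .select(from_json($"value".cast("string"), orderSchema).as("o"),
>       current_timestamp().as("tstamp_trans"))
>     .select($"o.order_id".as("s_order_id"), $"o.customer_id".as("s_customer_id"),
>       $"o.unit_cost", $"o.units_sold", $"tstamp_trans")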
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj <samraj.mi...@gmail.com> wrote:
>
>> Hello All,
>>
>> I am using Spark 2.3 and I am trying to write a Spark Structured Streaming
>> join. It is a basic join, but it is taking a long time to join the stream
>> data. I am not sure whether there is any configuration we need to set in
>> Spark.
>>
>> Code:
>> *************************
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>>
>> object OrderSalesJoin {
>>   def main(args: Array[String]): Unit = {
>>
>>     setEnvironmentVariables(args(0))
>>
>>     val order_topic = args(1)
>>     val invoice_topic = args(2)
>>     val dest_topic_name = args(3)
>>
>>     val spark = SparkSession.builder()
>>       .appName("SalesStreamingJoin")
>>       .getOrCreate()
>>
>>     val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>>
>>     import spark.implicits._
>>
>>
>>     val order_df = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>       .option("subscribe", order_topic)
>>       .option("startingOffsets", "latest")
>>       .option("failOnDataLoss", "false")
>>       .option("kafka.replica.fetch.max.bytes", "15728640")
>>       .load()
>>
>>
>>     val invoice_df = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>       .option("subscribe", invoice_topic)
>>       .option("startingOffsets", "latest")
>>       .option("failOnDataLoss", "false")
>>       .option("kafka.replica.fetch.max.bytes", "15728640")
>>       .load()
>>
>>
>>     val order_details = order_df
>>       .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
>>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
>>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
>>       .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
>>       .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
>>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
>>       .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
>>       .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
>>       .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
>>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
>>       .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
>>       .withColumn("tstamp_trans", current_timestamp())
>>       .withColumn("TIMESTAMP",
>>         unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id",
>>         $"s_product_id", $"s_warehouse_id",
>>         $"unit_cost".cast("integer") as "unit_cost",
>>         $"total_cost".cast("integer") as "total_cost",
>>         $"promotion_cost".cast("integer") as "promotion_cost",
>>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
>>         $"units_sold".cast("integer") as "units_sold")
>>
>>
>>     val invoice_details = invoice_df
>>       .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
>>       .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
>>       .where($"invoice_status" === "Success")
>>       .withColumn("tstamp_trans", current_timestamp())
>>       .withColumn("TIMESTAMP",
>>         unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>>
>>
>>
>>     val order_wm = order_details.withWatermark("tstamp_trans", args(4))
>>     val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))
>>
>>     val join_df = order_wm
>>       .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
>>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>>         $"s_warehouse_id", $"unit_cost", $"total_cost", $"promotion_cost",
>>         $"date_of_order", $"units_sold" as "units_sold", $"order_id")
>>
>>     val final_ids = join_df
>>       .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id",
>>         $"s_store_id", $"s_product_id", $"s_warehouse_id",
>>         $"unit_cost".cast("Int") as "unit_cost",
>>         $"total_cost".cast("Int") as "total_cost",
>>         $"promotion_cost".cast("Int") as "promotion_cost",
>>         $"date_of_order",
>>         $"units_sold".cast("Int") as "units_sold", $"order_id")))
>>       .dropDuplicates("order_id")
>>       .select("value")
>>
>>
>>     val write_df = final_ids
>>       .writeStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>       .option("topic", dest_topic_name)
>>       .option("checkpointLocation", checkpoint_path)
>>       .trigger(Trigger.ProcessingTime("1 second"))
>>       .start()
>>
>>     write_df.awaitTermination()
>>
>>   }
>>
>> }
>> ****************************
>>
>> It is taking more than a minute for every run, and the waiting time keeps
>> increasing as the data grows.
>>
>> Please let me know if there is anything we need to configure to make it
>> faster. I tried increasing the parallelism.
>>
>> Executors: I tried from 1 to 4, and the memory I gave is 3 GB. The data
>> volume is very small; even a single record is taking time.
>>
>>
>>
