Re: Spark Streaming join taking long to process

2018-11-27 Thread Shixiong(Ryan) Zhu
If you are using the same code to run on YARN, I believe it’s still using
local mode, because the master hardcoded in the code overrides the master URL
set on the CLI. You can check the “Executors” tab in the Spark UI to see how
many executors are running, and verify that it matches your config.
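For reference, a minimal sketch of that change (the jar name and submit flags
below are illustrative, not taken from this thread): drop the hardcoded master
from the code and let spark-submit supply it.

import org.apache.spark.sql.SparkSession

// Sketch: no .master() call, so the --master value passed to spark-submit wins.
val spark = SparkSession.builder
  .appName("Argoid_Realtime_Pipeline")
  .getOrCreate()

// Hypothetical submit command (adjust paths and resources for your cluster):
//   spark-submit --master yarn --deploy-mode client \
//     --class StreamJoin stream-join-assembly.jar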
On Tue, Nov 27, 2018 at 6:17 AM Abhijeet Kumar wrote:

> Yes, it did.
> Thanks for the solution. I solved it locally, but I’m worried about how I can
> do this when I’m using YARN, because the same 15-second delay shows up on
> YARN too :)
>
> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram wrote:
>
> Hello Abhijeet,
> I am not sure my answer will solve your problem, but take a look at the
> master configuration set for the Spark application.
>
> val spark = SparkSession.builder
>   .appName("Argoid_Realtime_Pipeline")
>   .master("local")
>   .getOrCreate()
>
> I see you have set it as "local", not as "local[*]".
>
> From another blog I got this information; sharing the full sentence with you:
> "We are going to work locally, running the application straight from our
> IDE, so we are setting the master to local[*], meaning we are creating as
> many threads as there are cores on the machine."
>
> Just check whether this reduces the processing time, since with local[*] we
> use all available cores, not just one.
>
> Regards,
> Sriram Srikanth
>
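A minimal sketch of the master settings Srikanth is contrasting, assuming the
job is still launched straight from the IDE (the figure of 4 threads is only an
illustration):

import org.apache.spark.sql.SparkSession

// "local"    -> one worker thread, so a single core does all the work
// "local[4]" -> four worker threads
// "local[*]" -> as many worker threads as the machine has cores
val spark = SparkSession.builder
  .appName("Argoid_Realtime_Pipeline")
  .master("local[*]")
  .getOrCreate()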
> On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com> wrote:
>
>> Hi All,
>>
>> I'm just practicing Spark Streaming by joining two different streams. I
>> noticed that it's taking around 15 seconds for each record. Let me share
>> the details and the code:
>>
>> As you can see for stage id 2, it's taking 15 s. Isn't this strange?
>>
>> Code:
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>> import org.apache.log4j.{Level, Logger}
>>
>> object StreamJoin{
>>
>>   val kafkaTopic1 = "demo2"
>>   val kafkaTopic2 = "demo3"
>>   val bootstrapServer = "localhost:9092"
>>
>>   def main(args: Array[String]): Unit = {
>> val checkPointDir = "hdfs://localhost:8020/checkpo"
>>
>> val spark = SparkSession.builder
>>   .appName("Argoid_Realtime_Pipeline")
>>   .master("local")
>>   .getOrCreate()
>>
>> val rootLogger = Logger.getRootLogger()
>> rootLogger.setLevel(Level.ERROR)
>>
>> import spark.implicits._
>>
>> val df1 = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServer)
>>   .option("subscribe", kafkaTopic1)
>>   .option("failOnDataLoss", "false")
>>   .load()
>>
>> val df2 = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServer)
>>   .option("subscribe", kafkaTopic2)
>>   .option("failOnDataLoss", "false")
>>   .load()
>>
>> val order_details = df1
>>   .withColumn("s_order_id", get_json_object($"value".cast("String"), 
>> "$.order_id"))
>>   .withColumn("s_customer_id", get_json_object($"value".cast("String"), 
>> "$.customer_id"))
>>   .withColumn("s_promotion_id", get_json_object($"value".cast("String"), 
>> "$.promotion_id"))
>>   .withColumn("s_store_id", get_json_object($"value".cast("String"), 
>> "$.store_id"))
>>   .withColumn("s_product_id", get_json_object($"value".cast("String"), 
>> "$.product_id"))
>>   .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), 
>> "$.warehouse_id"))
>>   .withColumn("unit_cost", get_json_object($"value".cast("String"), 
>> "$.unit_cost"))
>>   .withColumn("total_cost", get_json_object($"value".cast("String"), 
>> "$.total_cost"))
>>   .withColumn("units_sold", get_json_object($"value".cast("String"), 
>> "$.units_sold"))
>>   .withColumn("promotion_cost", get_json_object($"value".cast("String"), 
>> "$.promotion_cost"))
>>   .withColumn("date_of_order", get_json_object($"value".cast("String"), 
>> "$.date_of_order"))
>>   .withColumn("tstamp_trans", current_timestamp())
>>   .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", 
>> "MMddHHmmss").cast(TimestampType))
>>   .select($"s_customer_id", $"s_order_id", $"s_promotion_id", 
>> $"s_store_id", $"s_product_id",
>> $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>> $"total_cost".cast("integer") as "total_cost", 
>> $"promotion_cost".cast("integer") as "promotion_cost",
>> $"date_of_order", $"tstamp_trans", $"TIMESTAMP", 
>> $"units_sold".cast("integer") as "units_sold")
>>
>> val invoice_details = df2
>>   .withColumn("order_id", get_json_object($"value".cast("String"), 
>> "$.order_id"))
>>   .withColumn("invoice_status", get_json_object($"value".cast("String"), 
>> "$.invoice_status"))
>>   .where($"invoice_status" === "Success")
>>
>>   .withColumn("tstamp_trans", current_timestamp())
>>   .withColumn("TIMEST

Re: Spark Streaming join taking long to process

2018-11-27 Thread Abhijeet Kumar
Yes, it did.
Thanks for the solution. I solved it locally, but I’m worried about how I can do
this when I’m using YARN, because the same 15-second delay shows up on YARN too :)
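For what it's worth, a minimal sketch of where the equivalent knob sits on
YARN: parallelism there comes from the executors you request rather than from
local[*] threads (the numbers below are placeholders, not values from this
thread).

import org.apache.spark.sql.SparkSession

// Sketch: on YARN, ask for executors explicitly instead of tuning local[*].
val spark = SparkSession.builder
  .appName("Argoid_Realtime_Pipeline")
  .config("spark.executor.instances", "4") // executors YARN should allocate
  .config("spark.executor.cores", "2")     // task slots per executor
  .config("spark.executor.memory", "2g")   // memory per executor
  .getOrCreate()

// Equivalent spark-submit flags: --num-executors 4 --executor-cores 2 --executor-memory 2g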
