Re: writing into oracle database is very slow

2019-04-19 Thread spark receiver
hi Jiang,

I was facing the very same issue. The solution is to write the data to a file and
use an Oracle external table to do the insert.
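
A rough sketch of that approach (the staging path, directory object, and table names below are made up for illustration): Spark writes delimited files to a location the Oracle server can read, and the actual insert is done on the database side through an external table.

// Write the DataFrame as CSV to a directory the Oracle server can see (e.g. an NFS mount).
df.coalesce(10)
  .write
  .mode("overwrite")
  .option("sep", ",")
  .csv("/shared/stage/my_table")

// Oracle side (sketch only; column list and access parameters abbreviated):
//   CREATE TABLE my_table_ext (...)
//     ORGANIZATION EXTERNAL (
//       TYPE ORACLE_LOADER
//       DEFAULT DIRECTORY stage_dir
//       ACCESS PARAMETERS (RECORDS DELIMITED BY NEWLINE FIELDS TERMINATED BY ',')
//       LOCATION ('part-00000.csv')
//     );
//   INSERT /*+ APPEND */ INTO my_table SELECT * FROM my_table_ext;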

hope this could help.

Dalin

On Thu, Apr 18, 2019 at 11:43 AM Jörn Franke  wrote:

> What is the size of the data? How much time does it need on HDFS and how
> much on Oracle? How many partitions do you have on Oracle side?
>
> Am 06.04.2019 um 16:59 schrieb Lian Jiang :
>
> Hi,
>
> My spark job writes into oracle db using:
>
> df.coalesce(10).write.format("jdbc").option("url", url)
>   .option("driver", driver).option("user", user)
>   .option("batchsize", 2000)
>   .option("password", password).option("dbtable", 
> tableName).mode("append").save()
>
> It is much slower than writing into HDFS. The data to write is small.
>
> Is this expected? Thanks for any clue.
>
>
>


Re: Hive to Oracle using Spark - Type(Date) conversion issue

2018-06-06 Thread spark receiver
Use unix time: write the unix time to Oracle as a NUMBER column type, and create a
virtual column in the Oracle database for the unix time, like:

oracle_time generated always as (to_date('1970010108','YYYYMMDDHH24') + (1/24/60/60)*unixtime)
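
A rough sketch of that idea (the column, table, and connection names are placeholders; the Spark side just writes epoch seconds into a NUMBER column and Oracle derives the DATE via the virtual column):

import org.apache.spark.sql.functions.unix_timestamp

// Convert the Hive varchar date string into epoch seconds before the JDBC write.
val withUnix = df.withColumn("unixtime",
  unix_timestamp(df("event_date"), "yyyy-MM-dd HH:mm:ss"))

withUnix.write.format("jdbc")
  .option("url", url).option("dbtable", "MY_TABLE")
  .option("user", user).option("password", password)
  .mode("append").save()

// Oracle side, once (the '1970010108' literal carries the same +8h offset as above):
//   ALTER TABLE my_table ADD (
//     oracle_time DATE GENERATED ALWAYS AS
//       (to_date('1970010108','YYYYMMDDHH24') + (1/24/60/60) * unixtime) VIRTUAL
//   );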

> On Mar 20, 2018, at 11:08 PM, Gurusamy Thirupathy  wrote:
> 
> HI Jorn,
> 
> Thanks for sharing the different options. Yes, we are trying to build a generic
> tool for Hive to Oracle export using Spark.
> FYI, currently we are using Sqoop and are trying to migrate from Sqoop to Spark.
> 
> Thanks
> -G
> 
> On Tue, Mar 20, 2018 at 2:17 AM, Jörn Franke wrote:
> Write your own Spark UDF. Apply it to all varchar columns.
> 
> Within this udf you can use the SimpleDateFormat parse method. If this method
> returns null, you return the content as varchar; if not, you return a date. If
> the content is null, you return null.
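> 
> A minimal sketch of that idea (the "yyyy-MM-dd" pattern and the column name are
> made up; in a generic tool they would come from configuration or from the Oracle
> metadata):
> 
> import java.text.SimpleDateFormat
> import org.apache.spark.sql.functions.udf
> 
> // Try to parse the varchar value; return a java.sql.Date on success, null otherwise.
> val toDateOrNull = udf { s: String =>
>   if (s == null) null
>   else {
>     val fmt = new SimpleDateFormat("yyyy-MM-dd")
>     fmt.setLenient(false)
>     try new java.sql.Date(fmt.parse(s).getTime)
>     catch { case _: java.text.ParseException => null }
>   }
> }
> 
> // Applied to each column whose Oracle counterpart is DATE:
> // df.withColumn("event_date", toDateOrNull(df("event_date")))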
> 
> Alternatively, you can define an insert function in PL/SQL on the Oracle side.
> 
> Another alternative is to read the Oracle metadata for the table at runtime 
> and then adapt your conversion based on this. 
> 
> However, this may not be perfect depending on your use case. Can you please 
> provide more details/examples? Do you aim at a generic hive to Oracle import 
> tool using Spark? Sqoop would not be an alternative?
> 
> On 20. Mar 2018, at 03:45, Gurusamy Thirupathy wrote:
> 
>> Hi guha,
>> 
>> Thanks for your quick response. Options a and b are in our table already. For
>> option b, it's again the same problem: we don't know which column is a date.
>> 
>> 
>> Thanks,
>> -G
>> 
>> On Sun, Mar 18, 2018 at 9:36 PM, Deepak Sharma wrote:
>> The other approach would be to write to a temp table and then merge the data.
>> But this may be an expensive solution.
>> 
>> Thanks
>> Deepak
>> 
>> On Mon, Mar 19, 2018, 08:04 Gurusamy Thirupathy wrote:
>> Hi,
>> 
>> I am trying to read data from Hive as a DataFrame and then write the DF into an
>> Oracle database. In this case, the date field/column in Hive has type Varchar(20),
>> but the corresponding column type in Oracle is Date. While reading from Hive,
>> the Hive table names are dynamically decided (read from another table) based on
>> some job condition (e.g. Job1). There are multiple tables like this, so the
>> column and table names are decided only at run time. So I can't do the type
>> conversion explicitly when reading from Hive.
>> 
>> So is there any utility/API available in Spark to handle this conversion
>> issue?
>> 
>> 
>> Thanks,
>> Guru
>> 
>> 
>> 
>> -- 
>> Thanks,
>> Guru
> 
> 
> 
> -- 
> Thanks,
> Guru



Re: [Structured Streaming] More than 1 streaming in a code

2018-04-13 Thread spark receiver
Hi Panagiotis ,

Wondering whether you solved the problem or not, because I hit the same issue today.
I'd appreciate it so much if you could paste the code snippet if it's working.

Thanks.


> On Apr 6, 2018, at 7:40 AM, Aakash Basu wrote:
> 
> Hi Panagiotis,
> 
> I did that, but it still prints the result of the first query and waits for
> new data; it doesn't even go to the next one.
> 
> Data -
> 
> $ nc -lk 9998
> 
> 1,2
> 3,4
> 5,6
> 7,8
> 
> Result -
> 
> ---
> Batch: 0
> ---
> ++
> |aver|
> ++
> | 3.0|
> ++
> 
> ---
> Batch: 1
> ---
> ++
> |aver|
> ++
> | 4.0|
> ++
> 
> 
> Updated Code -
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
> 
> spark = SparkSession \
> .builder \
> .appName("StructuredNetworkWordCount") \
> .getOrCreate()
> 
> data = spark \
> .readStream \
> .format("socket") \
> .option("header","true") \
> .option("host", "localhost") \
> .option("port", 9998) \
> .load("csv")
> 
> 
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
> 
> id_DF.createOrReplaceTempView("ds")
> 
> df = spark.sql("select avg(col1) as aver from ds")
> 
> df.createOrReplaceTempView("abcd")
> 
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
> from ds")  # (select aver from abcd)
> 
> query2 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='5 seconds') \
> .start()
> 
> query = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='5 seconds') \
> .start()
> 
> spark.streams.awaitAnyTermination()
> 
> 
> Thanks,
> Aakash.
> 
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis wrote:
> Hello Aakash,
> 
> When you use query.awaitTermination you are pretty much blocking there,
> waiting for the current query to stop or throw an exception. In your case the
> second query will not even start.
> What you could do is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
> 
> I hope this helps.
> 
> Cheers,
> Panagiotis
> 
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu wrote:
> Any help?
> 
> Need urgent help. Someone please clarify the doubt?
> 
> -- Forwarded message --
> From: Aakash Basu
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user <user@spark.apache.org>
> 
> 
> Hi,
> 
> If I have more than one writeStream in a code, which operates on the same
> readStream data, why does it produce output for only the first writeStream? I
> want the second one to be printed on the console as well.
> 
> How to do that?
> 
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
> 
> class test:
> 
> 
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
> 
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
> 
> ID = data.select('value') \
> .withColumn('value', data.value.cast("string")) \
> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
> .drop('value')
> 
> ID.createOrReplaceTempView("transformed_Stream_DF")
> 
> df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
> 
> df.createOrReplaceTempView("abcd")
> 
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) 
> col3 from transformed_Stream_DF")
> 
> 
> # ---#
> 
> query1 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='3 seconds') \
> .start()
> 
> query1.awaitTermination()
> # ---#
> 
> query2 = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='3 seconds') \
> .start()
> 
> query2.awaitTermination()
> 
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit 
> --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
>  
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
> 
> 
> 
> Thanks,
> Aakash.
> 
> 
> 



Re: Reload some static data during struct streaming

2017-11-13 Thread spark receiver
I need it cached to improve throughput; I only hope it can be refreshed once a
day, not every batch.
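
A rough sketch of one workaround (assuming the streaming query references df_meta directly, as in the snippet quoted below; the 24-hour schedule is just an example): keep the metadata cached, and have a small background thread drop and rebuild the cache once a day so the next micro-batch re-reads it from MySQL.

import java.util.concurrent.{Executors, TimeUnit}

// Same cached JDBC DataFrame as in the snippet below; it is joined inside the stream.
val df_meta = spark.read.format("jdbc")
  .option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load()
df_meta.cache()

// Once a day: drop the cached data, then mark it for caching again so the next
// access (the next micro-batch) re-reads from MySQL and re-populates the cache.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    df_meta.unpersist(blocking = true)
    df_meta.cache()
  }
}, 24, 24, TimeUnit.HOURS)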


> On Nov 13, 2017, at 4:49 PM, Burak Yavuz  wrote:
> 
> I think if you don't cache the jdbc table, then it should auto-refresh.
> 
> On Mon, Nov 13, 2017 at 1:21 PM, spark receiver <spark.recei...@gmail.com> wrote:
> Hi 
> 
> I'm using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great.
> The thing is, I need to join the Kafka messages with a relatively static table
> stored in a MySQL database (let's call it metadata here).
> 
> So is it possible to reload the metadata table after some time interval (like
> daily) without restarting the running Structured Streaming job?
> 
> Snippet code as following :
> // df_meta contains important information to join with the dataframe read 
> from kafka
> val df_meta = spark.read.format("jdbc").option("url", 
> mysql_url).option("dbtable", "v_entity_ap_rel").load()
> df_meta.cache()
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> “x.x.x.x:9092").option("fetch.message.max.bytes", 
> "5000").option("kafka.max.partition.fetch.bytes", "5000")
>   .option("subscribe", "rawdb.raw_data")
>   .option("failOnDataLoss", true)
>   .option("startingOffsets", "latest")
>   .load()
>   .select($"value".as[Array[Byte]])
>   .map(avroDeserialize(_))
>   .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
>   .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
> 
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi 
> then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", 
> "substring(stime,1,13) STIME_HOUR")
>   .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
>   .option("checkpointLocation", 
> "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
>   .start("T_CF_TABLE")
>   .awaitTermination()
> 
> Mason
> 



Reload some static data during struct streaming

2017-11-13 Thread spark receiver
Hi 

I'm using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great.
The thing is, I need to join the Kafka messages with a relatively static table
stored in a MySQL database (let's call it metadata here).

So is it possible to reload the metadata table after some time interval (like
daily) without restarting the running Structured Streaming job?

Snippet code as following :
// df_meta contains important information to join with the dataframe read from 
kafka
val df_meta = spark.read.format("jdbc").option("url", 
mysql_url).option("dbtable", "v_entity_ap_rel").load()
df_meta.cache()
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", 
“x.x.x.x:9092").option("fetch.message.max.bytes", 
"5000").option("kafka.max.partition.fetch.bytes", "5000")
  .option("subscribe", "rawdb.raw_data")
  .option("failOnDataLoss", true)
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
  .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")

df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then 
'1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", 
"substring(stime,1,13) STIME_HOUR")
  .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
  .option("checkpointLocation", 
"/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
  .start("T_CF_TABLE")
  .awaitTermination()

Mason