Re: How to handle auto-restart in Kubernetes Spark application

2021-05-02 Thread Ali Gouta
Hello,

It is better to ask your question on the Spark Operator GitHub rather than on
this mailing list. For the answer, try:

restartPolicy:
  type: Always

Best regards,
Ali Gouta.

On Sun, May 2, 2021 at 6:15 PM Sachit Murarka 
wrote:

> Hi All,
>
> I am using Spark with Kubernetes. Can anyone please tell me how I can
> handle restarting failed Spark jobs?
>
> I have used the following property but it is not working:
>
> restartPolicy:
>   type: OnFailure
>
> Kind Regards,
> Sachit Murarka
>


How to handle auto-restart in Kubernetes Spark application

2021-05-02 Thread Sachit Murarka
Hi All,

I am using Spark with Kubernetes. Can anyone please tell me how I can
handle restarting failed Spark jobs?

I have used the following property but it is not working:

restartPolicy:
  type: OnFailure

Kind Regards,
Sachit Murarka


Re: Spark JDBC errors out

2021-05-02 Thread Farhan Misarwala
Thanks, Mich,

I have been using the JDBC source with MySQL & Postgres drivers in
production for almost 4 years now. The error looked a bit weird, and what I
meant to ask was whether I am doing it right. As you mentioned, I will also
check with the developers of the driver if they have anything to say about
this. Thanks for looking into it :)

Regards,
Farhan.

On Fri, Apr 30, 2021 at 7:01 PM Mich Talebzadeh 
wrote:

> Hi Farhan,
>
> I have used it successfully and it works. The only thing that can
> potentially cause this issue is the JDBC driver itself. Have you tried
> another JDBC driver, like Progress DataDirect etc.? Most of these defects
> are related to the JDBC driver itself!
>
> HTH,
>
> Mich
>
> On Fri, 30 Apr 2021 at 13:49, Farhan Misarwala 
> wrote:
>
>> Hi Mich,
>>
>> I have tried this already. I am using the same methods you are using in
>> my Java code. I see the same error, where 'dbtable' or 'query' gets added
>> as a connection property in the JDBC connection string for the source db,
>> which is AAS in my case.
>>
>>
>>
>> Thanks,
>>
>> Farhan.
>>
>>
>> On Fri, Apr 30, 2021 at 3:07 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> If you are using a Spark JDBC connection then you can do the following
>>> generic JDBC read from PySpark, say. The tableName could be a SQL query as
>>> well (select col1, col2 from )
>>>
>>> ## load from database
>>> def loadTableFromJDBC(spark, url, tableName, user, password, driver, fetchsize):
>>>     try:
>>>         house_df = spark.read. \
>>>             format("jdbc"). \
>>>             option("url", url). \
>>>             option("dbtable", tableName). \
>>>             option("user", user). \
>>>             option("password", password). \
>>>             option("driver", driver). \
>>>             option("fetchsize", fetchsize). \
>>>             load()
>>>         return house_df
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
>>> ## write to database
>>> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>>>     try:
>>>         dataFrame. \
>>>             write. \
>>>             format("jdbc"). \
>>>             option("url", url). \
>>>             option("dbtable", tableName). \
>>>             option("user", user). \
>>>             option("password", password). \
>>>             option("driver", driver). \
>>>             mode(mode). \
>>>             save()
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
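>>> As an illustration only, here is a hedged usage sketch of loadTableFromJDBC
>>> above, assuming an existing SparkSession named spark; the URL, credentials,
>>> driver and table name below are placeholders, not values from this thread:
>>>
>>> # hypothetical call; substitute your own connection details
>>> house_df = loadTableFromJDBC(spark,
>>>                              url="jdbc:postgresql://dbhost:5432/mydb",
>>>                              tableName="public.houses",
>>>                              user="scott",
>>>                              password="tiger",
>>>                              driver="org.postgresql.Driver",
>>>                              fetchsize=1000)
>>> house_df.show(5)
>>>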
>>> I don't know about AWS, but this will allow you to connect to Google
>>> BigQuery. Check AWS documentation
>>>
>>>
>>> def loadTableFromBQ(spark, dataset, tableName):
>>>     try:
>>>         df = spark.read. \
>>>             format("bigquery"). \
>>>             option("credentialsFile", config['GCPVariables']['jsonKeyFile']). \
>>>             option("dataset", dataset). \
>>>             option("table", tableName). \
>>>             load()
>>>         return df
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
>>> def writeTableToBQ(dataFrame, mode, dataset, tableName):
>>>     try:
>>>         dataFrame. \
>>>             write. \
>>>             format("bigquery"). \
>>>             option("credentialsFile", config['GCPVariables']['jsonKeyFile']). \
>>>             mode(mode). \
>>>             option("dataset", dataset). \
>>>             option("table", tableName). \
>>>             save()
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 30 Apr 2021 at 09:32, Farhan Misarwala <
>>> farhan.misarw...@gmail.com> wrote:
>>>
 Hi All,

 I am trying to read data from 'Azure Analysis Services' using CData's
 AAS JDBC driver
 .

 I am doing spark.read().jdbc(jdbcURL, aasQuery, connectionProperties)

 This errors out because the driver throws an exception saying there is
 no 'dbtable' connection property. This means Spark is trying to set
 'dbtable' as a connection property on the driver, instead of creating a
 connection and then executing the query I specify in the 'dbtable' or
 'query' options.

 Usually, in my non-Spark/vanilla Java code, I use
 java.sql.DriverManager to create a connection and then execute a query
 successfully using execute() from java.sql.Statement.
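
 For reference only, here is a minimal PySpark sketch of the two standard ways
 to hand a statement to the built-in JDBC source: wrapping it as a 'dbtable'
 subquery, or using the 'query' option (available from Spark 2.4). The URL,
 driver class and query below are placeholders, not the actual CData AAS
 settings.

 from pyspark.sql import SparkSession

 spark = SparkSession.builder.getOrCreate()

 jdbc_url = "jdbc:..."                            # placeholder URL
 driver_class = "com.example.Driver"              # placeholder driver class
 aas_query = "SELECT col1, col2 FROM some_table"  # placeholder query

 # Option 1: wrap the statement as a derived table and pass it through 'dbtable'
 df1 = spark.read.format("jdbc") \
     .option("url", jdbc_url) \
     .option("driver", driver_class) \
     .option("dbtable", f"({aas_query}) t") \
     .load()

 # Option 2 (Spark 2.4+): pass the statement through the 'query' option instead
 df2 = spark.read.format("jdbc") \
     .option("url", jdbc_url) \
     .option("driver", driver_class) \
     .option("query", aas_query) \
     .load()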


Re: Delivery Status Notification (Failure)

2021-05-02 Thread Mich Talebzadeh
Part 2

In this case, we are simply counting the number of rows ingested in the one-off
run before SSS terminates. This is shown in the above method.


batchId is 0

 Total records processed in this run = 3107

wrote to DB

So it shows the batchId (0) and the total record count, writes to the
BigQuery table, and terminates.

Wait and start again; it should pick up with the next batchId:

batchId is 1
 Total records processed in this run = 80
wrote to DB
hduser@rhes76: /home/hduser/dba/bin/python/DSBQ/src>

*What the checkpoint directory holds*

 /mnt/gs/prices/chkpt> ltr
total 1
-rw-r--r--. 1 hduser hadoop 45 May  2 09:35 metadata
drwxr-xr-x. 1 hduser hadoop  0 May  2 09:35 offsets
drwxr-xr-x. 1 hduser hadoop  0 May  2 09:35 commits
drwxr-xr-x. 1 hduser hadoop  0 May  2 09:35 sources

cat metadata
{"id":"cc3a9459-2a9d-4740-a280-e5b5d333d098"}

cd offsets/
/mnt/gs/prices/chkpt/offsets> ltr
total 2
-rw-r--r--. 1 hduser hadoop 529 May  2 09:35 0
-rw-r--r--. 1 hduser hadoop 529 May  2 09:39 1

> cat 0
v1
{"batchWatermarkMs":0,"batchTimestampMs":1619944526698,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}

cat 1
v1
{"batchWatermarkMs":0,"batchTimestampMs":1619944796208,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":1}


So there is one offset file per run (0 and 1), each with a different
"batchTimestampMs", namely "batchTimestampMs":1619944526698 and
"batchTimestampMs":1619944796208 respectively.


*How to trigger SSS for each run*


The SSS job can be triggered in many ways: a simple cron on-prem, AutoSys,
Airflow on-prem, or Cloud Composer in the cloud (a minimal sketch follows below).
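
As an illustration only, a minimal Airflow DAG sketch that launches the
trigger(once=True) job on a schedule; the DAG id, schedule interval and
spark-submit command are assumptions, not part of the setup described here:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical DAG: kicks off the one-shot SSS run every 6 hours.
with DAG(
    dag_id="sss_trail_files_once",            # assumed name
    start_date=datetime(2021, 5, 1),
    schedule_interval="0 */6 * * *",          # assumed interval
    catchup=False,
) as dag:
    run_sss = BashOperator(
        task_id="run_sss_batch",
        # Placeholder spark-submit command; point it at the real SSS script.
        bash_command="spark-submit /path/to/sss_trail_files.py",
    )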


If there is nothing in the queue (say the source has stopped), SSS will come
back with


DataFrame is empty


and terminate. This logic is already built into the sendToSink() method.


*Conclusion*


I am not sure that not running compute with SSS continuously is going to save
a lot. Surely the compute process will be run only as needed and that saves
some dollars, but the whole infrastructure has to be there and the lion's
share of the cost goes there. If the idea is to run CDC once or twice a day,
then it is equally fine to schedule SSS to start at certain intervals. The
important thing to realise is that SSS will pick up from where the last run
left off through the checkpoint directory. If the checkpoint directory is lost
or its content deleted, SSS will process all the records from batchId 0 again.


HTH,


Mich



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 2 May 2021 at 10:45, Mich Talebzadeh 
wrote:

> This message is in two parts
>
> Hi,
>
> I did some tests on these. The idea is to run Spark Structured
> Streaming (SSS) on the collection of records that has arrived since the last
> run of SSS and then shut the SSS job down.
>
> Some parts of this approach have been described in the following Databricks
> blog:
>
> Running Streaming Jobs Once a Day For 10x Cost Savings
> 
> However, real life is more complicated than that.
>
> Let us look at a typical example as depicted in my lousy diagram attached
>
> sources (trail files) --> Kafka --> Flume --> write to Cloud storage
> (mounted locally) --> SSS --> BigQuery
>
>
> What I have here is a typical example of *trail files* produced by the
> source. This could be some CDC tool like Oracle GoldenGate sending
> committed logs, or anything else that writes to files. We use Kafka to ingest
> these files and we use Apache Flume
> 
>  to
> move these files onto Google Cloud Storage (gs:// ) *mounted as a local
> file system* on the edge node through Cloud Storage FUSE
> 
>
>
> The advantage of these storage types is that both on-premise and cloud

Re: Delivery Status Notification (Failure)

2021-05-02 Thread Mich Talebzadeh
This message is in two parts

Hi,

I did some tests on these. The idea is to run Spark Structured
Streaming (SSS) on the collection of records that has arrived since the last
run of SSS and then shut the SSS job down.

Some parts of this approach have been described in the following Databricks
blog:

Running Streaming Jobs Once a Day For 10x Cost Savings

However, real life is more complicated than that.

Let us look at a typical example as depicted in my lousy diagram attached

sources (trail files) --> Kafka --> Flume --> write to Cloud storage
(mounted locally) --> SSS --> BigQuery


What I have here is a typical example of *trail files* produced by the
source. This could be some CDC tool like Oracle GoldenGate sending
committed logs, or anything else that writes to files. We use Kafka to ingest
these files and we use Apache Flume

to move these files onto Google Cloud Storage (gs:// ) *mounted as a local
file system* on the edge node through Cloud Storage FUSE

The advantage of this type of storage is that both on-premise and cloud
applications can work with it.


For on-premise use, the mount point would be, say, data_path =
"file:///mnt/gs/prices/data/", where gs://etcbucket is mounted as /mnt/gs.


For a cloud reference it would be data_path = "etcbucket/prices/data/".


We use Flume's file_roll sink type for this storage.


This is an event-driven architecture and our interest is to process these
trail files through SSS at a predetermined interval or when needed.
However, the caveat is that the volume should be containable, meaning SSS
can process it as required. The code in PySpark is as follows:


    data_path = "etcbucket/prices/data/"
    checkpoint_path = "etcbucket/prices/chkpt/"

    try:
        streamingDataFrame = self.spark \
            .readStream \
            .option('newFilesOnly', 'true') \
            .option('header', 'true') \
            .option('maxFilesPerTrigger', 1) \
            .option('latestFirst', 'false') \
            .text(data_path) \
            .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

        streamingDataFrame.printSchema()

        result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                 writeStream. \
                 outputMode('append'). \
                 option("truncate", "false"). \
                 foreachBatch(sendToSink). \
                 queryName('trailFiles'). \
                 trigger(once = True). \
                 option('checkpointLocation', checkpoint_path). \
                 start(data_path)

    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)
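
One step not shown above: with trigger(once=True) one would normally block
until the single micro-batch has completed before letting the driver exit. A
minimal sketch, assuming result is the StreamingQuery handle returned by
start():

        # Wait for the one-shot micro-batch to finish, then let the driver exit.
        result.awaitTermination()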

def sendToSink(df, batchId):
    if(len(df.take(1))) > 0:
        print(f"""batchId is {batchId}""")
        df.show(100,False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
                         config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
        df.unpersist()
        print(f"""wrote to DB""")
    else:
        print("DataFrame is empty")



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 2 May 2021 at 10:42, Mail Delivery Subsystem <
mailer-dae...@googlemail.com> wrote:

> Message too large
> Your message couldn't be delivered to *user@spark.apache.org* because it
> exceeds the size limit. Try reducing the message size and then resending
> it.
> The response from the remote server was:
>
> 552 5.3.4 Message size exceeds fixed limit
>
>
>
> -- Forwarded message --
> From: Mich Talebzadeh 
> To:
> Cc: user 
> Bcc:
> Date: Sun, 2 May 2021 10:42:09 +0100
> Subject: Re: Spark Streaming with Files
> - Message truncated -