Re: SSH Tunneling issue with Apache Spark

2023-12-06 Thread Nicholas Chammas
This is not a question for the dev list. Moving dev to bcc.

One thing I would try is to connect to this database using JDBC + SSH tunnel, 
but without Spark. That way you can focus on getting the JDBC connection to 
work without Spark complicating the picture for you.
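
For example, here is a minimal sketch of that standalone test. It reuses the sshtunnel variables from your snippet; the jaydebeapi package and the Connector/J jar path are placeholders of mine, not something from your setup:

import jaydebeapi  # pip install jaydebeapi; needs a JVM on the machine
from sshtunnel import SSHTunnelForwarder

with SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_pkey=ssh_key_file,
        remote_bind_address=(sql_hostname, sql_port)) as tunnel:
    # Connect over JDBC through the tunnel's local end, with no Spark involved.
    conn = jaydebeapi.connect(
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql://127.0.0.1:%d/b2b" % tunnel.local_bind_port,
        {"user": sql_username, "password": b2b_mysql_password},
        "/path/to/mysql-connector-j.jar")  # placeholder jar path
    curs = conn.cursor()
    curs.execute("SELECT 1")
    print(curs.fetchall())  # [(1,)] means JDBC + tunnel work on their own
    curs.close()
    conn.close()

If that works by itself, the difference is probably Spark-specific: with spark.read over JDBC, the actual reads happen on the executors, so the tunnel has to be reachable from them, not just from the machine that opened it.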


> On Dec 5, 2023, at 8:12 PM, Venkatesan Muniappan wrote:
> 
> Hi Team,
> 
> I am facing an issue with SSH Tunneling in Apache Spark. [...]



SSH Tunneling issue with Apache Spark

2023-12-05 Thread Venkatesan Muniappan
Hi Team,

I am facing an issue with SSH tunneling in Apache Spark. The behavior is the
same as the one in this Stack Overflow question, but there are no answers
there.

This is what I am trying:


with SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_pkey=ssh_key_file,
        remote_bind_address=(sql_hostname, sql_port),
        local_bind_address=(local_host_ip_address, sql_port)) as tunnel:
    tunnel.local_bind_port
    b1_semester_df = spark.read \
        .format("jdbc") \
        .option("url", b2b_mysql_url.replace("<>", str(tunnel.local_bind_port))) \
        .option("query", b1_semester_sql) \
        .option("database", 'b2b') \
        .option("password", b2b_mysql_password) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .load()
    b1_semester_df.count()

Here, b1_semester_df is loaded, but when I try count() on the same DataFrame
it fails with this error:

23/12/05 11:49:17 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times;
aborting job

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 382, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o284.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0
(TID 11, ip-172-32-108-1.eu-central-1.compute.internal, executor 3):
com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link
failure
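
One detail I notice in the traceback: the lost task ran on an executor
(ip-172-32-108-1.eu-central-1.compute.internal, executor 3), not on the
driver where the SSH tunnel was opened.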

However, the same setup works fine with a pandas DataFrame. I have tried the
following and it worked:


with SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_pkey=ssh_key_file,
        remote_bind_address=(sql_hostname, sql_port)) as tunnel:
    conn = pymysql.connect(host=local_host_ip_address, user=sql_username,
                           passwd=sql_password, db=sql_main_database,
                           port=tunnel.local_bind_port)
    df = pd.read_sql_query(b1_semester_sql, conn)
    spark.createDataFrame(df).createOrReplaceTempView("b1_semester")

So I wanted to check what I am missing in my Spark usage. Please help.
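
One variant I have considered but not yet tried (so this is an assumption,
not a verified fix) is to bind the tunnel on all interfaces and point the
JDBC URL at an address the executors can reach, instead of the driver's
loopback:

# Untested sketch: expose the tunnel to the executors, not just the driver.
# driver_host is hypothetical: an address of the driver machine that the
# executor nodes can route to (subject to firewall/security-group rules).
with SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_pkey=ssh_key_file,
        remote_bind_address=(sql_hostname, sql_port),
        local_bind_address=("0.0.0.0", sql_port)) as tunnel:
    b1_semester_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://%s:%d/b2b" % (driver_host, sql_port)) \
        .option("query", b1_semester_sql) \
        .option("user", sql_username) \
        .option("password", b2b_mysql_password) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .load()
    b1_semester_df.count()  # the action runs while the tunnel is still open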

Thanks,
Venkat