Re: RE: How to add MaxDOP option in spark mssql JDBC

2024-04-25 Thread Elite
Thank you.

My main purpose is to pass "MAXDOP 1" to MSSQL to control the CPU usage. From the official doc, I guess the problem with my code is that Spark wraps the query as:




select * from (SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 option 
(maxdop 1)) spark_gen_alias




Apparently, this violates MSSQL syntax, because "option (maxdop 1)" is no longer placed at the end of the statement.

May I know how Spark wraps the query if I use prepareQuery?

I do not have a Spark 3.4+ environment at the moment, so I have not had a chance to try this option.







At 2024-04-24 20:51:45, "Appel, Kevin"  wrote:

You might be able to leverage the prepareQuery option, documented at 
https://spark.apache.org/docs/3.5.1/sql-data-sources-jdbc.html#data-source-option 
… this was introduced in Spark 3.4.0 to handle temp table and CTE queries 
against MSSQL Server, since what you send in is not exactly what gets sent; 
some items get wrapped.
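
For illustration, here is a rough sketch (untested) of how the split might 
look, based on the temp table example in those docs, with the table and hint 
taken from your query; whether MSSQL accepts the OPTION hint inside the 
parenthesized prefix is something you would need to verify:

df = (spark.read
      .format("jdbc")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=")
      # prepareQuery is prepended verbatim and runs as its own statement,
      # so the OPTION clause can sit at the end of that statement
      .option("prepareQuery",
              "(SELECT TOP 10 * INTO #TempTable FROM dbo.Demo WITH (NOLOCK) "
              "WHERE Id = 1 OPTION (MAXDOP 1))")
      # Spark still wraps this part as SELECT * FROM (...) spark_gen_alias,
      # which no longer trips on the hint
      .option("query", "SELECT * FROM #TempTable")
      .load())
df.show()

The final SQL Spark sends is then the prepareQuery prefix followed by the 
wrapped subselect, so the hint ends a complete statement instead of sitting 
inside the generated subselect.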

 

There is more technical info in 
https://issues.apache.org/jira/browse/SPARK-37259, with the linked PRs that 
implemented the fix for this.

 

 

From: Elite 
Sent: Tuesday, April 23, 2024 10:28 PM
To: user@spark.apache.org
Subject: How to add MaxDOP option in spark mssql JDBC

 

[QUESTION] How to pass MAXDOP option · Issue #2395 · microsoft/mssql-jdbc (github.com)

 

Hi team, 

 

I was advised to ask the Spark community for help.

 

We suspect Spark rewrites the query before passing it to MS SQL, and this 
leads to a syntax error.

Is there any workaround to make my code work? 

 

spark.read()
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=")
    .option("query", "SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 option (maxdop 1)")
    .load()
    .show();

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the keyword 'option'.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:270)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1778)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:697)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:616)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7775)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:4397)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:293)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:263)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:531)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)



Re: [spark-graphframes]: Generating incorrect edges

2024-04-25 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi Mich,

Thanks for your suggestions.
1) It currently runs on one server with plenty of resources assigned, but I 
will keep it in mind to replace monotonically_increasing_id() with uuid() once 
we scale up (a sketch of that swap is at the end of this message).
2) I have replaced the null values in origin with a string 
{prefix}-{mnt_by}-{organisation}:

replacement_string = psf.concat_ws("-", psf.col("prefix"), psf.col("mnt_by"), psf.col("descr"))
df = df.withColumn("origin", psf.coalesce(psf.col("origin"), replacement_string))

I have verified that my other columns have no null values.

3) This is the logic with which I generate IDs:

mnt_by_id = df.select(MNT_BY).distinct().withColumn(
    MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(
    PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(
    ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(
    ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

df = (df.join(mnt_by_id, on=MNT_BY, how="left")
        .join(prefix_id, on=PREFIX, how="left")
        .join(origin_id, on=ORIGIN, how="left")
        .join(organisation_id, on=DESCR, how="left"))

I create the IDs using the distinct values in the columns "mnt_by", "prefix", 
"origin" and "descr"; these are the same columns I join on.

4) This is my current resource allocation; I run it on a server of my 
university. It has 112 cores and 1.48T of RAM. I can request more resources, 
but in my eyes this should be plenty. If you think more resources would help, 
I will ask for them.

spark_conf = (SparkConf()
    .setAppName(f"pyspark-{APP_NAME}-{int(time())}")
    .set("spark.submit.deployMode", "client")
    .set("spark.sql.parquet.binaryAsString", "true")
    .set("spark.driver.bindAddress", "localhost")
    .set("spark.driver.host", "127.0.0.1")
    # .set("spark.driver.port", "0")
    .set("spark.ui.port", "4041")
    .set("spark.executor.instances", "1")
    .set("spark.executor.cores", "50")
    .set("spark.executor.memory", "128G")
    .set("spark.executor.memoryOverhead", "32G")
    .set("spark.driver.cores", "16")
    .set("spark.driver.memory", "64G"))

I don't think b) applies, as it's a single machine.
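
As mentioned under 1), here is a rough sketch of that swap, assuming Spark 
SQL's built-in uuid() function via psf.expr (not yet tested on our data):

from pyspark.sql import functions as psf

# uuid() avoids the cross-partition counter scheme entirely; it is
# non-deterministic, so persist the mapping before joining on it
mnt_by_id = df.select(MNT_BY).distinct().withColumn(
    MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.expr("uuid()"))
).cache()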

Kind regards,
Jelle


From: Mich Talebzadeh 
Sent: Wednesday, April 24, 2024 6:12 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) 
Cc: user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts 
can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially causing 
problems downstream. You can handle missing values appropriately, say: 
   a) filter out rows with missing origins, or b) impute missing values with 
a strategy that preserves relationships (if applicable).

3) With the join code, you mentioned left joining on the same column used for 
ID creation; this is not very clear!

4) Edge issue: it appears to me that the issue occurs with larger datasets 
(>100K records). Possible causes could be:
   a) Resource constraints: as data size increases, PySpark might struggle 
with joins or computations if resources are limited (memory, CPU).
   b) Data skew: an uneven distribution of values in certain columns could 
lead to imbalanced processing across machines. Check the Spark UI (port 
4040), in particular the Stages and Executors tabs; a quick skew check is 
sketched below.
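
As a quick illustration of b), something along these lines would show whether 
a few key values dominate (a sketch; "mnt_by" is taken from your earlier 
snippet, substitute your own join columns):

from pyspark.sql import functions as psf

# rows per join-key value; a handful of dominant values indicates skew
df.groupBy("mnt_by").count().orderBy(psf.desc("count")).show(20, truncate=False)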

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my LinkedIn profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge 
but of course cannot be guaranteed. It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
von Braun)".


On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) 
<j.g.w.nijl...@student.utwente.nl> wrote:
Hi Mich,

Thanks for your reply,
1) ID generation is done using monotonically_increasing_id(); this is then 
prefixed with "p_", "m_", "o_" or "org_" depending on the type of the value 
it identifies.
2) There are some missing values in the Origin column; these will result in a 
null ID.
3) The join code is present in [1]; I join "left" on the same column I create 
the ID on.
4) I don't think the issue is in ID or edge