Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Also, can I take my lower bound starting from 1, or should it be the index?

On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:

> Thanks again Mich. It gives a clear picture, but I again have a couple of
> doubts:
>
> 1) I know there will be multiple threads, each handling a segment of size
> 10, until the upper bound is reached, but I didn't fully understand this
> part of the code: segments = [(i, min(i + segment_size, upper_bound)) for
> i in range(lower_bound, upper_bound, segment_size)]
>
> 2) Also, performing a union on these small dataframes won't impact
> performance, right? Since Spark has to shuffle and combine less data from
> these dataframes?
>
>
> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
> wrote:
>
>> well you can dynamically determine the upper bound by first querying the
>> database to find the maximum value of the partition column and use it as
>> the upper bound for your partitioning logic.
>>
>> def get_max_value(spark, mongo_config, column_name):
>>     max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>     return max_value
>>
>> # Define your MongoDB config without the bounds first
>> mongo_config_base = {
>>     "uri": "mongodb://username:password@host:port/database.collection",
>>     "partitionColumn": "_id"
>> }
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>
>> Then you need to aggregate the DataFrames from multiple threads. When loading
>> data in parallel, each thread will load a segment of data into its own
>> DataFrame. To aggregate all these DataFrames into a single DataFrame, you can
>> use the *union* method in PySpark.
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>> from pyspark.sql import SparkSession
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>     return df
>>
>> # Function to get the maximum value of the partition column
>> def get_max_value(spark, mongo_config, column_name):
>>     max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>     return max_value
>>
>> # MongoDB configuration without bounds
>> mongo_config_base = {
>>     "uri": "mongodb://username:password@host:port/database.collection",
>>     "partitionColumn": "_id"
>> }
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>>     .appName("MongoDBDataLoad") \
>>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
>>     .getOrCreate()
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>
>> # Function to load a segment
>> def load_segment(segment):
>>     segment_lower_bound, segment_upper_bound = segment
>>     mongo_config = mongo_config_base.copy()
>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>     return extract_data_from_mongodb(mongo_config)
>>
>> # Collect all DataFrames from threads
>> all_dfs = []
>>
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(load_segment, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             df_segment = future.result()
>>             all_dfs.append(df_segment)
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> # Union all DataFrames into a single DataFrame
>> if all_dfs:
>>     final_df = all_dfs[0]
>>     for df in all_dfs[1:]:
>>         final_df = final_df.union(df)
>>
>> # Proceed with your final DataFrame
>> final_df.show()
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD  Imperial
>> College London 
>> London, United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks again Mich. It gives a clear picture, but I again have a couple of
doubts:

1) I know there will be multiple threads, each handling a segment of size 10,
until the upper bound is reached, but I didn't fully understand this part of
the code: segments = [(i, min(i + segment_size, upper_bound)) for i in
range(lower_bound, upper_bound, segment_size)]

2) Also, performing a union on these small dataframes won't impact performance,
right? Since Spark has to shuffle and combine less data from these dataframes?


On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
wrote:

> well you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and use it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
>     max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>     return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate the DataFrames from multiple threads. When loading
> data in parallel, each thread will load a segment of data into its own
> DataFrame. To aggregate all these DataFrames into a single DataFrame, you can
> use the *union* method in PySpark.
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
>     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
>     max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>     return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
>     .appName("MongoDBDataLoad") \
>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
>     .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
>     segment_lower_bound, segment_upper_bound = segment
>     mongo_config = mongo_config_base.copy()
>     mongo_config["lowerBound"] = str(segment_lower_bound)
>     mongo_config["upperBound"] = str(segment_upper_bound)
>     return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(load_segment, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             df_segment = future.result()
>             all_dfs.append(df_segment)
>         except Exception as e:
>             print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
>     final_df = all_dfs[0]
>     for df in all_dfs[1:]:
>         final_df = final_df.union(df)
>
> # Proceed with your final DataFrame
> final_df.show()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 6 Jun 2024 at 10:52, Perez  wrote:
>
>> Thanks, Mich for your response. However, I have multiple doubts as below:
>>
>> 1) I am trying to load the data for the incremental batch so I am not
>> sure what 

Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-06 Thread Someshwar Kale
As a fix, you may consider adding a transformer to rename columns (perhaps
replacing the dots in all column names with underscores) and using the renamed
columns in your pipeline, as below:

val renameColumn = new RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude")
val si = new StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee")
val pipeline = new Pipeline().setStages(Array(renameColumn, si))
pipeline.fit(flattenedDf).transform(flattenedDf).show()
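
For reference, a minimal PySpark sketch of the same renaming idea (the column
names here are illustrative, and it assumes a flattened DataFrame whose columns
are literally named with dots):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RenameDotColumns").getOrCreate()

# Hypothetical flattened DataFrame with dot-named columns.
flattened_df = spark.createDataFrame(
    [(12.5, 55.7)], ["location.longitude", "location.latitude"])

# Replace dots with underscores so downstream ML transformers such as
# StringIndexer can reference the columns safely.
for c in flattened_df.columns:
    if "." in c:
        flattened_df = flattened_df.withColumnRenamed(c, c.replace(".", "_"))

flattened_df.printSchema()  # location_longitude, location_latitude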


Refer to my comment for elaboration.
Thanks!!

*Regards,*
*Someshwar Kale*





On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
wrote:

> Hello team
> I was exploring the feature transformations exposed via MLlib on a nested
> dataset, and encountered an error while applying any transformer to a
> column with dot-notation naming. I have raised a ticket on Spark,
> https://issues.apache.org/jira/browse/SPARK-48463, where I have described
> the entire scenario.
>
> I wanted to get suggestions on what would be the best way to solve the
> problem while using the dot notation. One workaround is to use `_` while
> flattening the dataframe, but that would mean having the additional overhead
> of converting back to `.` (dot notation), since that's the convention for our
> other flattened data.
>
> I would be happy to make a contribution to the code if someone can shed
> some light on how this could be solved.
>
>
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


Kubernetes cluster: change log4j configuration using uploaded `--files`

2024-06-06 Thread Jennifer Wirth
Hi,

I am trying to change the log4j configuration for jobs submitted to a k8s
cluster (using spark-submit).

The my-log4j.xml is uploaded using --files ./my-log4j.xml, and the file ends up
in the working directory of the driver/executor pods.

I added -D flags using the extra Java options (and tried many different URIs:
absolute, with and without file:).

--conf spark.driver.extraJavaOptions="-Dlog4j2.debug=false
-Dlog4j2.configurationFile=file:./my-log4j.xml" \
--conf spark.executor.extraJavaOptions="-Dlog4j2.debug=false
-Dlog4j2.configurationFile=file:./my-log4j.xml" \

When debugging I noticed that log4j is not able to load my configuration
file. I see the following additional log entries:

ERROR StatusLogger Reconfiguration failed: No configuration found for
'4a87761d' at 'null' in 'null'
ERROR StatusLogger Reconfiguration failed: No configuration found for
'Default' at 'null' in 'null'
24/06/06 09:20:44 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
Files 
file:///tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
from 
/tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
to /opt/spark/work-dir/orbit-movements.csv
Files 
file:///tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml
from /tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml
to /opt/spark/work-dir/my-log4j.xml
Files file:///tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar
from /tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar
to /opt/spark/work-dir/out.jar

The lines starting with "Files" in the logs of the driver process make me
wonder whether the copying of files from my shared mount to the working
directory happens in that process, rather than before the Java process
launches. Is that assumption correct? It would explain why my log4j config
file is not found at JVM launch.

If so, what is the recommended way to change the logging config *per job*
when running Spark on k8s? (I am not using a custom container image, so I
can't place it in there.)

tx.,


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
well you can dynamically determine the upper bound by first querying the
database to find the maximum value of the partition column and use it as
the upper bound for your partitioning logic.

def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# Define your MongoDB config without the bounds first
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
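
To illustrate what that comprehension produces, here is a sketch with an
assumed upper_bound of 25 instead of the real maximum _id: each tuple is a
(lower, upper) pair, and the last one is capped at upper_bound.

segment_size = 10
lower_bound = 0
upper_bound = 25  # assumed value, for illustration only

segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]
print(segments)  # [(0, 10), (10, 20), (20, 25)]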

Then you need to aggregate the DataFrames from multiple threads. When loading
data in parallel, each thread will load a segment of data into its own
DataFrame. To aggregate all these DataFrames into a single DataFrame, you can
use the *union* method in PySpark.

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# Function to get the maximum value of the partition column
def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# MongoDB configuration without bounds
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

# Function to load a segment
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_base.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

# Collect all DataFrames from threads
all_dfs = []

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            all_dfs.append(df_segment)
        except Exception as e:
            print(f"Error: {e}")

# Union all DataFrames into a single DataFrame
if all_dfs:
    final_df = all_dfs[0]
    for df in all_dfs[1:]:
        final_df = final_df.union(df)

# Proceed with your final DataFrame
final_df.show()
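
As a side note, the same union can be written more compactly with
functools.reduce; a sketch, assuming all_dfs is the list built above and all
segments share the same schema:

from functools import reduce
from pyspark.sql import DataFrame

if all_dfs:
    # Fold the list of per-segment DataFrames into one DataFrame.
    final_df = reduce(DataFrame.union, all_dfs)
    final_df.show()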

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 6 Jun 2024 at 10:52, Perez  wrote:

> Thanks, Mich, for your response. However, I have multiple doubts, as below:
>
> 1) I am trying to load the data for an incremental batch, so I am not sure
> what my upper bound would be. What can we do about that?
> 2) As each thread loads its segment of data into a dataframe, what should I
> do if I want to aggregate the data from all the threads into a single
> dataframe? Keep appending to a dataframe as each one arrives?
>
>
> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
> wrote:
>
>> Yes, partitioning and parallel loading can significantly improve the
>> performance of data extraction from JDBC sources or databases like MongoDB.
>> This approach can leverage Spark's distributed computing capabilities,
>> allowing you to load data in parallel, thus speeding up the overall data
>> loading process.
>>
>> When loading data from JDBC sources, specifying partitioning options
>> allows Spark to parallelize the data read operation. Here's how you 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks, Mich, for your response. However, I have multiple doubts, as below:

1) I am trying to load the data for an incremental batch, so I am not sure
what my upper bound would be. What can we do about that?
2) As each thread loads its segment of data into a dataframe, what should I do
if I want to aggregate the data from all the threads into a single dataframe?
Keep appending to a dataframe as each one arrives?


On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like MongoDB.
> This approach can leverage Spark's distributed computing capabilities,
> allowing you to load data in parallel, thus speeding up the overall data
> loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the data read operation. Here's how you can do
> it for a JDBC source:
>
> Something like below given the information provided
>
> from pyspark.sql import SparkSession
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> def extract_data_from_mongodb(mongo_config):
>     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     return df
>
> # MongoDB configuration
> mongo_config_template = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id",
>     "lowerBound": None,
>     "upperBound": None
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create segments
> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> # Initialize Spark session
> spark = SparkSession.builder \
>     .appName("MongoDBDataLoad") \
>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
>     .getOrCreate()
>
> # Extract data in parallel using ThreadPoolExecutor
> def load_segment(segment):
>     segment_lower_bound, segment_upper_bound = segment
>     mongo_config = mongo_config_template.copy()
>     mongo_config["lowerBound"] = str(segment_lower_bound)
>     mongo_config["upperBound"] = str(segment_upper_bound)
>     return extract_data_from_mongodb(mongo_config)
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(load_segment, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             df_segment = future.result()
>             # Process df_segment as needed
>         except Exception as e:
>             print(f"Error: {e}")
>
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads. Each thread can be responsible for loading a segment of the data.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London  (voted 2nd
> best university in the world after MIT https://lnkd.in/eCPt6KTj)
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 6 Jun 2024 at 00:46, Perez  wrote:
>
>> Hello experts,
>>
>> I was just wondering if I could leverage the below thing to expedite the
>> loading of the data process in Spark.
>>
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = glueContext.create_dynamic_frame.from_options(
>>         connection_type="mongodb", connection_options=mongo_config)
>>     return df
>>
>> mongo_config = {
>>     "connection.uri": "mongodb://url", "database": "", "collection": "",
>>     "username": "", "password": "", "partitionColumn": "_id",
>>     "lowerBound": str(lower_bound), "upperBound": str(upper_bound)}
>> lower_bound = 0
>> upper_bound = 200
>> segment_size = 10
>> segments = [(i, min(i + segment_size, upper_bound))
>>             for i in range(lower_bound, upper_bound, segment_size)]
>>
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(execution, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             future.result()
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> I am trying to leverage the parallel threads to pull data in parallel. So
>> is it effective?
>>
>


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
Yes, partitioning and parallel loading can significantly improve the
performance of data extraction from JDBC sources or databases like MongoDB.
This approach can leverage Spark's distributed computing capabilities,
allowing you to load data in parallel, thus speeding up the overall data
loading process.

When loading data from JDBC sources, specifying partitioning options allows
Spark to parallelize the data read operation. Here's how you can do it for
a JDBC source:
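
For a plain JDBC source, the partitioned read is just the standard JDBC
options; a generic sketch (the URL, table, column and bounds below are
placeholders, and a matching JDBC driver must be on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JDBCPartitionedRead").getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/mydb")  # placeholder URL
      .option("dbtable", "my_table")
      .option("user", "username")
      .option("password", "password")
      .option("partitionColumn", "id")   # numeric, date or timestamp column
      .option("lowerBound", "0")
      .option("upperBound", "200")
      .option("numPartitions", "20")     # ~20 parallel reads
      .load())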

Something like below given the information provided

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# MongoDB configuration
mongo_config_template = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id",
    "lowerBound": None,
    "upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create segments
segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Extract data in parallel using ThreadPoolExecutor
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_template.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            # Process df_segment as needed
        except Exception as e:
            print(f"Error: {e}")


ThreadPoolExecutor enables parallel execution of tasks using multiple
threads. Each thread can be responsible for loading a segment of the data.
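
One practical note (an assumption about your environment rather than a
requirement): you may want to cap the number of concurrent threads so the
source database is not overwhelmed with connections, for example:

from concurrent.futures import ThreadPoolExecutor, as_completed

# Limit to 4 simultaneous segment loads; tune to what the source database
# and the Spark driver can comfortably handle.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        df_segment = future.result()
        # Process df_segment as needed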

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London  (voted 2nd
best university in the world after MIT https://lnkd.in/eCPt6KTj)
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 6 Jun 2024 at 00:46, Perez  wrote:

> Hello experts,
>
> I was just wondering if I could leverage the below thing to expedite the
> loading of the data process in Spark.
>
>
> def extract_data_from_mongodb(mongo_config):
>     df = glueContext.create_dynamic_frame.from_options(
>         connection_type="mongodb", connection_options=mongo_config)
>     return df
>
> mongo_config = {
>     "connection.uri": "mongodb://url", "database": "", "collection": "",
>     "username": "", "password": "", "partitionColumn": "_id",
>     "lowerBound": str(lower_bound), "upperBound": str(upper_bound)}
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
> segments = [(i, min(i + segment_size, upper_bound))
>             for i in range(lower_bound, upper_bound, segment_size)]
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(execution, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             future.result()
>         except Exception as e:
>             print(f"Error: {e}")
>
> I am trying to leverage the parallel threads to pull data in parallel. So
> is it effective?
>


[SPARK-48423] Unable to save ML Pipeline to azure blob storage

2024-06-05 Thread Chhavi Bansal
Hello team,
I was exploring how to save an ML pipeline to Azure blob storage, but was set
back by an issue where it complains of `fs.azure.account.key` not being found
in the configuration, even when I have provided the values in the
pipelineModel.option(key1, value1) field. I have raised a ticket on Spark,
https://issues.apache.org/jira/browse/SPARK-48423, where I describe the entire
scenario. I tried debugging the code and found that this key is being
explicitly asked for in the code. The only solution was to again set it as
part of spark.conf, which could result in a race condition since we work on a
multi-tenant architecture.
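
For context, a minimal sketch of that spark.conf workaround (the storage
account, container and key are placeholders, and `model` is assumed to be a
fitted PipelineModel):

# Setting the key globally on the session is what creates the multi-tenant
# race-condition concern described above.
spark.conf.set(
    "fs.azure.account.key.<storage-account>.dfs.core.windows.net",
    "<account-key>")

model.write().overwrite().save(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/models/my_pipeline")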



Since saving to Azure blob storage should be a common use case, can someone
please guide me on whether I am missing something in the `.option` clause?



I would be happy to make a contribution to the code if someone can shed
some light on how this could be solved.

-- 
Thanks and Regards,
Chhavi Bansal


[SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-05 Thread Chhavi Bansal
Hello team
I was exploring the feature transformations exposed via MLlib on a nested
dataset, and encountered an error while applying any transformer to a column
with dot-notation naming. I have raised a ticket on Spark,
https://issues.apache.org/jira/browse/SPARK-48463, where I have described the
entire scenario.

I wanted to get suggestions on what would be the best way to solve the problem
while using the dot notation. One workaround is to use `_` while flattening the
dataframe, but that would mean having the additional overhead of converting
back to `.` (dot notation), since that's the convention for our other flattened
data.

I would be happy to make a contribution to the code if someone can shed
some light on how this could be solved.



-- 
Thanks and Regards,
Chhavi Bansal


Re: Terabytes data processing via Glue

2024-06-05 Thread Perez
Thanks Nitin and Russel for your responses. Much appreciated.

On Mon, Jun 3, 2024 at 9:47 PM Russell Jurney 
wrote:

> You could use either Glue or Spark for your job. Use what you’re more
> comfortable with.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Sun, Jun 2, 2024 at 9:59 PM Perez  wrote:
>
>> Hello,
>>
>> Can I get some suggestions?
>>
>> On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:
>>
>>> Hi Team,
>>>
>>> I am planning to load and process around 2 TB historical data. For that
>>> purpose I was planning to go ahead with Glue.
>>>
>>> So is it ok if I use glue if I calculate my DPUs needed correctly? or
>>> should I go with EMR.
>>>
>>> This will be a one time activity.
>>>
>>>
>>> TIA
>>>
>>


Do we need partitioning while loading data from JDBC sources?

2024-06-05 Thread Perez
Hello experts,

I was just wondering if I could leverage the approach below to expedite the
data loading process in Spark.


def extract_data_from_mongodb(mongo_config):
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="mongodb", connection_options=mongo_config)
    return df

mongo_config = {
    "connection.uri": "mongodb://url", "database": "", "collection": "",
    "username": "", "password": "", "partitionColumn": "_id",
    "lowerBound": str(lower_bound), "upperBound": str(upper_bound)}
lower_bound = 0
upper_bound = 200
segment_size = 10
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(execution, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

I am trying to leverage the parallel threads to pull data in parallel. So
is it effective?


Inquiry Regarding Security Compliance of Apache Spark Docker Image

2024-06-05 Thread Tonmoy Sagar
Dear Apache Team,

I hope this email finds you well.

We are a team from Ernst and Young LLP - India, dedicated to providing 
innovative supply chain solutions for a diverse range of clients. Our team 
recently encountered a pivotal use case necessitating the utilization of 
PySpark for a project aimed at handling substantial volumes of data. As part of 
our deployment strategy, we are endeavouring to implement a Spark-based 
application on our Azure Kubernetes service.

Regrettably, we have encountered challenges from a security perspective with 
the latest Apache Spark Docker image, specifically apache/spark-py:latest. Our 
security team has meticulously conducted an assessment and has generated a 
comprehensive vulnerability report highlighting areas of concern.

Given the non-compliance of the Docker image with our organization's stringent 
security protocols, we find ourselves unable to proceed with its integration 
into our applications. We attach the vulnerability report herewith for your 
perusal.

Considering these circumstances, we kindly request your esteemed team to 
provide any resolutions or guidance that may assist us in mitigating the 
identified security vulnerabilities. Your prompt attention to this matter would 
be greatly appreciated, as it is crucial for the successful deployment and 
operation of our Spark-based application within our infrastructure.

Thank you for your attention to this inquiry, and we look forward to your 
valued support and assistance.



Please find attachment for the vulnerability report
Best Regards,
Tonmoy Sagar | Sr. Consultant | Advisory | Asterisk
Ernst & Young LLP
C-401, Panchshil Tech Park One, Yerawada, Pune, Maharashtra 411006, India
Mobile: +91 8724918230 | tonmoy.sa...@in.ey.com
Thrive in the Transformative Age with the better-connected consultants - 
ey.com/consulting



The information contained in this communication is intended solely for the use 
of the individual or entity to whom it is addressed and others authorized to 
receive it. It may contain confidential or legally privileged information. If 
you are not the intended recipient you are hereby notified that any disclosure, 
copying, distribution or taking any action in reliance on the contents of this 
information is strictly prohibited and may be unlawful. If you have received 
this communication in error, please notify us immediately by responding to this 
email and then delete it from your system. The firm is neither liable for the 
proper and complete transmission of the information contained in this 
communication nor for any delay in its receipt.


spark_vulnerability_report.xlsx
Description: spark_vulnerability_report.xlsx

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Classification request

2024-06-04 Thread Dirk-Willem van Gulik
Actually - that answer may oversimplify things / be rather incorrect depending 
on the exact question of the entity that asks and the exact situation (who 
ships what code from where).

For this reason it is probably best to refer this original poster to:

https://www.apache.org/licenses/exports/

Which explains that Spark is classified as ECCN 5d002, and is subject to EAR.

 And to refer developers or those that need to bundle Spark, or otherwise place 
it on the market to:

https://infra.apache.org/crypto.html

And when in doubt - contact the Spark PMC or the ASF security team.

With kind regards,

Dw.


> On 4 Jun 2024, at 15:20, Artemis User  wrote:
> 
> Sara, Apache Spark is open source under Apache License 2.0 
> (https://github.com/apache/spark/blob/master/LICENSE).  It is not under 
> export control of any country!  Please feel free to use, reproduce and 
> distribute, as long as your practice is compliant with the license.
> 
> Having said that, some components in Apache Spark may be under other open 
> source licenses, whose terms and conditions may be different than Apache's.  
> 
> Cheers!
> ND
> 
> On 6/4/24 8:01 AM, VARGA, Sara wrote:
>> Hello,
>>  
>> my name is Sara and I am working in export control at MTU.
>>  
>> In order to ensure export compliance, we would like to be informed about the 
>> export control classification for your items:
>>  
>> Apache Spark 3.0.1
>> Apache Spark 3.2.1
>>  
>> Please be so kind and use the attached Supplier Export Control Declaration 
>> and return the completed excel file and a pdf scan of the signed document 
>> via email.
>>  
>> Thanks in advance
>>  
>>  
>> Mit freundlichen Gruessen / Best regards
>>  
>> 
   
>> Sara Varga
>> Exportkontrolle
>> Export control
>> 
>> MTU Aero Engines AG
>> Dachauer Str. 665 | 80995 Muenchen | Germany
>> 
>> 
>> sara.va...@mtu.de  | www.mtu.de 
>> 
>> 
>>  
>> 
>>  
>>  
>>  
>> --
>> MTU Aero Engines AG
>> Vorstand/Board of Management: Lars Wagner, Vorsitzender/CEO; Peter 
>> Kameritsch, Dr. Silke Maurer, Michael Schreyoegg
>> Vorsitzender des Aufsichtsrats/Chairman of the Supervisory Board: Gordon 
>> Riske
>> Sitz der Gesellschaft/Registered Office: Muenchen
>> Handelsregister/Commercial Register: Muenchen HRB 157206
>> Lobbyregister/Lobbying Register: R002076
>> 
>> Diese E-Mail sowie ihre Anhaenge enthalten MTU-eigene vertrauliche oder 
>> rechtlich geschuetzte Informationen.
>> Wenn Sie nicht der beabsichtigte Empfaenger sind, informieren Sie bitte den 
>> Absender und loeschen Sie diese
>> E-Mail sowie die Anhaenge. Das unbefugte Speichern, Kopieren oder 
>> Weiterleiten ist nicht gestattet.
>> 
>> This e-mail and any attached documents are proprietary to MTU, confidential 
>> or protected by law.
>> If you are not the intended recipient, please advise the sender and delete 
>> this message and its attachments.
>> Any unauthorised storing, copying or distribution is prohibited.
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 


Re: Classification request

2024-06-04 Thread Artemis User
Sara, Apache Spark is open source under Apache License 2.0 
(https://github.com/apache/spark/blob/master/LICENSE).  It is not under 
export control of any country!  Please feel free to use, reproduce and 
distribute, as long as your practice is compliant with the license.


Having said that, some components in Apache Spark may be under other 
open source licenses, whose terms and conditions may be different than 
Apache's.


Cheers!
ND

On 6/4/24 8:01 AM, VARGA, Sara wrote:


Hello,

my name is Sara and I am working in export control at MTU.

In order to ensure export compliance, we would like to be informed 
about the export control classification for your items:


*Apache Spark 3.0.1*

*Apache Spark 3.2.1*

Please be so kind and use the attached Supplier Export Control 
Declaration and return the completed excel file and a pdf scan of the 
signed document via email.


Thanks in advance

Mit freundlichen Gruessen / Best regards




*Sara Varga*
Exportkontrolle
/Export control/

*MTU Aero Engines AG*
Dachauer Str. 665 | 80995 Muenchen | Germany


sara.va...@mtu.de  | www.mtu.de 





--
*MTU Aero Engines AG*
Vorstand/Board of Management: Lars Wagner, Vorsitzender/CEO; Peter 
Kameritsch, Dr. Silke Maurer, Michael Schreyoegg
Vorsitzender des Aufsichtsrats/Chairman of the Supervisory Board: 
Gordon Riske

Sitz der Gesellschaft/Registered Office: Muenchen
Handelsregister/Commercial Register: Muenchen HRB 157206
Lobbyregister/Lobbying Register: R002076

Diese E-Mail sowie ihre Anhaenge enthalten MTU-eigene vertrauliche 
oder rechtlich geschuetzte Informationen.
Wenn Sie nicht der beabsichtigte Empfaenger sind, informieren Sie 
bitte den Absender und loeschen Sie diese
E-Mail sowie die Anhaenge. Das unbefugte Speichern, Kopieren oder 
Weiterleiten ist nicht gestattet.


This e-mail and any attached documents are proprietary to MTU, 
confidential or protected by law.
If you are not the intended recipient, please advise the sender and 
delete this message and its attachments.

Any unauthorised storing, copying or distribution is prohibited.

-
To unsubscribe e-mail:user-unsubscr...@spark.apache.org


Classification request

2024-06-04 Thread VARGA, Sara
Hello,

my name is Sara and I am working in export control at MTU.

In order to ensure export compliance, we would like to be informed about the 
export control classification for your items:

Apache Spark 3.0.1
Apache Spark 3.2.1

Please be so kind and use the attached Supplier Export Control Declaration and 
return the completed excel file and a pdf scan of the signed document via email.

Thanks in advance


Mit freundlichen Gruessen / Best regards




Sara Varga
Exportkontrolle
Export control

MTU Aero Engines AG
Dachauer Str. 665 | 80995 Muenchen | Germany


sara.va...@mtu.de | www.mtu.de










--
MTU Aero Engines AG
Vorstand/Board of Management: Lars Wagner, Vorsitzender/CEO; Peter Kameritsch, 
Dr. Silke Maurer, Michael Schreyoegg
Vorsitzender des Aufsichtsrats/Chairman of the Supervisory Board: Gordon Riske
Sitz der Gesellschaft/Registered Office: Muenchen
Handelsregister/Commercial Register: Muenchen HRB 157206
Lobbyregister/Lobbying Register: R002076

Diese E-Mail sowie ihre Anhaenge enthalten MTU-eigene vertrauliche oder 
rechtlich geschuetzte Informationen.
Wenn Sie nicht der beabsichtigte Empfaenger sind, informieren Sie bitte den 
Absender und loeschen Sie diese
E-Mail sowie die Anhaenge. Das unbefugte Speichern, Kopieren oder Weiterleiten 
ist nicht gestattet.

This e-mail and any attached documents are proprietary to MTU, confidential or 
protected by law.
If you are not the intended recipient, please advise the sender and delete this 
message and its attachments.
Any unauthorised storing, copying or distribution is prohibited.


FRM-837_Supplier-Export-Control-Declaration-Indirect-Material_EN.xlsx
Description: FRM-837_Supplier-Export-Control-Declaration-Indirect-Material_EN.xlsx

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

[ANNOUNCE] Announcing Apache Spark 4.0.0-preview1

2024-06-03 Thread Wenchen Fan
Hi all,

To enable wide-scale community testing of the upcoming Spark 4.0 release,
the Apache Spark community has posted a preview release of Spark 4.0. This
preview is not a stable release in terms of either API or functionality,
but it is meant to give the community early access to try the code that
will become Spark 4.0. If you would like to test the release, please
download it, and send feedback using either the mailing lists or JIRA.

There are a lot of exciting new features added to Spark 4.0, including ANSI
mode by default, Python data source, polymorphic Python UDTF, string
collation support, new VARIANT data type, streaming state store data
source, structured logging, Java 17 by default, and many more.

We'd like to thank our contributors and users for their contributions and
early feedback to this release. This release would not have been possible
without you.

To download Spark 4.0.0-preview1, head over to the download page:
https://archive.apache.org/dist/spark/spark-4.0.0-preview1 . It's also
available in PyPI, with version name "4.0.0.dev1".

Thanks,

Wenchen


Re: Terabytes data processing via Glue

2024-06-03 Thread Russell Jurney
You could use either Glue or Spark for your job. Use what you’re more
comfortable with.

Thanks,
Russell Jurney @rjurney 
russell.jur...@gmail.com LI  FB
 datasyndrome.com


On Sun, Jun 2, 2024 at 9:59 PM Perez  wrote:

> Hello,
>
> Can I get some suggestions?
>
> On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:
>
>> Hi Team,
>>
>> I am planning to load and process around 2 TB historical data. For that
>> purpose I was planning to go ahead with Glue.
>>
>> So is it ok if I use glue if I calculate my DPUs needed correctly? or
>> should I go with EMR.
>>
>> This will be a one time activity.
>>
>>
>> TIA
>>
>


Re: Terabytes data processing via Glue

2024-06-02 Thread Perez
Hello,

Can I get some suggestions?

On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:

> Hi Team,
>
> I am planning to load and process around 2 TB historical data. For that
> purpose I was planning to go ahead with Glue.
>
> So is it ok if I use glue if I calculate my DPUs needed correctly? or
> should I go with EMR.
>
> This will be a one time activity.
>
>
> TIA
>


[ANNOUNCE] Apache Kyuubi released 1.9.1

2024-06-02 Thread Cheng Pan
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.9.1 has been released!

This release brings support for Apache Spark 4.0.0-preview1.

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.9.1.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
On behalf of Apache Kyuubi community

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Terabytes data processing via Glue

2024-06-01 Thread Perez
Hi Team,

I am planning to load and process around 2 TB historical data. For that
purpose I was planning to go ahead with Glue.

So is it OK to use Glue if I calculate the DPUs I need correctly, or should I
go with EMR?

This will be a one time activity.


TIA


[apache-spark][spark-dataframe] DataFrameWriter.partitionBy does not guarantee previous sort result

2024-05-31 Thread leeyc0
I have a dataset that have the following schema:
(timestamp, partitionKey, logValue)

I want the dataset to be sorted by timestamp, but written to files in the
following directory layout:
outputDir/partitionKey/files
The output files only contain logValue; that is, timestamp is used for
sorting only and is not part of the output.
(FYI, logValue contains a textual representation of the timestamp which is not
sortable.)

My first attempt is to use DataFrameWriter.partitionBy:
dataset
    .sort("timestamp")
    .select("partitionKey", "logValue")
    .write()
    .partitionBy("partitionKey")
    .text("output");

However, as mentioned in SPARK-44512
(https://issues.apache.org/jira/browse/SPARK-44512), this does not guarantee
that the output is globally sorted.
(Note: I found that even setting
spark.sql.optimizer.plannedWrite.enabled=false still does not guarantee a
sorted result in a low-memory environment.)

And the developers say DataFrameWriter.partitionBy does not guarantee
sorted results:
"Although I understand Apache Spark 3.4.0 changes the behavior like the
above, I don't think there is a contract that Apache Spark's `partitionBy`
operation preserves the previous ordering."
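
As a point of comparison, the variant that is often suggested is to shuffle by
the partition key and then sort within partitions; a PySpark sketch (hedged:
whether the write preserves that within-partition order is exactly the
non-contract discussed above, so this is something to test rather than rely
on):

from pyspark.sql import functions as F

(dataset
    .repartition(F.col("partitionKey"))                # one shuffle by key
    .sortWithinPartitions("partitionKey", "timestamp")
    .select("partitionKey", "logValue")
    .write
    .partitionBy("partitionKey")
    .text("output"))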

To work around this problem, I have to resort to creating a Hadoop output
format by extending org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
and writing the output via saveAsHadoopFile:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.sql.Row;
import scala.Tuple2;

public final class PartitionedMultipleTextOutputFormat<V>
        extends MultipleTextOutputFormat<Object, V> {
    @SuppressWarnings("MissingJavadocMethod")
    public PartitionedMultipleTextOutputFormat() {
        super();
    }

    // Drop the key from the output records; only the value (logValue) is written.
    @Override
    protected Object generateActualKey(final Object key, final V value) {
        return NullWritable.get();
    }

    // Use the key (partitionKey) as the output subdirectory name.
    @Override
    protected String generateFileNameForKeyValue(final Object key, final V value, final String leaf) {
        return new Path(key.toString(), leaf).toString();
    }
}

private static Tuple2<String, Text> mapRDDToDomainLogPair(final Row row) {
    final String domain = row.getAs("partitionKey");
    final var log = (String) row.getAs("logValue");
    final var logTextClass = new Text(log);
    return new Tuple2<>(domain, logTextClass);
}

dataset
    .sort("timestamp")
    .javaRDD()
    .mapToPair(TheClass::mapRDDToDomainLogPair)
    .saveAsHadoopFile(hdfsTmpPath, String.class, Text.class,
            PartitionedMultipleTextOutputFormat.class, GzipCodec.class);

Which seems a little bit hacky.
Does anyone have another better method?


Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Amin Mosayyebzadeh
I am reading from a single file:
df = spark.read.text("s3a://test-bucket/testfile.csv")



On Fri, May 31, 2024 at 5:26 AM Mich Talebzadeh 
wrote:

> Tell Spark to read from a single file
>
> data = spark.read.text("s3a://test-bucket/testfile.csv")
>
> This clarifies to Spark that you are dealing with a single file and avoids
> any bucket-like interpretation.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
> wrote:
>
>> I will work on the first two possible causes.
>> For the third one, which I guess is the real problem, Spark treats the
>> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
>> to access _spark_metadata with url
>> s3a://test-bucket/testfile.csv/_spark_metadata
>> testfile.csv is an object and should not be treated as a bucket. But I am
>> not sure how to prevent Spark from doing that.
>>
>


Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Mich Talebzadeh
Tell Spark to read from a single file

data = spark.read.text("s3a://test-bucket/testfile.csv")

This clarifies to Spark that you are dealing with a single file and avoids
any bucket-like interpretation.
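
If the object store is a custom S3-compatible endpoint (Ceph RGW in your case,
as I understand it), it is also worth double-checking the S3A connection
settings before reading. A minimal sketch, with a placeholder endpoint and
credentials:

# Placeholder endpoint/credentials for an S3-compatible store such as Ceph RGW.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3a.endpoint", "http://ceph-gateway:8080")
hconf.set("fs.s3a.access.key", "ACCESS_KEY")
hconf.set("fs.s3a.secret.key", "SECRET_KEY")
hconf.set("fs.s3a.path.style.access", "true")  # usually needed for non-AWS stores

df = spark.read.text("s3a://test-bucket/testfile.csv")
df.show(truncate=False)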

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
wrote:

> I will work on the first two possible causes.
> For the third one, which I guess is the real problem, Spark treats the
> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
> to access _spark_metadata with url
> s3a://test-bucket/testfile.csv/_spark_metadata
> testfile.csv is an object and should not be treated as a bucket. But I am
> not sure how to prevent Spark from doing that.
>


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-30 Thread Subhasis Mukherjee
Regarding the part about making the Spark writer fast: if you are (or can be)
on Databricks, check this out. It is just out of the oven at Databricks.

https://www.databricks.com/blog/announcing-general-availability-liquid-clustering?utm_source=bambu_medium=social_campaign=advocacy=6087618




From: Gera Shegalov 
Sent: Wednesday, May 29, 2024 7:57:56 am
To: Prem Sahoo 
Cc: eab...@163.com ; Vibhor Gupta ; 
user @spark 
Subject: Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

I agree with the previous answers that (if requirements allow it) it is much 
easier to just orchestrate a copy either in the same app or sync externally.

A long time ago and not for a Spark app we were solving a similar usecase via 
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
 . It may work with Spark because it is underneath the FileSystem API ...



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo 
mailto:prem.re...@gmail.com>> wrote:
I am looking for a writer/committer optimization which can make the Spark
write faster.

On Tue, May 21, 2024 at 9:15 PM eab...@163.com 
mailto:eab...@163.com>> wrote:
Hi,
I think you should write to HDFS then copy file (parquet or orc) from HDFS 
to MinIO.


eabour

From: Prem Sahoo
Date: 2024-05-22 00:38
To: Vibhor Gupta; 
user
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo 
mailto:prem.re...@gmail.com>> wrote:
Hello Vibhor,
Thanks for the suggestion.
I am looking for other alternatives where the same dataframe can be written to
two destinations without re-execution and without cache or persist.

Can someone help me with scenario 2?
How can I make the Spark write to MinIO faster?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta 
mailto:vibhor.gu...@walmart.com>> wrote:


Hi Prem,

You can try to write to HDFS then read from HDFS and write to MinIO.

This will prevent duplicate transformation.

You can also try persisting the dataframe using the DISK_ONLY level.
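
A minimal sketch of that persist-then-write pattern (the paths, format and
transformation below are placeholders):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DualWrite").getOrCreate()

df = spark.read.parquet("hdfs:///source/input")      # placeholder input
transformed = df.filter("value IS NOT NULL")         # placeholder transformation

# Materialize once on local disk so the second write reads the persisted
# blocks instead of re-running the transformations.
transformed.persist(StorageLevel.DISK_ONLY)
transformed.write.mode("overwrite").parquet("hdfs:///target/output")
transformed.write.mode("overwrite").parquet("s3a://minio-bucket/target/output")
transformed.unpersist()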

Regards,
Vibhor
From: Prem Sahoo mailto:prem.re...@gmail.com>>
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list mailto:d...@spark.apache.org>>
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasource at the same time .

Scenario:-

Writing the same dataframe to HDFS and MinIO without re-executing the
transformations and without cache(). How can we make it faster?

Read the parquet file, do a few transformations, and write to HDFS and MinIO.

Here, for both writes Spark needs to execute the transformations again. Do we
know how we can avoid re-execution of the transformations without
cache()/persist?

Scenario 2:-
I am writing 3.2G of data to HDFS and MinIO, which takes ~6 mins.
Do we have any way to make this write faster?

I don't want to do a repartition before the write, as repartition has the
overhead of shuffling.

Please provide some inputs.





Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
I will work on the first two possible causes.
For the third one, which I guess is the real problem: Spark treats the
testfile.csv object at the URL s3a://test-bucket/testfile.csv as a bucket and
tries to access _spark_metadata at
s3a://test-bucket/testfile.csv/_spark_metadata.
testfile.csv is an object and should not be treated as a bucket, but I am not
sure how to prevent Spark from doing that.


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
ok

some observations


   - Spark job successfully lists the S3 bucket containing testfile.csv.
   - Spark job can retrieve the file size (33 Bytes) for testfile.csv.
   - Spark job fails to read the actual data from testfile.csv.
   - The printed content from testfile.csv is an empty list.
   - Spark logs show a debug message with an exception related to
   UserGroupInformation while trying to access the _spark_metadata file
   associated with testfile.csv.

possible causes


   - Permission Issues: Spark user (likely ubuntu based on logs) might lack
   the necessary permissions to access the testfile.csv file or the
   _spark_metadata file on S3 storage.
   - Spark Configuration: Issues with Spark's configuration for S3 access,
   such as missing credentials or incorrect security settings.
   - Spark attempting to read unnecessary files: The _spark_metadata file
   might not be essential for your current operation, and Spark's attempt to
   read it could be causing the issue.


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh 
wrote:

> The code should read testfile.csv file from s3. and print the content. It
> only prints a empty list although the file has content.
> I have also checked our custom s3 storage (Ceph based) logs and I see only
> LIST operations coming from Spark, there is no GET object operation for
> testfile.csv
>
> The only error I see in DEBUG output is these lines:
>
> =
> 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
> from s3a://test-bucket/testfile.csv/_spark_metadata
> 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
> (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
> java.lang.Exception
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
> at
> org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
> at
> org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311)
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352)
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91)
> at
> org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
> at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
> at scala.Option.getOrElse(Option.scala:201)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at
> org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> at py4j.Gateway.invoke(Gateway.java:282)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
> at 

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
The code should read the testfile.csv file from S3 and print its content. It
only prints an empty list although the file has content.
I have also checked our custom S3 storage (Ceph-based) logs and I see only
LIST operations coming from Spark; there is no GET object operation for
testfile.csv.

The only error I see in DEBUG output is these lines:

=
24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
(auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
java.lang.Exception
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at
org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
at
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48)
at
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91)
at
org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:201)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at
org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

===
I am not sure if this is related, since Spark can see and list the
bucket (it also returns the correct object size, which is 33 bytes).
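
For completeness, here is how a plain GET can be checked against the same
Ceph endpoint outside Spark (a minimal boto3 sketch; the endpoint,
credentials, bucket and key below are placeholders):

import boto3

# Placeholder endpoint and credentials for the custom (Ceph-based) S3 storage
s3 = boto3.client(
    "s3",
    endpoint_url="http://ceph-gateway:8080",
    aws_access_key_id="ACCESS_KEY",
    aws_secret_access_key="SECRET_KEY",
)

# An explicit GET; if this prints the 33 bytes, credentials and GET work fine
# outside Spark and the problem is on the Spark/S3A side.
obj = s3.get_object(Bucket="test-bucket", Key="testfile.csv")
print(obj["Body"].read().decode("utf-8"))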

Best,
Amin


On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh 
wrote:

> Hello,
>
> Overall, the exit code of 0 suggests a successful run of your Spark job.
> Analyze the intended purpose of your code and verify the output or Spark UI
> for further confirmation.
>
> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
> exitCode 0.
>
> what to check
>
>
>1. Verify Output: If your Spark job was intended to read data from S3
>and process it, you will need to verify the output to ensure the data was
>handled correctly. This might involve checking if any results were written
>to a designated location or if any transformations were applied
>successfully.
>2. Review Code:
>3. Check Spark UI:
>
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
> wrote:
>
>> Hi Mich,
>>
>> Thank you for the help and sorry about the late reply.
>> I 

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
Hello,

Overall, the exit code of 0 suggests a successful run of your Spark job.
Analyze the intended purpose of your code and verify the output or Spark UI
for further confirmation.

24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode
0.

what to check


   1. Verify Output: If your Spark job was intended to read data from S3
   and process it, you will need to verify the output to ensure the data was
   handled correctly. This might involve checking if any results were written
   to a designated location or if any transformations were applied
   successfully.
   2. Review Code: confirm that the path, format and options used in the read
   match what is actually stored in the bucket.
   3. Check Spark UI: inspect the SQL and Stages tabs to confirm that the scan
   actually read data (input size and record counts); a quick sketch for
   forcing a read is below.
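
On points 1 and 3, a minimal sketch of how to force an actual read and
inspect what was resolved (the bucket and key are placeholders):

# exitCode 0 only means no exception was thrown; force an action and inspect
# the plan and output to confirm data was actually read
df = spark.read.option("header", "false").csv("s3a://test-bucket/testfile.csv")

df.explain()                      # check which scan/file index was resolved
print("row count:", df.count())   # an action forces the scan to run
df.show(truncate=False)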


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
wrote:

> Hi Mich,
>
> Thank you for the help and sorry about the late reply.
> I ran your provided but I got "exitCode 0". Here is the complete output:
>
> ===
>
>
> 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
> 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic,
> amd64
> 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
> 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
> spark.driver.
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
> 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
> executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
> , memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
> name: cpus, amount: 1.0)
> 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
> 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: ubuntu; groups
> with view permissions: EMPTY; users with modify permissions: ubuntu; groups
> with modify permissions: EMPTY
> 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver'
> on port 46321.
> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
> 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
> 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
> GiB
> 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
> 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
> 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
> MOC-R4PAC08U33-S1C
> 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
> 24/05/30 01:23:39 INFO Executor: Java version 11.0.22
> 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
> (userClassPathFirst = false): ''
> 24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
> org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
> 24/05/30 01:23:39 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
> 24/05/30 01:23:39 INFO NettyBlockTransferService: Server 

Re: [s3a] Spark is not reading s3 object content

2024-05-29 Thread Amin Mosayyebzadeh
Hi Mich,

Thank you for the help and sorry about the late reply.
I ran your provided code but I got "exitCode 0". Here is the complete output:

===


24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
24/05/30 01:23:38 INFO ResourceUtils:
==
24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
spark.driver.
24/05/30 01:23:38 INFO ResourceUtils:
==
24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
, memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
name: cpus, amount: 1.0)
24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: ubuntu; groups
with view permissions: EMPTY; users with modify permissions: ubuntu; groups
with modify permissions: EMPTY
24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' on
port 46321.
24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
GiB
24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
MOC-R4PAC08U33-S1C
24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:39 INFO Executor: Java version 11.0.22
24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
(userClassPathFirst = false): ''
24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
24/05/30 01:23:39 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on
MOC-R4PAC08U33-S1C:39343
24/05/30 01:23:39 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block
manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver,
MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir
('null') to the value of spark.sql.warehouse.dir.
24/05/30 01:23:39 INFO SharedState: Warehouse path is
'file:/home/ubuntu/tpch-spark/spark-warehouse'.
24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried
hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot period
at 10 second(s).
24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system
started
24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/30 01:23:43 INFO FileSourceStrategy: Pushed Filters:
24/05/30 01:23:43 INFO FileSourceStrategy: Post-Scan 

[Spark on k8s] An issue of k8s resource creation order

2024-05-29 Thread Tao Yang
Hi, team! I have a Spark on k8s issue which is posted at
https://stackoverflow.com/questions/78537132/spark-on-k8s-resource-creation-order

Need help


Tox and Pyspark

2024-05-29 Thread Perez
Hi Team,

I need help with this
https://stackoverflow.com/questions/78547676/tox-with-pyspark


Re: OOM concern

2024-05-29 Thread Perez
Thanks Mich for the detailed explanation.

On Tue, May 28, 2024 at 9:53 PM Mich Talebzadeh 
wrote:

> Russell mentioned some of these issues before. So in short your mileage
> varies. For a 100 GB data transfer, the speed difference between Glue and
> EMR might not be significant, especially considering the benefits of Glue's
> managed service aspects. However, for much larger datasets or scenarios
> where speed is critical, EMR's customization options might provide a slight
> edge.
>
> My recommendation is test and Compare: If speed is a concern, consider
> running a test job with both Glue and EMR (if feasible) on a smaller subset
> of your data to compare transfer times and costs in your specific
> environment.. Focus on Benefits: If the speed difference with Glue is
> minimal but it offers significant benefits in terms of management and cost
> for your use case, Glue might still be the preferable option.. Also
> bandwidth: Ensure your network bandwidth between the database and S3 is
> sufficient to handle the data transfer rate, regardless of the service you
> choose.
>
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 28 May 2024 at 16:40, Perez  wrote:
>
>> Thanks Mich.
>>
>> Yes, I agree on the costing part but how does the data transfer speed be
>> impacted? Is it because glue takes some time to initialize underlying
>> resources and then process the data?
>>
>>
>> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Your mileage varies as usual
>>>
>>> Glue with DPUs seems like a strong contender for your data transfer
>>> needs based on the simplicity, scalability, and managed service aspects.
>>> However, if data transfer speed is critical or costs become a concern after
>>> testing, consider EMR as an alternative.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD  Imperial
>>> College London 
>>> London, United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>>
 Thank you everyone for your response.

 I am not getting any errors as of now. I am just trying to choose the
 right tool for my task which is data loading from an external source into
 s3 via Glue/EMR.

 I think Glue job would be the best fit for me because I can calculate
 DPUs needed (maybe keeping some extra buffer) so just wanted to check if
 there are any edge cases I need to consider.


 On Tue, May 28, 2024 at 5:39 AM Russell Jurney <
 russell.jur...@gmail.com> wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough
> RAM to accommodate any given partition in your data or you can get an OOM
> error. Not sure if this job involves a reduce, but I would choose a single
> 128GB+ memory optimized instance and then adjust parallelism as via the
> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
> job.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the right DPUs(Glue jobs) should solve this problem right?
>> Or should I move to EMR.
>>
>> I don't feel 

Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-28 Thread Gera Shegalov
I agree with the previous answers that (if requirements allow it) it is
much easier to just orchestrate a copy, either in the same app or synced
externally.

A long time ago, and not for a Spark app, we solved a similar use case via
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
. It may work with Spark because it sits underneath the FileSystem API ...
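
For the in-app variant, a minimal sketch of the two options discussed in
this thread (the paths below are placeholders): write once to HDFS and then
copy, or persist with DISK_ONLY and write twice.

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("DualWrite").getOrCreate()

hdfs_path  = "hdfs:///data/output/events"     # placeholder
minio_path = "s3a://my-bucket/output/events"  # placeholder

df = spark.read.parquet("hdfs:///data/input/events")  # plus your transformations

# Option 1: write once to HDFS (the only execution of the transformations),
# then copy by reading the materialized files back and writing them to MinIO.
df.write.mode("overwrite").parquet(hdfs_path)
spark.read.parquet(hdfs_path).write.mode("overwrite").parquet(minio_path)

# Option 2: persist to local disk and write twice without re-executing the plan.
# df.persist(StorageLevel.DISK_ONLY)
# df.write.mode("overwrite").parquet(hdfs_path)
# df.write.mode("overwrite").parquet(minio_path)
# df.unpersist()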



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo  wrote:

> I am looking for a writer/committer optimization which can make the Spark
> write faster.
>
> On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:
>
>> Hi,
>> I think you should write to HDFS then copy file (parquet or orc)
>> from HDFS to MinIO.
>>
>> --
>> eabour
>>
>>
>> *From:* Prem Sahoo 
>> *Date:* 2024-05-22 00:38
>> *To:* Vibhor Gupta ; user
>> 
>> *Subject:* Re: EXT: Dual Write to HDFS and MinIO in faster way
>>
>>
>> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>>
>>> Hello Vibhor,
>>> Thanks for the suggestion .
>>> I am looking for some other alternatives where I can use the same
>>> dataframe can be written to two destinations without re execution and cache
>>> or persist .
>>>
>>> Can some one help me in scenario 2 ?
>>> How to make spark write to MinIO faster ?
>>> Sent from my iPhone
>>>
>>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>>> wrote:
>>>
>>> 
>>>
>>> Hi Prem,
>>>
>>>
>>>
>>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>>
>>>
>>>
>>> This will prevent duplicate transformation.
>>>
>>>
>>>
>>> You can also try persisting the dataframe using the DISK_ONLY level.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Vibhor
>>>
>>> *From: *Prem Sahoo 
>>> *Date: *Tuesday, 21 May 2024 at 8:16 AM
>>> *To: *Spark dev list 
>>> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>>>
>>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>>
>>> Hello Team,
>>>
>>> I am planning to write to two datasource at the same time .
>>>
>>>
>>>
>>> Scenario:-
>>>
>>>
>>>
>>> Writing the same dataframe to HDFS and MinIO without re-executing the
>>> transformations and no cache(). Then how can we make it faster ?
>>>
>>>
>>>
>>> Read the parquet file and do a few transformations and write to HDFS and
>>> MinIO.
>>>
>>>
>>>
>>> here in both write spark needs execute the transformation again. Do we
>>> know how we can avoid re-execution of transformation  without
>>> cache()/persist ?
>>>
>>>
>>>
>>> Scenario2 :-
>>>
>>> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>>>
>>> Do we have any way to make writing this faster ?
>>>
>>>
>>>
>>> I don't want to do repartition and write as repartition will have
>>> overhead of shuffling .
>>>
>>>
>>>
>>> Please provide some inputs.
>>>
>>>
>>>
>>>
>>>
>>>


Re: OOM concern

2024-05-28 Thread Russell Jurney
If Glue lets you take a configuration based approach, and you don't have to
operate any servers as with EMR... use Glue. Try EMR if that is troublesome.

Russ

On Tue, May 28, 2024 at 9:23 AM Mich Talebzadeh 
wrote:

> Russell mentioned some of these issues before. So in short your mileage
> varies. For a 100 GB data transfer, the speed difference between Glue and
> EMR might not be significant, especially considering the benefits of Glue's
> managed service aspects. However, for much larger datasets or scenarios
> where speed is critical, EMR's customization options might provide a slight
> edge.
>
> My recommendation is test and Compare: If speed is a concern, consider
> running a test job with both Glue and EMR (if feasible) on a smaller subset
> of your data to compare transfer times and costs in your specific
> environment.. Focus on Benefits: If the speed difference with Glue is
> minimal but it offers significant benefits in terms of management and cost
> for your use case, Glue might still be the preferable option.. Also
> bandwidth: Ensure your network bandwidth between the database and S3 is
> sufficient to handle the data transfer rate, regardless of the service you
> choose.
>
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 28 May 2024 at 16:40, Perez  wrote:
>
>> Thanks Mich.
>>
>> Yes, I agree on the costing part but how does the data transfer speed be
>> impacted? Is it because glue takes some time to initialize underlying
>> resources and then process the data?
>>
>>
>> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Your mileage varies as usual
>>>
>>> Glue with DPUs seems like a strong contender for your data transfer
>>> needs based on the simplicity, scalability, and managed service aspects.
>>> However, if data transfer speed is critical or costs become a concern after
>>> testing, consider EMR as an alternative.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD  Imperial
>>> College London 
>>> London, United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>>
 Thank you everyone for your response.

 I am not getting any errors as of now. I am just trying to choose the
 right tool for my task which is data loading from an external source into
 s3 via Glue/EMR.

 I think Glue job would be the best fit for me because I can calculate
 DPUs needed (maybe keeping some extra buffer) so just wanted to check if
 there are any edge cases I need to consider.


 On Tue, May 28, 2024 at 5:39 AM Russell Jurney <
 russell.jur...@gmail.com> wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough
> RAM to accommodate any given partition in your data or you can get an OOM
> error. Not sure if this job involves a reduce, but I would choose a single
> 128GB+ memory optimized instance and then adjust parallelism as via the
> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
> job.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing 

Re: OOM concern

2024-05-28 Thread Mich Talebzadeh
Russell mentioned some of these issues before. So in short your mileage
varies. For a 100 GB data transfer, the speed difference between Glue and
EMR might not be significant, especially considering the benefits of Glue's
managed service aspects. However, for much larger datasets or scenarios
where speed is critical, EMR's customization options might provide a slight
edge.

My recommendation is to test and compare: if speed is a concern, consider
running a test job with both Glue and EMR (if feasible) on a smaller subset
of your data to compare transfer times and costs in your specific
environment. Focus on benefits: if the speed difference with Glue is
minimal but it offers significant benefits in terms of management and cost
for your use case, Glue might still be the preferable option. Also consider
bandwidth: ensure your network bandwidth between the database and S3 is
sufficient to handle the data transfer rate, regardless of the service you
choose.


HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 28 May 2024 at 16:40, Perez  wrote:

> Thanks Mich.
>
> Yes, I agree on the costing part but how does the data transfer speed be
> impacted? Is it because glue takes some time to initialize underlying
> resources and then process the data?
>
>
> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh 
> wrote:
>
>> Your mileage varies as usual
>>
>> Glue with DPUs seems like a strong contender for your data transfer needs
>> based on the simplicity, scalability, and managed service aspects. However,
>> if data transfer speed is critical or costs become a concern after testing,
>> consider EMR as an alternative.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD  Imperial
>> College London 
>> London, United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>
>>> Thank you everyone for your response.
>>>
>>> I am not getting any errors as of now. I am just trying to choose the
>>> right tool for my task which is data loading from an external source into
>>> s3 via Glue/EMR.
>>>
>>> I think Glue job would be the best fit for me because I can calculate
>>> DPUs needed (maybe keeping some extra buffer) so just wanted to check if
>>> there are any edge cases I need to consider.
>>>
>>>
>>> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
>>> wrote:
>>>
 If you’re using EMR and Spark, you need to choose nodes with enough RAM
 to accommodate any given partition in your data or you can get an OOM
 error. Not sure if this job involves a reduce, but I would choose a single
 128GB+ memory optimized instance and then adjust parallelism as via the
 Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
 job.

 Thanks,
 Russell Jurney @rjurney 
 russell.jur...@gmail.com LI  FB
  datasyndrome.com


 On Mon, May 27, 2024 at 9:15 AM Perez  wrote:

> Hi Team,
>
> I want to extract the data from DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
> should I move to EMR.
>
> I don't feel the need to move to EMR but wanted the expertise
> suggestions.
>
> TIA.
>



Re: OOM concern

2024-05-28 Thread Perez
Thanks Mich.

Yes, I agree on the costing part, but how is the data transfer speed
impacted? Is it because Glue takes some time to initialize the underlying
resources before processing the data?


On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh 
wrote:

> Your mileage varies as usual
>
> Glue with DPUs seems like a strong contender for your data transfer needs
> based on the simplicity, scalability, and managed service aspects. However,
> if data transfer speed is critical or costs become a concern after testing,
> consider EMR as an alternative.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>
>> Thank you everyone for your response.
>>
>> I am not getting any errors as of now. I am just trying to choose the
>> right tool for my task which is data loading from an external source into
>> s3 via Glue/EMR.
>>
>> I think Glue job would be the best fit for me because I can calculate
>> DPUs needed (maybe keeping some extra buffer) so just wanted to check if
>> there are any edge cases I need to consider.
>>
>>
>> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
>> wrote:
>>
>>> If you’re using EMR and Spark, you need to choose nodes with enough RAM
>>> to accommodate any given partition in your data or you can get an OOM
>>> error. Not sure if this job involves a reduce, but I would choose a single
>>> 128GB+ memory optimized instance and then adjust parallelism as via the
>>> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
>>> job.
>>>
>>> Thanks,
>>> Russell Jurney @rjurney 
>>> russell.jur...@gmail.com LI  FB
>>>  datasyndrome.com
>>>
>>>
>>> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>>>
 Hi Team,

 I want to extract the data from DB and just dump it into S3. I
 don't have to perform any transformations on the data yet. My data size
 would be ~100 GB (historical load).

 Choosing the right DPUs(Glue jobs) should solve this problem right? Or
 should I move to EMR.

 I don't feel the need to move to EMR but wanted the expertise
 suggestions.

 TIA.

>>>


Re: OOM concern

2024-05-28 Thread Mich Talebzadeh
Your mileage varies as usual

Glue with DPUs seems like a strong contender for your data transfer needs
based on the simplicity, scalability, and managed service aspects. However,
if data transfer speed is critical or costs become a concern after testing,
consider EMR as an alternative.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 28 May 2024 at 09:04, Perez  wrote:

> Thank you everyone for your response.
>
> I am not getting any errors as of now. I am just trying to choose the
> right tool for my task which is data loading from an external source into
> s3 via Glue/EMR.
>
> I think Glue job would be the best fit for me because I can calculate DPUs
> needed (maybe keeping some extra buffer) so just wanted to check if there
> are any edge cases I need to consider.
>
>
> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
> wrote:
>
>> If you’re using EMR and Spark, you need to choose nodes with enough RAM
>> to accommodate any given partition in your data or you can get an OOM
>> error. Not sure if this job involves a reduce, but I would choose a single
>> 128GB+ memory optimized instance and then adjust parallelism as via the
>> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
>> job.
>>
>> Thanks,
>> Russell Jurney @rjurney 
>> russell.jur...@gmail.com LI  FB
>>  datasyndrome.com
>>
>>
>> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>>
>>> Hi Team,
>>>
>>> I want to extract the data from DB and just dump it into S3. I
>>> don't have to perform any transformations on the data yet. My data size
>>> would be ~100 GB (historical load).
>>>
>>> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
>>> should I move to EMR.
>>>
>>> I don't feel the need to move to EMR but wanted the expertise
>>> suggestions.
>>>
>>> TIA.
>>>
>>


Re: OOM concern

2024-05-27 Thread Perez
Thank you everyone for your response.

I am not getting any errors as of now. I am just trying to choose the right
tool for my task, which is data loading from an external source into S3 via
Glue/EMR.

I think a Glue job would be the best fit for me because I can calculate the
DPUs needed (maybe keeping some extra buffer), so I just wanted to check if
there are any edge cases I need to consider.


On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough RAM to
> accommodate any given partition in your data or you can get an OOM error.
> Not sure if this job involves a reduce, but I would choose a single 128GB+
> memory optimized instance and then adjust parallelism as via the Dpark docs
> using pyspark.sql.DataFrame.repartition(n) at the start of your job.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
>> should I move to EMR.
>>
>> I don't feel the need to move to EMR but wanted the expertise suggestions.
>>
>> TIA.
>>
>


Re: Spark Protobuf Deserialization

2024-05-27 Thread Sandish Kumar HN
Did you try using to_protobuf and from_protobuf ?

https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html
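
A minimal sketch of that approach, assuming the spark-protobuf module and a
compiled descriptor file; the broker, topic, message and path names below
are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.protobuf.functions import from_protobuf

# Assumes the spark-protobuf module is on the classpath,
# e.g. --packages org.apache.spark:spark-protobuf_2.12:3.5.0
spark = SparkSession.builder.appName("ProtoKafka").getOrCreate()

kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("subscribe", "requests")                    # placeholder topic
    .load()
)

# Decode the outer Request message with a descriptor file generated by
# protoc --descriptor_set_out=request.desc --include_imports request.proto
requests = kafka_df.select(
    from_protobuf("value", "Request", descFilePath="/path/to/request.desc").alias("req")
)

# The nested events are then available in the same job, without a second hop
# through per-event Kafka topics; the stream still needs a writeStream sink
# to actually start.
events = requests.selectExpr("explode(req.event) AS event") \
                 .select("event.event_name", "event.event_bytes")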


On Mon, May 27, 2024 at 15:45 Satyam Raj  wrote:

> Hello guys,
> We're using Spark 3.5.0 for processing Kafka source that contains protobuf
> serialized data. The format is as follows:
>
> message Request {
>   long sent_ts = 1;
>   Event[] event = 2;
> }
>
> message Event {
>  string event_name = 1;
>  bytes event_bytes = 2;
> }
>
> The event_bytes contains the data for the event_name. event_name is the
> className of the Protobuf class.
> Currently, we parse the protobuf message from the Kafka topic, and for
> every event in the array, push the event_bytes to the `event_name` topic,
> over which spark jobs run and use the same event_name protobuf class to
> deserialize the data.
>
> Is there a better way to do all this in a single job?
>


Re: OOM concern

2024-05-27 Thread Russell Jurney
If you’re using EMR and Spark, you need to choose nodes with enough RAM to
accommodate any given partition in your data or you can get an OOM error.
Not sure if this job involves a reduce, but I would choose a single 128GB+
memory-optimized instance and then adjust parallelism as described in the
Spark docs, using pyspark.sql.DataFrame.repartition(n) at the start of your
job.
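
A minimal sketch of that pattern, with placeholder connection details and
partition count:

# Repartition early so no single partition exceeds what one executor can
# comfortably hold; the source, count and destination below are placeholders.
jdbc_url   = "jdbc:postgresql://db-host:5432/mydb"
conn_props = {"user": "user", "password": "pass", "driver": "org.postgresql.Driver"}

df = spark.read.jdbc(url=jdbc_url, table="source_table", properties=conn_props)

df = df.repartition(200)   # e.g. ~100 GB / 200 partitions is roughly 500 MB each

df.write.mode("overwrite").parquet("s3a://my-bucket/historical-load/")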

Thanks,
Russell Jurney @rjurney 
russell.jur...@gmail.com LI  FB
 datasyndrome.com


On Mon, May 27, 2024 at 9:15 AM Perez  wrote:

> Hi Team,
>
> I want to extract the data from DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
> should I move to EMR.
>
> I don't feel the need to move to EMR but wanted the expertise suggestions.
>
> TIA.
>


Re: OOM concern

2024-05-27 Thread Meena Rajani
What exactly is the error? Is it erroring out while reading the data from
the DB? How are you partitioning the data?

How much memory do you currently have? What is the network timeout?

Regards,
Meena


On Mon, May 27, 2024 at 4:22 PM Perez  wrote:

> Hi Team,
>
> I want to extract the data from DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
> should I move to EMR.
>
> I don't feel the need to move to EMR but wanted the expertise suggestions.
>
> TIA.
>


Re: [Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Mich Talebzadeh
When you use applyInPandasWithState, Spark processes each input row as it
arrives, regardless of whether certain columns, such as the timestamp
column, contain NULL values. This behavior is useful where you want to
handle incomplete or missing data gracefully within your stateful
processing logic. By allowing NULL timestamps to trigger calls to the
stateful function, you can implement custom handling strategies, such as
skipping incomplete records, within your stateful function.


However, it is important to understand that this behavior also *means that
the watermark is not advanced for NULL timestamps*. The watermark is used
for event-time processing in Spark Structured Streaming to track the
progress of event time in your data stream, and it is typically based on
the timestamp column. Since NULL timestamps do not contribute to watermark
advancement, such records never move event time forward, and any
watermark-dependent behavior (late-data handling, event-time timeouts,
state cleanup) will not be triggered by them.

Regarding whether you can rely on this behavior for your production code,
it largely depends on your requirements and use case. If your application
logic is designed to handle NULL timestamps appropriately and you have
tested it to ensure it behaves as expected, then you can generally rely on
this behavior. FYI, I have not tested it myself, so I cannot provide a
definitive answer.
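
If you would rather not depend on that behavior, here is a minimal sketch of
filtering NULL timestamps out before the watermark and the stateful
function; the column names, schemas, interval and update_state stand for
your existing objects and are placeholders:

from pyspark.sql import functions as F

# Keep only records that can participate in event-time processing; rows with
# a NULL timestamp would otherwise reach the stateful function without
# advancing the watermark.
clean = (
    events_df                                   # your existing streaming DataFrame
    .filter(F.col("event_ts").isNotNull())
    .withWatermark("event_ts", "10 minutes")
)

result = clean.groupBy("key").applyInPandasWithState(
    update_state,                               # your existing stateful function
    outputStructType=output_schema,             # your output schema (StructType or DDL string)
    stateStructType=state_schema,               # your state schema
    outputMode="append",
    timeoutConf="EventTimeTimeout",
)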

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 27 May 2024 at 22:04, Juan Casse  wrote:

> I am using applyInPandasWithState in PySpark 3.5.0.
>
> I noticed that records with timestamp==NULL are processed (i.e., trigger a
> call to the stateful function). And, as you would expect, does not advance
> the watermark.
>
> I am taking advantage of this in my application.
>
> My question: Is this a supported feature of Spark? Can I rely on this
> behavior for my production code?
>
> Thanks,
> Juan
>


Spark Protobuf Deserialization

2024-05-27 Thread Satyam Raj
Hello guys,
We're using Spark 3.5.0 to process a Kafka source that contains
protobuf-serialized data. The format is as follows:

message Request {
  int64 sent_ts = 1;
  repeated Event event = 2;
}

message Event {
  string event_name = 1;
  bytes event_bytes = 2;
}

The event_bytes field contains the serialized payload for the event, and
event_name is the class name of the corresponding Protobuf class.
Currently, we parse the protobuf message from the Kafka topic and, for
every event in the array, push the event_bytes to the `event_name` topic,
over which Spark jobs run and use the same event_name protobuf class to
deserialize the data.

Is there a better way to do all this in a single job?


[Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Juan Casse
I am using applyInPandasWithState in PySpark 3.5.0.

I noticed that records with timestamp==NULL are processed (i.e., trigger a
call to the stateful function). And, as you would expect, does not advance
the watermark.

I am taking advantage of this in my application.

My question: Is this a supported feature of Spark? Can I rely on this
behavior for my production code?

Thanks,
Juan


OOM concern

2024-05-27 Thread Perez
Hi Team,

I want to extract the data from DB and just dump it into S3. I
don't have to perform any transformations on the data yet. My data size
would be ~100 GB (historical load).

Choosing the right DPUs (Glue jobs) should solve this problem, right? Or
should I move to EMR?

I don't feel the need to move to EMR but wanted the experts' suggestions.

TIA.


Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Shay Elbaz
Seen this before; had a very(!) complex plan behind a DataFrame, to the point 
where any additional transformation went OOM on the driver.

A quick and ugly solution was to break the plan - convert the DataFrame to rdd 
and back to DF at certain points to make the plan shorter. This has obvious 
drawbacks, and is not recommended in general, but at least we had something 
working. The real, long-term solution was to replace the many ( > 200)  
withColumn() calls to only a few select() calls. You can easily find sources on 
the internet for why this is better. (it was on Spark 2.3, but I think the main 
principles remain). TBH, it was easier than I expected, as it mainly involved 
moving pieces of code from one place to another, and not a "real", meaningful 
refactoring.
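
A minimal sketch of what that refactor looks like, with placeholder column
names:

from pyspark.sql import functions as F

# Before: many chained withColumn calls, each adding a projection on top of
# an already huge plan
# df = df.withColumn("a", F.lit(1)).withColumn("b", F.col("x") + 1)  # ... hundreds more

# After: one select that builds the same columns in a single projection
df = df.select(
    "*",
    F.lit(1).alias("a"),
    (F.col("x") + 1).alias("b"),
    F.to_date("event_ts").alias("event_date"),
)

Each withColumn call adds another projection on top of the plan, whereas a
single select builds all the derived columns in one projection.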



From: Mich Talebzadeh 
Sent: Monday, May 27, 2024 15:43
Cc: user@spark.apache.org 
Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame 
Processing



Few ideas on top of my head for how to go about solving the problem


  1.  Try with subsets: Try reproducing the issue with smaller subsets of your 
data to pinpoint the specific operation causing the memory problems.
  2.  Explode or Flatten Nested Structures: If your DataFrame schema involves 
deep nesting, consider using techniques like explode or flattening to transform 
it into a less nested structure. This can reduce memory usage during operations 
like withColumn.
  3.  Lazy Evaluation: Use select before withColumn: this ensures lazy 
evaluation, meaning Spark only materializes the data when necessary. This can 
improve memory usage compared to directly calling withColumn on the entire 
DataFrame.
  4.  spark.sql.shuffle.partitions: Setting this configuration to a value close 
to the number of executors can improve shuffle performance and potentially 
reduce memory usage.
  5.  Spark UI Monitoring: Utilize the Spark UI to monitor memory usage 
throughout your job execution and identify potential memory bottlenecks.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin 
profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
 Von 
Braun)".



On Mon, 27 May 2024 at 12:50, Gaurav Madan 
 wrote:
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been facing 
while processing certain large and nested DataFrames using Apache Spark. We 
have encountered a scenario where the driver runs out of memory when applying 
the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the 
same DataFrames are processed successfully in Spark 2.4.0.

Problem Summary:
For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes 
the driver to choke and run out of memory. However, the same DataFrames are 
processed successfully in Spark 2.4.0.

Heap Dump Analysis:
We performed a heap dump analysis after enabling heap dump on out-of-memory 
errors, and the analysis revealed the following significant frames and local 
variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
 (DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes


Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Mich Talebzadeh
Few ideas on top of my head for how to go about solving the problem


   1. Try with subsets: Try reproducing the issue with smaller subsets of
   your data to pinpoint the specific operation causing the memory problems.
   2. Explode or Flatten Nested Structures: If your DataFrame schema
   involves deep nesting, consider using techniques like explode or flattening
   to transform it into a less nested structure. This can reduce memory usage
   during operations like withColumn.
   3. Lazy Evaluation: Use select before withColumn: this ensures lazy
   evaluation, meaning Spark only materializes the data when necessary. This
   can improve memory usage compared to directly calling withColumn on the
   entire DataFrame.
   4. spark.sql.shuffle.partitions: Setting this configuration to a value
   close to the number of executors can improve shuffle performance and
   potentially reduce memory usage.
   5. Spark UI Monitoring: Utilize the Spark UI to monitor memory usage
   throughout your job execution and identify potential memory bottlenecks.
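
A minimal sketch of points 2 and 4, with placeholder column names and
values:

from pyspark.sql import functions as F

# Point 2: flatten one level of a deeply nested schema before further
# processing (column names are placeholders)
flat = df.select(
    "id",
    "payload.header.*",                        # expand a nested struct into columns
    F.explode("payload.items").alias("item"),  # turn a nested array into rows
)

# Point 4: size shuffle partitions close to the parallelism you actually have
spark.conf.set("spark.sql.shuffle.partitions", "64")   # placeholder value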

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".



On Mon, 27 May 2024 at 12:50, Gaurav Madan
 wrote:

> Dear Community,
>
> I'm reaching out to seek your assistance with a memory issue we've been
> facing while processing certain large and nested DataFrames using Apache
> Spark. We have encountered a scenario where the driver runs out of memory
> when applying the `withColumn` method on specific DataFrames in Spark
> 3.4.1. However, the same DataFrames are processed successfully in Spark
> 2.4.0.
>
>
> *Problem Summary:*For certain DataFrames, applying the `withColumn`
> method in Spark 3.4.1 causes the driver to choke and run out of memory.
> However, the same DataFrames are processed successfully in Spark 2.4.0.
>
>
> *Heap Dump Analysis:*We performed a heap dump analysis after enabling
> heap dump on out-of-memory errors, and the analysis revealed the following
> significant frames and local variables:
>
> ```
>
> org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:4273)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:1622)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2820)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2759)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
> (DataPersistenceUtil.scala:88)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (DataPersistenceUtil.scala:19)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (BronzeStep.scala:23)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
> (MainJob.scala:78)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V
> 

Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Gaurav Madan
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been
facing while processing certain large and nested DataFrames using Apache
Spark. We have encountered a scenario where the driver runs out of memory
when applying the `withColumn` method on specific DataFrames in Spark
3.4.1. However, the same DataFrames are processed successfully in Spark
2.4.0.


*Problem Summary:* For certain DataFrames, applying the `withColumn` method
in Spark 3.4.1 causes the driver to choke and run out of memory. However,
the same DataFrames are processed successfully in Spark 2.4.0.


*Heap Dump Analysis:* We performed a heap dump analysis after enabling heap
dump on out-of-memory errors, and the analysis revealed the following
significant frames and local variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
(DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
(DataPersistenceUtil.scala:19)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
(BronzeStep.scala:23)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
(MainJob.scala:78)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V
(MainJob.scala:66)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

```


*Driver Configuration:*1. Driver instance: c6g.xlarge with 4 vCPUs and 8 GB
RAM.
2.  `spark.driver.memory` and `spark.driver.memoryOverhead` are set to
default values.


*Observations:*- The DataFrame schema is very nested and large, which might
be contributing to the memory issue.
- Despite similar configurations, Spark 2.4.0 processes the DataFrame
without issues, while Spark 3.4.1 does not.


*Tried Solutions:*We have tried several solutions, including disabling
Adaptive Query Execution, setting the driver max result size, increasing
driver cores, and enabling specific optimizer rules. However, the issue
persisted until we increased the driver memory to 48 GB and memory overhead
to 5 GB, which allowed the driver to schedule the tasks successfully.
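
For reference, a rough sketch of how these settings were passed (the app name
and the maxResultSize/cores values below are illustrative placeholders; the
memory values are the ones mentioned above):

```
from pyspark.sql import SparkSession

# Sketch of the configurations referred to above; memory values are the ones
# that finally let the driver schedule the tasks.
spark = (SparkSession.builder
         .appName("NestedSchemaPersistence")                 # placeholder name
         .config("spark.sql.adaptive.enabled", "false")      # disable Adaptive Query Execution
         .config("spark.driver.maxResultSize", "4g")         # illustrative value
         .config("spark.driver.cores", "4")                  # increased driver cores
         .config("spark.driver.memory", "48g")
         .config("spark.driver.memoryOverhead", "5g")
         .getOrCreate())
```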


*Request for Suggestions:* Are there any additional configurations or
optimizations that could help mitigate this memory issue without always
resorting to a larger machine? We would greatly appreciate any insights or
recommendations from the community on how to resolve this issue effectively.

I have attached the DataFrame schema and the complete stack trace from the
heap dump analysis for your reference.

Doc explaining the issue

DataFrame Schema


Thank you in advance for your assistance.

Best regards,
Gaurav Madan
LinkedIn 
*Personal Mail: *gauravmadan...@gmail.com
*Work Mail:* gauravma...@urbancompany.com


Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Sorry, I thought I had given an explanation.

The issue you are encountering with incorrect record numbers in the
"ShuffleWrite Size/Records" column in the Spark DAG UI when data is read
from cache/persist is a known limitation. This discrepancy arises due to
the way Spark handles and reports shuffle data when caching is involved.

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".




On Sun, 26 May 2024 at 21:16, Prem Sahoo  wrote:

> Can anyone please assist me ?
>
> On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:
>
>> Does anyone have a clue ?
>>
>> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>>
>>> Hello Team,
>>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>>> can view the tasks.
>>>
>>> In each task we have a column "ShuffleWrite Size/Records " that column
>>> prints wrong data when it gets the data from cache/persist . it
>>> typically will show the wrong record number though the data size is correct
>>> for e.g  3.2G/ 7400 which is wrong .
>>>
>>> please advise.
>>>
>>


Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Just to further clarify that the Shuffle Write Size/Records column in
the Spark UI can be misleading when working with cached/persisted data
because it reflects the shuffled data size and record count, not the
entire cached/persisted data. So it is fair to say that this is a
limitation of the UI's display, not necessarily a bug in the Spark
framework itself.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".




On Sun, 26 May 2024 at 16:45, Mich Talebzadeh  wrote:
>
> Yep, the Spark UI's Shuffle Write Size/Records" column can sometimes show 
> incorrect record counts when data is retrieved from cache or persisted data. 
> This happens because the record count reflects the number of records written 
> to disk for shuffling, and not the actual number of records in the cached or 
> persisted data itself. Add to it, because of lazy evaluation:, Spark may only 
> materialize a portion of the cached or persisted data when a task needs it. 
> The "Shuffle Write Size/Records" might only reflect the materialized portion, 
> not the total number of records in the cache/persistence. While the "Shuffle 
> Write Size/Records" might be inaccurate for cached/persisted data, the 
> "Shuffle Read Size/Records" column can be more reliable. This metric shows 
> the number of records read from shuffle by the following stage, which should 
> be closer to the actual number of records processed.
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my knowledge 
> but of course cannot be guaranteed . It is essential to note that, as with 
> any advice, quote "one test result is worth one-thousand expert opinions 
> (Werner Von Braun)".
>
>
>
> On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:
>>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you can 
>> view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column 
>> prints wrong data when it gets the data from cache/persist . it typically 
>> will show the wrong record number though the data size is correct for e.g  
>> 3.2G/ 7400 which is wrong .
>>
>> please advise.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Yep, the Spark UI's "Shuffle Write Size/Records" column can sometimes show
incorrect record counts *when data is retrieved from cache or persisted
data*. This happens because the record count reflects the number of records
written to disk for shuffling, not the actual number of records in the
cached or persisted data itself. In addition, because of lazy evaluation,
Spark may only materialize a portion of the cached or persisted data when a
task needs it. The "Shuffle Write Size/Records" might only reflect the
materialized portion, not the total number of records in the
cache/persistence. While the "Shuffle Write Size/Records" might be
inaccurate for cached/persisted data, the "Shuffle Read Size/Records"
column can be more reliable. This metric shows the number of records read
from shuffle by the following stage, which should be closer to the actual
number of records processed.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:

> Hello Team,
> in spark DAG UI , we have Stages tab. Once you click on each stage you can
> view the tasks.
>
> In each task we have a column "ShuffleWrite Size/Records " that column
> prints wrong data when it gets the data from cache/persist . it
> typically will show the wrong record number though the data size is correct
> for e.g  3.2G/ 7400 which is wrong .
>
> please advise.
>


Re: BUG :: UI Spark

2024-05-26 Thread Sathi Chowdhury
Can you please explain how you realized it is wrong? Did you check CloudWatch
for the same metrics and compare? Also, are you using df.cache() and expecting
the shuffle read/write to go away?


Sent from Yahoo Mail for iPhone


On Sunday, May 26, 2024, 7:53 AM, Prem Sahoo  wrote:

Can anyone please assist me ?
On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:

Does anyone have a clue ?
On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:

Hello Team,in spark DAG UI , we have Stages tab. Once you click on each stage 
you can view the tasks.
In each task we have a column "ShuffleWrite Size/Records " that column prints 
wrong data when it gets the data from cache/persist . it typically will show 
the wrong record number though the data size is correct for e.g  3.2G/ 7400 
which is wrong . 
please advise. 






Re: BUG :: UI Spark

2024-05-26 Thread Prem Sahoo
Can anyone please assist me ?

On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:

> Does anyone have a clue ?
>
> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>> can view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column
>> prints wrong data when it gets the data from cache/persist . it
>> typically will show the wrong record number though the data size is correct
>> for e.g  3.2G/ 7400 which is wrong .
>>
>> please advise.
>>
>


Re: Can Spark Catalog Perform Multimodal Database Query Analysis

2024-05-24 Thread Mich Talebzadeh
Something like this in Python

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Configure Spark Session with the Hive metastore URIs
spark_conf = SparkConf() \
    .setAppName("SparkCatalogMultipleSources") \
    .set("hive.metastore.uris",
         "thrift://hive1-metastore:9080,thrift://hive2-metastore:9080")

# JDBC endpoints (shown for reference; not used directly below)
jdbc_urls = ["jdbc:hive2://hive1-jdbc:1",
             "jdbc:hive2://hive2-jdbc:1"]
mysql_jdbc_url = "jdbc:mysql://mysql-host:3306/mysql_database"

spark = SparkSession.builder \
    .config(conf=spark_conf) \
    .enableHiveSupport() \
    .getOrCreate()

# Accessing tables from Hive1, Hive2, and MySQL
spark.sql("SELECT * FROM hive1.table1").show()
spark.sql("SELECT * FROM hive2.table2").show()
spark.sql("SELECT * FROM mysql.table1").show()

# Optional: Create temporary views for easier joining (if needed)
spark.sql("CREATE TEMPORARY VIEW hive1_table1 AS SELECT * FROM hive1.table1")
spark.sql("CREATE TEMPORARY VIEW hive2_table2 AS SELECT * FROM hive2.table2")
spark.sql("CREATE TEMPORARY VIEW mysql_table1 AS SELECT * FROM mysql.table1")

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 24 May 2024 at 09:41, 志阳 <308027...@qq.com.invalid> wrote:

> I have two clusters hive1 and hive2, as well as a MySQL database. Can I
> use Spark Catalog for registration, but can I only use one catalog at a
> time? Can multiple catalogs be joined across databases.
> select * from
>  hive1.table1 join hive2.table2 join mysql.table1
> where 
>
> --
> 志阳
> 308027...@qq.com
>
> 
>
>


Can Spark Catalog Perform Multimodal Database Query Analysis

2024-05-24 Thread 志阳
I have two clusters hive1 and hive2, as well as a MySQL database. Can I use 
Spark Catalog for registration, but can I only use one catalog at a time? Can 
multiple catalogs be joined across databases.
select * from
hive1.table1 join hive2.table2 join mysql.table1
where 





308027...@qq.com





Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-24 Thread Anil Dasari
It appears that Structured Streaming and DStream have entirely different
microbatch metadata representations.
Can someone assist me in finding the Structured Streaming equivalents of the
following DStream microbatch metadata?

1. Microbatch timestamp: Structured Streaming foreachBatch gives a batchId,
which is not a timestamp. Is there a way to get the microbatch timestamp?
2. Microbatch start event?
3. Scheduling delay of a microbatch?
4. Pending microbatches in the case of fixed-interval microbatches?
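
For what it is worth, a sketch of the closest thing I have found (PySpark
3.4+, where StreamingQueryListener is exposed in Python; the printing is only
illustrative): the progress event carries the trigger timestamp, the batch id
and a duration breakdown, although a direct "scheduling delay" metric is not
exposed.

```
from pyspark.sql.streaming import StreamingQueryListener

class BatchMetadataListener(StreamingQueryListener):
    """Logs per-microbatch metadata roughly comparable to DStream batch info."""

    def onQueryStarted(self, event):
        print(f"query started: id={event.id} runId={event.runId}")

    def onQueryProgress(self, event):
        p = event.progress
        # p.timestamp: trigger timestamp of the microbatch (ISO-8601 string)
        # p.batchId: the microbatch id; p.durationMs: breakdown of where time went
        print(f"batchId={p.batchId} timestamp={p.timestamp} durationMs={p.durationMs}")

    def onQueryTerminated(self, event):
        print(f"query terminated: id={event.id}")

spark.streams.addListener(BatchMetadataListener())
```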

Thanks

On Wed, May 22, 2024 at 5:23 PM Anil Dasari  wrote:

> You are right.
> - another question on migration. Is there a way to get the microbatch id
> during the microbatch dataset `trasform` operation like in rdd transform ?
> I am attempting to implement the following pseudo functionality with
> structured streaming. In this approach, recordCategoriesMetadata is fetched
> and rdd metrics like rdd size etc using microbatch idin the transform
> operation.
> ```code
> val rddQueue = new mutable.Queue[RDD[Int]]()
> // source components
> val sources = Seq.empty[String]
> val consolidatedDstream = sources
> .map(source => {
> val inputStream = ssc.queueStream(rddQueue)
> inputStream.transform((rdd, ts) => {
> // emit metrics of microbatch ts : rdd size etc.
>
> val recordCategories = rdd.map(..).collect();
> val recordCategoriesMetadata = ...
> rdd
> .map(r =>
> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
> (source, customRecord)
> )
> })
> }
> )
> .reduceLeft(_ union _)
>
> consolidatedDstream
> .foreachRDD((rdd, ts) => {
> // get pipes for each source
> val pipes = Seq.empty[String] // pipes of given source
> pipes.foreach(pipe => {
> val pipeSource = null; // get from pipe variable
> val psRDD = rdd
> .filter {
> case (source, sourceRDD) => source.equals(pipeSource)
> }
> // apply pipe transformation and sink
>
> })
> })
> ```
>
> In structured streaming, it can look like -
>
> ```code
> val consolidatedDstream = sources
> .map(source => {
> val inputStream = ... (for each source)
> inputStream
> }
> )
> .reduceLeft(_ union _)
>
> consolidatedDstream
> .writeStream
> .foreachBatch((ds, ts) => {
> val newDS = ds.transform((internalDS => {
> // emit metrics of microbatch ts : rdd size etc.
>
> val recordCategories = rdd.map(..).collect();
> val recordCategoriesMetadata = ...
> internalDS
> .map(r =>
> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
> (source, customRecord)
> )
> })(... )
> // get pipes for each source
> val pipes = Seq.empty[String] // pipes of given source
> pipes.foreach(pipe => {
> val pipeSource = null; // get from pipe variable
> val psRDD = newDS
> .filter {
> case (source, sourceDS) => source.equals(pipeSource)
> }
> // apply pipe transformation and sink
>
> })
> })
> ```
> ^ is just pseudo code and still not sure if it works. Let me know your
> suggestions if any. thanks.
>
> On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
> wrote:
>
>> The right way to associated microbatches when committing to external
>> storage is to use the microbatch id that you can get in foreachBatch. That
>> microbatch id guarantees that the data produced in the batch is the always
>> the same no matter any recomputations (assuming all processing logic is
>> deterministic). So you can commit the batch id + batch data together. And
>> then async commit the batch id + offsets.
>>
>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
>> wrote:
>>
>>> Thanks Das, Mtich.
>>>
>>> Mitch,
>>> We process data from Kafka and write it to S3 in Parquet format using
>>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>>> process records micro-batch offsets to an external storage at the end of
>>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>>
>>> Das,
>>> Thanks for sharing the details. I will look into them.
>>> Unfortunately, the listeners process is async and can't
>>> guarantee happens before association with microbatch to commit offsets to
>>> external storage. But still they will work. Is there a way to access
>>> lastProgress in foreachBatch ?
>>>
>>>
>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 If you want to find what offset ranges are present in a microbatch in
 Structured Streaming, you have to look at the
 StreamingQuery.lastProgress or use the QueryProgressListener
 .
 Both of these approaches gives you access to the SourceProgress
 
 which gives Kafka offsets as a JSON string.

 Hope this helps!

 On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> OK to understand better your current model relies on streaming data
> input through Kafka topic, Spark does some ETL and you send to a sink, 

Re: BUG :: UI Spark

2024-05-23 Thread Prem Sahoo
Does anyone have a clue ?

On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:

> Hello Team,
> in spark DAG UI , we have Stages tab. Once you click on each stage you can
> view the tasks.
>
> In each task we have a column "ShuffleWrite Size/Records " that column
> prints wrong data when it gets the data from cache/persist . it
> typically will show the wrong record number though the data size is correct
> for e.g  3.2G/ 7400 which is wrong .
>
> please advise.
>


Re: [s3a] Spark is not reading s3 object content

2024-05-23 Thread Mich Talebzadeh
Could be a number of reasons

First test reading the file with a cli

aws s3 cp s3a://input/testfile.csv .
cat testfile.csv


Try this code with debug option to diagnose the problem

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

try:
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("S3ReadTest") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6") \
        .config("spark.hadoop.fs.s3a.access.key", "R*6") \
        .config("spark.hadoop.fs.s3a.secret.key", "1***e") \
        .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()

    # Read the CSV file from S3 (ensure the delimiter is a space)
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", " ") \
        .csv("s3a://input/testfile.csv")

    # Show the data
    df.show(n=1)

except AnalysisException as e:
    print(f"AnalysisException: {e}")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Stop the Spark session
    spark.stop()

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh 
wrote:

> I am trying to read an s3 object from a local S3 storage (Ceph based)
> using Spark 3.5.1. I see it can access the bucket and list the files (I
> have verified it on Ceph side by checking its logs), even returning the
> correct size of the object. But the content is not read.
>
> The object url is:
> s3a://input/testfile.csv (I have also tested a nested bucket:
> s3a://test1/test2/test3/testfile.csv)
>
>
> Object's content:
>
> =
> name int1 int2
> first 1 2
> second 3 4
> =
>
>
> Here is the config I have set so far:
>
> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
> ("spark.hadoop.fs.s3a.access.key", "R*6")
> ("spark.hadoop.fs.s3a.secret.key", "1***e")
> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
> ("spark.hadoop.fs.s3a.path.style.access", "true")
> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>
>
> The outop for my following Pyspark application:
> df = spark.read \
> .option("header", "true") \
> .schema(schema) \
> .csv("s3a://input/testfile.csv", sep=' ')
>
> df.show(n=1)
> ==
> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system 
> started24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file 
> log from s3a://input/testfile.csv/_spark_metadata24/05/20 02:35:01 INFO 
> FileStreamSinkLog: BatchIds found from listing:24/05/20 02:35:03 INFO 
> FileSourceStrategy: Pushed Filters:24/05/20 02:35:03 INFO FileSourceStrategy: 
> Post-Scan Filters:24/05/20 02:35:03 INFO CodeGenerator: Code generated in 
> 176.139675 ms24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as 
> values in memory (estimated size 496.6 KiB, free 4.1 GiB)24/05/20 02:35:03 
> INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory 
> (estimated size 54.4 KiB, free 4.1 GiB)24/05/20 02:35:03 INFO 
> BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 
> 54.4 KiB, free: 4.1 GiB)24/05/20 02:35:03 INFO SparkContext: Created 
> broadcast 0 from showString at NativeMethodAccessorImpl.java:024/05/20 
> 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 
> 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> ++++
> |name|int1|int2|
> ++++
> ++++
> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown 
> hook24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with 
> exitCode 0
> =
>
> Am I missing something here?
>
> P.S. I see OP_IS_DIRECTORY is set to 1. Is that a correct behavior?
>
>
> Thanks in advance!
>
>


BUG :: UI Spark

2024-05-23 Thread Prem Sahoo
Hello Team,
in spark DAG UI , we have Stages tab. Once you click on each stage you can
view the tasks.

In each task we have a column "ShuffleWrite Size/Records " that column
prints wrong data when it gets the data from cache/persist . it
typically will show the wrong record number though the data size is correct
for e.g  3.2G/ 7400 which is wrong .

please advise.


[s3a] Spark is not reading s3 object content

2024-05-23 Thread Amin Mosayyebzadeh
 I am trying to read an s3 object from a local S3 storage (Ceph based)
using Spark 3.5.1. I see it can access the bucket and list the files (I
have verified it on Ceph side by checking its logs), even returning the
correct size of the object. But the content is not read.

The object url is:
s3a://input/testfile.csv (I have also tested a nested bucket:
s3a://test1/test2/test3/testfile.csv)


Object's content:

=
name int1 int2
first 1 2
second 3 4
=


Here is the config I have set so far:

("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
("spark.hadoop.fs.s3a.access.key", "R*6")
("spark.hadoop.fs.s3a.secret.key", "1***e")
("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
("spark.hadoop.fs.s3a.path.style.access", "true")
("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")


The outop for my following Pyspark application:
df = spark.read \
.option("header", "true") \
.schema(schema) \
.csv("s3a://input/testfile.csv", sep=' ')

df.show(n=1)
==
24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
++++
|name|int1|int2|
++++
++++
24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
=

Am I missing something here?

P.S. I see OP_IS_DIRECTORY is set to 1. Is that a correct behavior?


Thanks in advance!


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
You are right.
- another question on migration: is there a way to get the microbatch id
during the microbatch dataset `transform` operation, like in the RDD transform?
I am attempting to implement the following pseudo functionality with
structured streaming. In this approach, recordCategoriesMetadata is fetched
and RDD metrics like RDD size etc. are emitted using the microbatch id in the
transform operation.
```code
val rddQueue = new mutable.Queue[RDD[Int]]()
// source components
val sources = Seq.empty[String]
val consolidatedDstream = sources
.map(source => {
val inputStream = ssc.queueStream(rddQueue)
inputStream.transform((rdd, ts) => {
// emit metrics of microbatch ts : rdd size etc.

val recordCategories = rdd.map(..).collect();
val recordCategoriesMetadata = ...
rdd
.map(r =>
val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
(source, customRecord)
)
})
}
)
.reduceLeft(_ union _)

consolidatedDstream
.foreachRDD((rdd, ts) => {
// get pipes for each source
val pipes = Seq.empty[String] // pipes of given source
pipes.foreach(pipe => {
val pipeSource = null; // get from pipe variable
val psRDD = rdd
.filter {
case (source, sourceRDD) => source.equals(pipeSource)
}
// apply pipe transformation and sink

})
})
```

In structured streaming, it can look like -

```code
val consolidatedDstream = sources
.map(source => {
val inputStream = ... (for each source)
inputStream
}
)
.reduceLeft(_ union _)

consolidatedDstream
.writeStream
.foreachBatch((ds, ts) => {
val newDS = ds.transform((internalDS => {
// emit metrics of microbatch ts : rdd size etc.

val recordCategories = rdd.map(..).collect();
val recordCategoriesMetadata = ...
internalDS
.map(r =>
val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
(source, customRecord)
)
})(... )
// get pipes for each source
val pipes = Seq.empty[String] // pipes of given source
pipes.foreach(pipe => {
val pipeSource = null; // get from pipe variable
val psRDD = newDS
.filter {
case (source, sourceDS) => source.equals(pipeSource)
}
// apply pipe transformation and sink

})
})
```
^ is just pseudo code and still not sure if it works. Let me know your
suggestions if any. thanks.

On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
wrote:

> The right way to associated microbatches when committing to external
> storage is to use the microbatch id that you can get in foreachBatch. That
> microbatch id guarantees that the data produced in the batch is the always
> the same no matter any recomputations (assuming all processing logic is
> deterministic). So you can commit the batch id + batch data together. And
> then async commit the batch id + offsets.
>
> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
> wrote:
>
>> Thanks Das, Mtich.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external storage at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listeners process is async and can't guarantee happens
>> before association with microbatch to commit offsets to external storage.
>> But still they will work. Is there a way to access lastProgress in
>> foreachBatch ?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at the
>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>> .
>>> Both of these approaches gives you access to the SourceProgress
>>> 
>>> which gives Kafka offsets as a JSON string.
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 OK to understand better your current model relies on streaming data
 input through Kafka topic, Spark does some ETL and you send to a sink, a
 database for file storage like HDFS etc?

 Your current architecture relies on Direct Streams (DStream) and RDDs
 and you want to move to Spark sStructured Streaming based on dataframes and
 datasets?

 You have not specified your sink

 With regard to your question?

 "Is there an equivalent of Dstream HasOffsetRanges in structure
 streaming to get the microbatch end offsets to the checkpoint in our
 external checkpoint store ?"

 There is not a direct equivalent of DStream HasOffsetRanges in Spark
 Structured Streaming. However, Structured Streaming provides mechanisms to
 achieve similar functionality:

 HTH

 Mich Talebzadeh,
 Technologist | Architect | Data Engineer  | 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
With regard to this sentence


*Offset Tracking with Structured Streaming:: While storing offsets in an
external storage with DStreams was necessary, SSS handles this
automatically through checkpointing. The checkpoints include the offsets
processed by each micro-batch. However, you can still access the most
recent offsets using the offset() method on your StreamingQuery object for
monitoring purposes that is if you need it*

In essence, with SSS and checkpointing in place, you can rely on the
automatic offset management provided by the framework,
*eliminating the need for the custom offset storage you had with DStreams.*

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 22 May 2024 at 19:49, Mich Talebzadeh 
wrote:

> Hi  Anil,
>
> Ok let us put the complete picture here
>
>* Current DStreams Setup:*
>
>- Data Source: Kafka
>- Processing Engine: Spark DStreams
>- Data Transformation with Spark
>- Sink: S3
>- Data Format: Parquet
>- Exactly-Once Delivery (Attempted): You're attempting exactly-once
>delivery by recording micro-batch offsets in an external storage using
>foreachRDD at the end of each micro-batch. This allows you to potentially
>restart the job from the last processed offset in case of failures?
>- Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
>offer limited built-in support for exactly-once delivery guarantees.
>
>
> *Moving to Spark Structured Streaming: (SSS)*
>
> All stays the same. except below
>
>
>- Exactly-Once Delivery which is guaranteed by SSS
>- Checkpointing: Enable checkpointing by setting the
>checkpointLocation option in  writeStream. Spark will periodically
>checkpoint the state of streaming query, including offsets, to a designated
>location (e.g., HDFS, cloud storage or SSD).
>- Offset Tracking with Structured Streaming:: While storing offsets in
>an external storage with DStreams was necessary, SSS handles this
>automatically through checkpointing. The checkpoints include the offsets
>processed by each micro-batch. However, you can still access the most
>recent offsets using the offset() method on your StreamingQuery object for
>monitoring purposes that is if you need it
>
> Have a look at this article of mine  about  structured streaming  and
> checkpointing
>
> Processing Change Data Capture with Spark Structured Streaming
> 
>
> In your case briefly
>
> def *store_offsets_to_checkpoint*(df, batchId):
> if(len(df.take(1))) > 0:
>  df. persist()
>  # Extract offsets from the DataFrame (assuming a column named
> 'offset')
>  offset_rows = df.select(col('offset')).rdd.collect()
>  # Create OffsetRange objects from extracted offsets
>  offsets = [OffsetRange(partition=row.partition,
> fromOffset=row.offset, toOffset=row.offset + 1) # Assuming 'partition'
> and 'offset' columns
> for row in offset_rows]
>  # Logic to store offsets in your external checkpoint store)
>   ..
>   df.unpersist()
> else:
>   print("DataFrame is empty")
>
> # Define your Structured Streaming application with Kafka source and sink
>
>"""
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> *store_offsets_to_checkpoint* function
> foreachBatch(*store_offsets_to_checkpoint*) expects 2
> parameters, first: micro-batch as DataFrame or Dataset and second: unique
> id for each batch
>Using foreachBatch, we write each micro batch to storage
> defined in our custom logic
> """
>
> streaming = spark.readStream \
>.format("kafka") \ .
> option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "topic_name") \
>   .load()
>
> # Custom sink function to store offsets in checkpoint
> streaming = streaming.writeStream \
>  . format("memory")  \
>  *.option("checkpointLocation", "/path/to/checkpoint/store") \ *
>   .foreachBatch(*store_offsets_to_checkpoint*) \
>   .start()
>
> HTH
>
>
> 
>
> 

Remote File change detection in S3 when spark queries are running and parquet files in S3 changes

2024-05-22 Thread Raghvendra Yadav
Hello,
 We are hoping someone can help us understand the spark behavior
for scenarios listed below.

Q1. *Will running Spark queries fail when the S3 parquet object changes
underneath, with S3A remote file change detection enabled? Is that guaranteed (100%)?*
Our understanding is that S3A has a feature for remote file change
detection using ETag, implemented in the S3AInputStream class.
This feature caches the ETag per S3AInputStream Instance and uses it to
detect file changes even if the stream is reopened. When running a Spark
query that uses FSDataInputStream, will it reliably detect changes in the
file on S3?
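
(For context, a minimal sketch of how the hadoop-aws change-detection settings
we are referring to can be set on a Spark session; the property names come
from the S3A configuration and the values shown are only illustrative:)

```
from pyspark.sql import SparkSession

# Illustrative only: detect changes by ETag and enforce the check server-side,
# so a changed object surfaces as an error rather than a silent inconsistency.
spark = (SparkSession.builder
         .appName("S3AChangeDetection")
         .config("spark.hadoop.fs.s3a.change.detection.source", "etag")
         .config("spark.hadoop.fs.s3a.change.detection.mode", "server")
         .getOrCreate())

df = spark.read.parquet("s3a://some-bucket/some-prefix/")   # placeholder path
```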

*Q2. Does spark work on a single instance of S3AInputStream for a parquet
file or can open multiple S3AInputStream for some queries? *



Thanks
Raghav


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
Hi  Anil,

Ok let us put the complete picture here

   * Current DStreams Setup:*

   - Data Source: Kafka
   - Processing Engine: Spark DStreams
   - Data Transformation with Spark
   - Sink: S3
   - Data Format: Parquet
   - Exactly-Once Delivery (Attempted): You're attempting exactly-once
   delivery by recording micro-batch offsets in an external storage using
   foreachRDD at the end of each micro-batch. This allows you to potentially
   restart the job from the last processed offset in case of failures?
   - Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
   offer limited built-in support for exactly-once delivery guarantees.


*Moving to Spark Structured Streaming: (SSS)*

All stays the same. except below


   - Exactly-Once Delivery which is guaranteed by SSS
   - Checkpointing: Enable checkpointing by setting the checkpointLocation
   option in  writeStream. Spark will periodically checkpoint the state of
   streaming query, including offsets, to a designated location (e.g., HDFS,
   cloud storage or SSD).
   - Offset Tracking with Structured Streaming:: While storing offsets in
   an external storage with DStreams was necessary, SSS handles this
   automatically through checkpointing. The checkpoints include the offsets
   processed by each micro-batch. However, you can still access the most
   recent offsets using the offset() method on your StreamingQuery object for
   monitoring purposes that is if you need it

Have a look at this article of mine  about  structured streaming  and
checkpointing

Processing Change Data Capture with Spark Structured Streaming


In your case briefly

from pyspark.sql.functions import col

def store_offsets_to_checkpoint(df, batchId):
    if len(df.take(1)) > 0:
        df.persist()
        # Extract offsets from the DataFrame (assuming a column named 'offset')
        offset_rows = df.select(col('offset')).rdd.collect()
        # Create OffsetRange objects from the extracted offsets
        # (assuming 'partition' and 'offset' columns; OffsetRange is illustrative here)
        offsets = [OffsetRange(partition=row.partition,
                               fromOffset=row.offset,
                               toOffset=row.offset + 1)
                   for row in offset_rows]
        # Logic to store offsets in your external checkpoint store
        ...
        df.unpersist()
    else:
        print("DataFrame is empty")

# Define your Structured Streaming application with Kafka source and sink

"""
"foreach" performs custom write logic on each row and "foreachBatch" performs
custom write logic on each micro-batch through the store_offsets_to_checkpoint
function. foreachBatch(store_offsets_to_checkpoint) expects 2 parameters,
first: the micro-batch as a DataFrame or Dataset and second: a unique id for
each batch. Using foreachBatch, we write each micro-batch to storage defined
in our custom logic.
"""

streaming = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Custom sink function to store offsets in checkpoint
streaming = streaming.writeStream \
    .format("memory") \
    .option("checkpointLocation", "/path/to/checkpoint/store") \
    .foreachBatch(store_offsets_to_checkpoint) \
    .start()

HTH



Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 22 May 2024 at 16:27, Anil Dasari  wrote:

> Thanks Das, Mtich.
>
> Mitch,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listeners process is async and can't guarantee happens
> before association with microbatch to commit offsets to external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
> wrote:
>
>> If you want to find what 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
The right way to associate microbatches when committing to external
storage is to use the microbatch id that you can get in foreachBatch. That
microbatch id guarantees that the data produced in the batch is always
the same no matter the recomputations (assuming all processing logic is
deterministic). So you can commit the batch id + batch data together, and
then asynchronously commit the batch id + offsets.
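
A minimal sketch of that pattern (Kafka source; the output path, checkpoint
location and the external offset store are hypothetical placeholders):

```
def write_batch(df, batch_id):
    # Idempotent write: key the output by batch_id so a recomputed batch
    # overwrites itself instead of producing duplicates.
    (df.write
       .mode("overwrite")
       .parquet(f"s3a://my-bucket/output/batch_id={batch_id}"))   # placeholder path
    # Then (possibly asynchronously) record batch_id -> offsets in an external
    # store so a restarted job knows what has already been committed.

query = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("subscribe", "topic_name")
         .load()
         .writeStream
         .foreachBatch(write_batch)
         .option("checkpointLocation", "/path/to/checkpoint")     # placeholder
         .start())
```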

On Wed, May 22, 2024 at 11:27 AM Anil Dasari  wrote:

> Thanks Das, Mtich.
>
> Mitch,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listeners process is async and can't guarantee happens
> before association with microbatch to commit offsets to external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
> wrote:
>
>> If you want to find what offset ranges are present in a microbatch in
>> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
>> use the QueryProgressListener
>> .
>> Both of these approaches gives you access to the SourceProgress
>> 
>> which gives Kafka offsets as a JSON string.
>>
>> Hope this helps!
>>
>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK to understand better your current model relies on streaming data
>>> input through Kafka topic, Spark does some ETL and you send to a sink, a
>>> database for file storage like HDFS etc?
>>>
>>> Your current architecture relies on Direct Streams (DStream) and RDDs
>>> and you want to move to Spark sStructured Streaming based on dataframes and
>>> datasets?
>>>
>>> You have not specified your sink
>>>
>>> With regard to your question?
>>>
>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>> streaming to get the microbatch end offsets to the checkpoint in our
>>> external checkpoint store ?"
>>>
>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>> Structured Streaming. However, Structured Streaming provides mechanisms to
>>> achieve similar functionality:
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>  wrote:
>>>
 Hello,

 what options are you considering yourself?

 On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
 adas...@guidewire.com> wrote:


 Hello,

 We are on Spark 3.x and using Spark dstream + kafka and planning to use
 structured streaming + Kafka.
 Is there an equivalent of Dstream HasOffsetRanges in structure
 streaming to get the microbatch end offsets to the checkpoint in our
 external checkpoint store ? Thanks in advance.

 Regards




Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Thanks Das, Mich.

Mich,
We process data from Kafka and write it to S3 in Parquet format using
Dstreams. To ensure exactly-once delivery and prevent data loss, our
process records micro-batch offsets to an external storage at the end of
each micro-batch in foreachRDD, which is then used when the job restarts.

Das,
Thanks for sharing the details. I will look into them.
Unfortunately, the listener callbacks are async and can't guarantee a
happens-before association with the microbatch when committing offsets to
external storage. But they will still work. Is there a way to access
lastProgress in foreachBatch?


On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
wrote:

> If you want to find what offset ranges are present in a microbatch in
> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
> use the QueryProgressListener
> .
> Both of these approaches gives you access to the SourceProgress
> 
> which gives Kafka offsets as a JSON string.
>
> Hope this helps!
>
> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK to understand better your current model relies on streaming data input
>> through Kafka topic, Spark does some ETL and you send to a sink, a
>> database for file storage like HDFS etc?
>>
>> Your current architecture relies on Direct Streams (DStream) and RDDs and
>> you want to move to Spark sStructured Streaming based on dataframes and
>> datasets?
>>
>> You have not specified your sink
>>
>> With regard to your question?
>>
>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>> streaming to get the microbatch end offsets to the checkpoint in our
>> external checkpoint store ?"
>>
>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>> Structured Streaming. However, Structured Streaming provides mechanisms to
>> achieve similar functionality:
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>> 
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von
>> Braun
>> 
>> )".
>>
>>
>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>  wrote:
>>
>>> Hello,
>>>
>>> what options are you considering yourself?
>>>
>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>> adas...@guidewire.com> wrote:
>>>
>>>
>>> Hello,
>>>
>>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>>> structured streaming + Kafka.
>>> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
>>> to get the microbatch end offsets to the checkpoint in our external
>>> checkpoint store ? Thanks in advance.
>>>
>>> Regards
>>>
>>>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
If you want to find what offset ranges are present in a microbatch in
Structured Streaming, you have to look at StreamingQuery.lastProgress or
use the QueryProgressListener. Both of these approaches give you access to
the SourceProgress, which gives Kafka offsets as a JSON string.

Hope this helps!
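
For example, a quick way to poll this from the query handle (a sketch; `query`
is assumed to be the object returned by writeStream.start(), and the field
names are those of the JSON progress report):

```
import json

progress = query.lastProgress   # dict form of the latest progress, or None
if progress is not None:
    for source in progress["sources"]:
        # For the Kafka source, startOffset/endOffset are {topic: {partition: offset}} maps.
        print(source["description"])
        print(json.dumps(source["endOffset"], indent=2))
```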

On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh 
wrote:

> OK to understand better your current model relies on streaming data input
> through Kafka topic, Spark does some ETL and you send to a sink, a
> database for file storage like HDFS etc?
>
> Your current architecture relies on Direct Streams (DStream) and RDDs and
> you want to move to Spark sStructured Streaming based on dataframes and
> datasets?
>
> You have not specified your sink
>
> With regard to your question?
>
> "Is there an equivalent of Dstream HasOffsetRanges in structure streaming
> to get the microbatch end offsets to the checkpoint in our external
> checkpoint store ?"
>
> There is not a direct equivalent of DStream HasOffsetRanges in Spark
> Structured Streaming. However, Structured Streaming provides mechanisms to
> achieve similar functionality:
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>  wrote:
>
>> Hello,
>>
>> what options are you considering yourself?
>>
>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>> adas...@guidewire.com> wrote:
>>
>>
>> Hello,
>>
>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>> structured streaming + Kafka.
>> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
>> to get the microbatch end offsets to the checkpoint in our external
>> checkpoint store ? Thanks in advance.
>>
>> Regards
>>
>>


[ANNOUNCE] Apache Celeborn 0.4.1 available

2024-05-22 Thread Nicholas Jiang
Hi all,

Apache Celeborn community is glad to announce the
new release of Apache Celeborn 0.4.1.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/celeborn/releases/tag/v0.4.1

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.1


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Regards,
Nicholas Jiang
On behalf of the Apache Celeborn community

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
OK, to understand better: your current model relies on streaming data input
through a Kafka topic, Spark does some ETL, and you send the result to a sink
such as a database or file storage like HDFS?

Your current architecture relies on Direct Streams (DStream) and RDDs, and
you want to move to Spark Structured Streaming based on dataframes and
datasets?

You have not specified your sink

With regard to your question?

"Is there an equivalent of Dstream HasOffsetRanges in structure streaming
to get the microbatch end offsets to the checkpoint in our external
checkpoint store ?"

There is not a direct equivalent of DStream HasOffsetRanges in Spark
Structured Streaming. However, Structured Streaming provides mechanisms to
achieve similar functionality:

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
 wrote:

> Hello,
>
> what options are you considering yourself?
>
> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
> adas...@guidewire.com> wrote:
>
>
> Hello,
>
> We are on Spark 3.x and using Spark dstream + kafka and planning to use
> structured streaming + Kafka.
> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
> to get the microbatch end offsets to the checkpoint in our external
> checkpoint store ? Thanks in advance.
>
> Regards
>
>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread ashok34...@yahoo.com.INVALID
 Hello,
what options are you considering yourself?
On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari 
 wrote:  
 
 Hello,

We are on Spark 3.x and using Spark dstream + kafka and planning to use 
structured streaming + Kafka. Is there an equivalent of Dstream HasOffsetRanges 
in structure streaming to get the microbatch end offsets to the checkpoint in 
our external checkpoint store ? Thanks in advance. 
Regards
  

Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Hello,

We are on Spark 3.x and using Spark dstream + kafka and planning to use
structured streaming + Kafka.
Is there an equivalent of Dstream HasOffsetRanges in structure streaming to
get the microbatch end offsets to the checkpoint in our external checkpoint
store ? Thanks in advance.

Regards


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
I am looking for a writer/committer optimization which can make the Spark
write faster.
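
One direction worth trying (a sketch only, assuming hadoop-aws plus the
spark-hadoop-cloud module are on the classpath; the MinIO endpoint and paths
below are placeholders) is to switch from the default rename-based committer
to an S3A committer for the MinIO write:

```
from pyspark.sql import SparkSession

# Sketch: use the S3A "magic" committer for the MinIO (S3A) write path.
spark = (SparkSession.builder
         .appName("MinIOFasterWrite")
         .config("spark.hadoop.fs.s3a.endpoint", "http://minio-host:9000")   # placeholder
         .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .config("spark.hadoop.fs.s3a.committer.name", "magic")
         .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
         .config("spark.sql.sources.commitProtocolClass",
                 "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
         .config("spark.sql.parquet.output.committer.class",
                 "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
         .getOrCreate())

df = spark.read.parquet("hdfs:///data/input")                     # placeholder
df.write.mode("overwrite").parquet("s3a://bucket/output")         # placeholder
```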

On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:

> Hi,
> I think you should write to HDFS then copy file (parquet or orc) from
> HDFS to MinIO.
>
> --
> eabour
>
>
> *From:* Prem Sahoo 
> *Date:* 2024-05-22 00:38
> *To:* Vibhor Gupta ; user
> 
> *Subject:* Re: EXT: Dual Write to HDFS and MinIO in faster way
>
>
> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>
>> Hello Vibhor,
>> Thanks for the suggestion .
>> I am looking for some other alternatives where I can use the same
>> dataframe can be written to two destinations without re execution and cache
>> or persist .
>>
>> Can some one help me in scenario 2 ?
>> How to make spark write to MinIO faster ?
>> Sent from my iPhone
>>
>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>> wrote:
>>
>> 
>>
>> Hi Prem,
>>
>>
>>
>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>
>>
>>
>> This will prevent duplicate transformation.
>>
>>
>>
>> You can also try persisting the dataframe using the DISK_ONLY level.
>>
>>
>>
>> Regards,
>>
>> Vibhor
>>
>> *From: *Prem Sahoo 
>> *Date: *Tuesday, 21 May 2024 at 8:16 AM
>> *To: *Spark dev list 
>> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>>
>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>
>> Hello Team,
>>
>> I am planning to write to two datasource at the same time .
>>
>>
>>
>> Scenario:-
>>
>>
>>
>> Writing the same dataframe to HDFS and MinIO without re-executing the
>> transformations and no cache(). Then how can we make it faster ?
>>
>>
>>
>> Read the parquet file and do a few transformations and write to HDFS and
>> MinIO.
>>
>>
>>
>> here in both write spark needs execute the transformation again. Do we
>> know how we can avoid re-execution of transformation  without
>> cache()/persist ?
>>
>>
>>
>> Scenario2 :-
>>
>> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>>
>> Do we have any way to make writing this faster ?
>>
>>
>>
>> I don't want to do repartition and write as repartition will have
>> overhead of shuffling .
>>
>>
>>
>> Please provide some inputs.
>>
>>
>>
>>
>>
>>


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread eab...@163.com
Hi,
I think you should write to HDFS then copy file (parquet or orc) from HDFS 
to MinIO.



eabour
 
From: Prem Sahoo
Date: 2024-05-22 00:38
To: Vibhor Gupta; user
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
Hello Vibhor,
Thanks for the suggestion .
I am looking for some other alternatives where I can use the same dataframe can 
be written to two destinations without re execution and cache or persist .

Can some one help me in scenario 2 ?
How to make spark write to MinIO faster ?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta  wrote:

 
Hi Prem,
 
You can try to write to HDFS then read from HDFS and write to MinIO.
 
This will prevent duplicate transformation.
 
You can also try persisting the dataframe using the DISK_ONLY level.
 
Regards,
Vibhor
From: Prem Sahoo 
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list 
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasource at the same time . 
 
Scenario:-
 
Writing the same dataframe to HDFS and MinIO without re-executing the 
transformations and no cache(). Then how can we make it faster ?
 
Read the parquet file and do a few transformations and write to HDFS and MinIO.
 
here in both write spark needs execute the transformation again. Do we know how 
we can avoid re-execution of transformation  without cache()/persist ?
 
Scenario2 :-
I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
Do we have any way to make writing this faster ?
 
I don't want to do repartition and write as repartition will have overhead of 
shuffling .
 
Please provide some inputs. 
 
 


Re: A handy tool called spark-column-analyser

2024-05-21 Thread ashok34...@yahoo.com.INVALID
 Great work. Very handy for identifying problems
thanks
On Tuesday 21 May 2024 at 18:12:15 BST, Mich Talebzadeh 
 wrote:  
 
 A colleague kindly pointed out that I should give an example of the output, which will be
added to the README.

Doing analysis for column Postcode

JSON formatted output

{
    "Postcode": {
        "exists": true,
        "num_rows": 93348,
        "data_type": "string",
        "null_count": 21921,
        "null_percentage": 23.48,
        "distinct_count": 38726,
        "distinct_percentage": 41.49
    }
}
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
Von Braun)".


On Tue, 21 May 2024 at 16:21, Mich Talebzadeh  wrote:


I just wanted to share a tool I built called spark-column-analyzer. It's a 
Python package that helps you dig into your Spark DataFrames with ease. 

Ever spend ages figuring out what's going on in your columns? Like, how many 
null values are there, or how many unique entries? Built with data preparation 
for Generative AI in mind, it aids in data imputation and augmentation – key 
steps for creating realistic synthetic data.

Basics
   
   - Effortless Column Analysis: It calculates all the important stats you need 
for each column, like null counts, distinct values, percentages, and more. No 
more manual counting or head scratching!
   - Simple to Use: Just toss in your DataFrame and call the analyze_column 
function. Bam! Insights galore.
   - Makes Data Cleaning easier: Knowing your data's quality helps you clean it 
up way faster. This package helps you figure out where the missing values are 
hiding and how much variety you've got in each column.
   - Detecting skewed columns
   - Open Source and Friendly: Feel free to tinker, suggest improvements, or 
even contribute some code yourself! We love collaboration in the Spark 
community.

Installation: 


Using pip from the link: https://pypi.org/project/spark-column-analyzer/

pip install spark-column-analyzer

Also you can clone the project from gitHub

git clone https://github.com/michTalebzadeh/spark_column_analyzer.git

The details are in the attached RENAME file
Let me know what you think! Feedback is always welcome.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
Von Braun)".

  

Re: A handy tool called spark-column-analyser

2024-05-21 Thread Mich Talebzadeh
A colleague kindly suggested giving an example of the output, which will
be added to the README.

Doing analysis for column Postcode

Json formatted output

{
"Postcode": {
"exists": true,
"num_rows": 93348,
"data_type": "string",
"null_count": 21921,
"null_percentage": 23.48,
"distinct_count": 38726,
"distinct_percentage": 41.49
}
}

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 21 May 2024 at 16:21, Mich Talebzadeh 
wrote:

> I just wanted to share a tool I built called *spark-column-analyzer*.
> It's a Python package that helps you dig into your Spark DataFrames with
> ease.
>
> Ever spend ages figuring out what's going on in your columns? Like, how
> many null values are there, or how many unique entries? Built with data
> preparation for Generative AI in mind, it aids in data imputation and
> augmentation – key steps for creating realistic synthetic data.
>
> *Basics*
>
>- *Effortless Column Analysis:* It calculates all the important stats
>you need for each column, like null counts, distinct values, percentages,
>and more. No more manual counting or head scratching!
>- *Simple to Use:* Just toss in your DataFrame and call the
>analyze_column function. Bam! Insights galore.
>- *Makes Data Cleaning easier:* Knowing your data's quality helps you
>clean it up way faster. This package helps you figure out where the missing
>values are hiding and how much variety you've got in each column.
>- *Detecting skewed columns*
>- *Open Source and Friendly:* Feel free to tinker, suggest
>improvements, or even contribute some code yourself! We love collaboration
>in the Spark community.
>
> *Installation:*
>
> Using pip from the link: https://pypi.org/project/spark-column-analyzer/
>
>
> *pip install spark-column-analyzer*
> Also you can clone the project from gitHub
>
>
> *git clone https://github.com/michTalebzadeh/spark_column_analyzer.git
> *
>
> The details are in the attached RENAME file
>
> Let me know what you think! Feedback is always welcome.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>


Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:

> Hello Vibhor,
> Thanks for the suggestion .
> I am looking for some other alternatives where I can use the same
> dataframe can be written to two destinations without re execution and cache
> or persist .
>
> Can some one help me in scenario 2 ?
> How to make spark write to MinIO faster ?
> Sent from my iPhone
>
> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
> wrote:
>
> 
>
> Hi Prem,
>
>
>
> You can try to write to HDFS then read from HDFS and write to MinIO.
>
>
>
> This will prevent duplicate transformation.
>
>
>
> You can also try persisting the dataframe using the DISK_ONLY level.
>
>
>
> Regards,
>
> Vibhor
>
> *From: *Prem Sahoo 
> *Date: *Tuesday, 21 May 2024 at 8:16 AM
> *To: *Spark dev list 
> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>
> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>
> Hello Team,
>
> I am planning to write to two datasource at the same time .
>
>
>
> Scenario:-
>
>
>
> Writing the same dataframe to HDFS and MinIO without re-executing the
> transformations and no cache(). Then how can we make it faster ?
>
>
>
> Read the parquet file and do a few transformations and write to HDFS and
> MinIO.
>
>
>
> here in both write spark needs execute the transformation again. Do we
> know how we can avoid re-execution of transformation  without
> cache()/persist ?
>
>
>
> Scenario2 :-
>
> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>
> Do we have any way to make writing this faster ?
>
>
>
> I don't want to do repartition and write as repartition will have overhead
> of shuffling .
>
>
>
> Please provide some inputs.
>
>
>
>
>
>


A handy tool called spark-column-analyser

2024-05-21 Thread Mich Talebzadeh
I just wanted to share a tool I built called *spark-column-analyzer*. It's
a Python package that helps you dig into your Spark DataFrames with ease.

Ever spend ages figuring out what's going on in your columns? Like, how
many null values are there, or how many unique entries? Built with data
preparation for Generative AI in mind, it aids in data imputation and
augmentation – key steps for creating realistic synthetic data.

*Basics*

   - *Effortless Column Analysis:* It calculates all the important stats
   you need for each column, like null counts, distinct values, percentages,
   and more. No more manual counting or head scratching!
   - *Simple to Use:* Just toss in your DataFrame and call the
   analyze_column function. Bam! Insights galore.
   - *Makes Data Cleaning easier:* Knowing your data's quality helps you
   clean it up way faster. This package helps you figure out where the missing
   values are hiding and how much variety you've got in each column.
   - *Detecting skewed columns*
   - *Open Source and Friendly:* Feel free to tinker, suggest improvements,
   or even contribute some code yourself! We love collaboration in the Spark
   community.

*Installation:*

Using pip from the link: https://pypi.org/project/spark-column-analyzer/


*pip install spark-column-analyzer*
Also you can clone the project from gitHub


*git clone https://github.com/michTalebzadeh/spark_column_analyzer.git
*

The details are in the attached README file

Let me know what you think! Feedback is always welcome.
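
A quick, hypothetical usage sketch (the import path and the exact signature of analyze_column are assumptions based on the description above; the package README has the real API):

```python
# Hypothetical usage sketch for spark-column-analyzer; the import path and
# function signature are assumed, not taken from the package docs.
from pyspark.sql import SparkSession
from spark_column_analyzer import analyze_column   # assumed import path

spark = SparkSession.builder.appName("column-analysis").getOrCreate()

df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)

# Analyse one column and print the JSON-formatted stats shown above
result = analyze_column(df, "Postcode")
print(result)
```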

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


README.md
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: pyspark dataframe join with two different data type

2024-05-17 Thread Karthick Nk
Hi All,

I have tried to get the same result with PySpark and with a SQL query by
creating a tempView. I was able to achieve it with the SQL query, whereas I
need to do it in the PySpark code itself. Could you help with this?

incoming_data = [["a"], ["b"], ["d"]]
column_names = ["column1"]
df = spark.createDataFrame(incoming_data, column_names)

view_data_df = spark.createDataFrame([(["a", "b", "c"], 1), (['f'], 2)],
['data_col', 'id'])

df.createOrReplaceTempView(f"id_table")
view_data_df.createOrReplaceTempView(f"view_data")

*%sql*
*select * from view_data*
*where exists (select 1 from id_table where array_contains(data_col,
column1))*

*Result:*

*data_col id*
*["a","b","c"] 1*

I need this equivalent SQL query with pyspark code to achieve the result.

One of the solutions I have tried is below, but there I am doing an explode and
then a distinct again. I need to perform the action without doing this, since
it will impact performance for huge data.
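
One way to express that EXISTS semantics directly in PySpark, without the explode/distinct round trip, is a left-semi join on the array_contains condition; a minimal sketch using the view_data_df and df defined above:

```python
# Sketch: a left-semi join keeps each view_data_df row at most once whenever
# ANY row of df satisfies array_contains(data_col, column1) - the same
# semantics as the EXISTS subquery, with no explode or distinct needed.
from pyspark.sql.functions import expr

result_df = view_data_df.join(
    df,
    expr("array_contains(data_col, column1)"),
    "left_semi"
)
result_df.show()
# Expected output: data_col=[a, b, c], id=1
```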

Thanks,


solutions

On Thu, May 16, 2024 at 8:33 AM Karthick Nk  wrote:

> Thanks Mich,
>
> I have tried this solution, but i want all the columns from the dataframe
> df_1, if i explode the df_1 i am getting only data column. But the
> resultant should get the all the column from the df_1 with distinct result
> like below.
>
> Results in
>
> *df:*
> +---+
> |column1|
> +---+
> |  a|
> |  b|
> |  d|
> +---+
>
> *df_1:*
> +-+
> | id| data| field
> +-+
> |1 | [a, b, c]| ['company']
> | 3| [b, c, d]| ['hello']
> | 4| [e, f, s]| ['hello']
> +-+
>
> *Result:*
> ++
> |id| data| field
> ++
> |1| ['company']
> | 3|  ['helo']|
> ++
>
> Explanation: id with 1, 3 why -> because a or b present in both records 1
> and 3 so returning distinct result from the join.
>
>
> Here I would like to get the result like above, even if I get the
> duplicate element in the column data, I need to get the distinct data
> with respect to id field, But when I try to use array_contain, it will
> return duplicate result since data column has multiple occurrence.
>
> If you need more clarification, please let me know.
>
> Thanks,
>
>
>
>
>
>
> On Tue, May 14, 2024 at 6:12 PM Mich Talebzadeh 
> wrote:
>
>> You can use a combination of explode and distinct before joining.
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import explode
>>
>> # Create a SparkSession
>> spark = SparkSession.builder \
>> .appName("JoinExample") \
>> .getOrCreate()
>>
>> sc = spark.sparkContext
>> # Set the log level to ERROR to reduce verbosity
>> sc.setLogLevel("ERROR")
>>
>> # Create DataFrame df
>> data = [
>> ["a"],
>> ["b"],
>> ["d"],
>> ]
>> column_names = ["column1"]
>> df = spark.createDataFrame(data, column_names)
>> print("df:")
>> df.show()
>>
>> # Create DataFrame df_1
>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>> print("df_1:")
>> df_1.show()
>>
>> # Explode the array column in df_1
>> exploded_df_1 = df_1.select(explode("data").alias("data"))
>>
>> # Join with df using the exploded column
>> final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)
>>
>> # Distinct to ensure only unique rows are returned from df_1
>> final_df = final_df.select("data").distinct()
>>
>> print("Result:")
>> final_df.show()
>>
>>
>> Results in
>>
>> df:
>> +---+
>> |column1|
>> +---+
>> |  a|
>> |  b|
>> |  d|
>> +---+
>>
>> df_1:
>> +-+
>> | data|
>> +-+
>> |[a, b, c]|
>> |   []|
>> +-+
>>
>> Result:
>> ++
>> |data|
>> ++
>> |   a|
>> |   b|
>> ++
>>
>> HTH
>>
>> Mich Talebzadeh,
>>
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Tue, 14 May 2024 at 13:19, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Could anyone have any idea or suggestion of any alternate way to achieve
>>> this scenario?
>>>
>>> Thanks.
>>>
>>> On Sat, May 11, 2024 at 6:55 AM Damien Hawes 
>>> wrote:
>>>
 Right now, with the structure of your data, it isn't possible.

 The rows aren't duplicates of each other. "a" and "b" both exist in the
 array. So Spark is correctly performing the join. It looks like you need to
 find another way to model this data to get what you want to achieve.

 Are the values of "a" and "b" related to each other in any way?

 - Damien

 Op vr 10 mei 2024 18:08 schreef Karthick Nk :

> Hi Mich,
>

Request for Assistance: Adding User Authentication to Apache Spark Application

2024-05-16 Thread NIKHIL RAJ SHRIVASTAVA
Dear Team,

I hope this email finds you well. My name is Nikhil Raj, and I am currently
working with Apache Spark for one of my projects, where we are creating an
external table in Spark from a parquet file.

I am reaching out to seek assistance regarding user authentication for our
Apache Spark application. Currently, we can connect to the application
using only the host and port information. However, for security reasons, we
would like to implement user authentication to control access and ensure
data integrity.

After reviewing the available documentation and resources, I found that
adding user authentication to our Spark setup requires additional
configurations or plugins. However, I'm facing challenges in understanding
the exact steps or best practices to implement this.

Could you please provide guidance or point me towards relevant
documentation/resources that detail how to integrate user authentication
into Apache Spark?  Additionally, if there are any recommended practices or
considerations for ensuring the security of our Spark setup, we would
greatly appreciate your insights on that as well.

Your assistance in this matter would be invaluable to us, as we aim to
enhance the security of our Spark application and safeguard our data
effectively.

Thank you very much for your time and consideration. I look forward to
hearing from you and your suggestions.

Warm regards,

NIKHIL RAJ
Developer
Estuate Software Pvt. Ltd.
Thanks & Regards


Re: pyspark dataframe join with two different data type

2024-05-15 Thread Karthick Nk
Thanks Mich,

I have tried this solution, but I want all the columns from the dataframe
df_1. If I explode df_1 I get only the data column, whereas the result
should contain all the columns from df_1, with distinct rows, like below.

Results in

*df:*
+---+
|column1|
+---+
|  a|
|  b|
|  d|
+---+

*df_1:*
+-+
| id| data| field
+-+
|1 | [a, b, c]| ['company']
| 3| [b, c, d]| ['hello']
| 4| [e, f, s]| ['hello']
+-+

*Result:*
++
|id| data| field
++
|1| ['company']
| 3|  ['helo']|
++

Explanation: why ids 1 and 3 -> because a or b is present in both records 1
and 3, so the join should return a distinct result for them.


Here I would like to get a result like the above: even if there are
duplicate matching elements in the data column, I need the distinct rows
with respect to the id field. But when I try to use array_contains, it
returns duplicate results since the data column has multiple matching
occurrences.

If you need more clarification, please let me know.

Thanks,






On Tue, May 14, 2024 at 6:12 PM Mich Talebzadeh 
wrote:

> You can use a combination of explode and distinct before joining.
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode
>
> # Create a SparkSession
> spark = SparkSession.builder \
> .appName("JoinExample") \
> .getOrCreate()
>
> sc = spark.sparkContext
> # Set the log level to ERROR to reduce verbosity
> sc.setLogLevel("ERROR")
>
> # Create DataFrame df
> data = [
> ["a"],
> ["b"],
> ["d"],
> ]
> column_names = ["column1"]
> df = spark.createDataFrame(data, column_names)
> print("df:")
> df.show()
>
> # Create DataFrame df_1
> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
> print("df_1:")
> df_1.show()
>
> # Explode the array column in df_1
> exploded_df_1 = df_1.select(explode("data").alias("data"))
>
> # Join with df using the exploded column
> final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)
>
> # Distinct to ensure only unique rows are returned from df_1
> final_df = final_df.select("data").distinct()
>
> print("Result:")
> final_df.show()
>
>
> Results in
>
> df:
> +---+
> |column1|
> +---+
> |  a|
> |  b|
> |  d|
> +---+
>
> df_1:
> +-+
> | data|
> +-+
> |[a, b, c]|
> |   []|
> +-+
>
> Result:
> ++
> |data|
> ++
> |   a|
> |   b|
> ++
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 14 May 2024 at 13:19, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Could anyone have any idea or suggestion of any alternate way to achieve
>> this scenario?
>>
>> Thanks.
>>
>> On Sat, May 11, 2024 at 6:55 AM Damien Hawes 
>> wrote:
>>
>>> Right now, with the structure of your data, it isn't possible.
>>>
>>> The rows aren't duplicates of each other. "a" and "b" both exist in the
>>> array. So Spark is correctly performing the join. It looks like you need to
>>> find another way to model this data to get what you want to achieve.
>>>
>>> Are the values of "a" and "b" related to each other in any way?
>>>
>>> - Damien
>>>
>>> Op vr 10 mei 2024 18:08 schreef Karthick Nk :
>>>
 Hi Mich,

 Thanks for the solution, But I am getting duplicate result by using
 array_contains. I have explained the scenario below, could you help me on
 that, how we can achieve i have tried different way bu i could able to
 achieve.

 For example

 data = [
 ["a"],
 ["b"],
 ["d"],
 ]
 column_names = ["column1"]
 df = spark.createDataFrame(data, column_names)
 df.display()

 [image: image.png]

 df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
 df_1.display()
 [image: image.png]


 final_df = df_1.join(df, expr("array_contains(data, column1)"))
 final_df.display()

 Resul:
 [image: image.png]

 But i need the result like below:

 [image: image.png]

 Why because

 In the df_1 i have only two records, in that first records onlly i have
 matching value.
 But both records from the df i.e *a, b* are present in the first
 records itself, it is returning two records as resultant, but my
 expectation is to return only one records means if any of the records from
 the df is present in the df_1 it should return only one records from the
 df_1.


How to provide a Zstd "training mode" dictionary object

2024-05-15 Thread Saha, Daniel
Hi,

I understand that Zstd compression can optionally be provided a dictionary 
object to improve performance. See “training mode” here 
https://facebook.github.io/zstd/

Does Spark surface a way to provide this dictionary object when writing/reading 
data? What about for intermediate shuffle results?

Thanks,
Daniel


Query Regarding UDF Support in Spark Connect with Kubernetes as Cluster Manager

2024-05-15 Thread Nagatomi Yasukazu
Hi Spark Community,

I have a question regarding the support for User-Defined Functions (UDFs)
in Spark Connect, specifically when using Kubernetes as the Cluster Manager.

According to the Spark documentation, UDFs are supported by default for the
shell and in standalone applications with additional setup requirements.

However, it is not clear whether this support extends to scenarios where
Kubernetes is used as the Cluster Manager.

cf.
https://spark.apache.org/docs/latest/spark-connect-overview.html#:~:text=User%2DDefined%20Functions%20(UDFs)%20are%20supported%2C%20by%20default%20for%20the%20shell%20and%20in%20standalone%20applications%20with%20additional%20set%2Dup%20requirements
.

Could someone please clarify:

1. Are UDFs supported in Spark Connect when using Kubernetes as the Cluster
Manager?

2. If they are supported, are there any specific setup requirements or
limitations we should be aware of?

3. If UDFs are not currently supported with Kubernetes as the Cluster
Manager, are there any plans to include this support in future releases?

Your insights and guidance on this matter would be greatly appreciated.

Thank you in advance for your help!

Best regards,
Yasukazau


Re: pyspark dataframe join with two different data type

2024-05-14 Thread Mich Talebzadeh
You can use a combination of explode and distinct before joining.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

# Create a SparkSession
spark = SparkSession.builder \
.appName("JoinExample") \
.getOrCreate()

sc = spark.sparkContext
# Set the log level to ERROR to reduce verbosity
sc.setLogLevel("ERROR")

# Create DataFrame df
data = [
["a"],
["b"],
["d"],
]
column_names = ["column1"]
df = spark.createDataFrame(data, column_names)
print("df:")
df.show()

# Create DataFrame df_1
df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
print("df_1:")
df_1.show()

# Explode the array column in df_1
exploded_df_1 = df_1.select(explode("data").alias("data"))

# Join with df using the exploded column
final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)

# Distinct to ensure only unique rows are returned from df_1
final_df = final_df.select("data").distinct()

print("Result:")
final_df.show()


Results in

df:
+---+
|column1|
+---+
|  a|
|  b|
|  d|
+---+

df_1:
+-+
| data|
+-+
|[a, b, c]|
|   []|
+-+

Result:
++
|data|
++
|   a|
|   b|
++

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 14 May 2024 at 13:19, Karthick Nk  wrote:

> Hi All,
>
> Could anyone have any idea or suggestion of any alternate way to achieve
> this scenario?
>
> Thanks.
>
> On Sat, May 11, 2024 at 6:55 AM Damien Hawes 
> wrote:
>
>> Right now, with the structure of your data, it isn't possible.
>>
>> The rows aren't duplicates of each other. "a" and "b" both exist in the
>> array. So Spark is correctly performing the join. It looks like you need to
>> find another way to model this data to get what you want to achieve.
>>
>> Are the values of "a" and "b" related to each other in any way?
>>
>> - Damien
>>
>> Op vr 10 mei 2024 18:08 schreef Karthick Nk :
>>
>>> Hi Mich,
>>>
>>> Thanks for the solution, But I am getting duplicate result by using
>>> array_contains. I have explained the scenario below, could you help me on
>>> that, how we can achieve i have tried different way bu i could able to
>>> achieve.
>>>
>>> For example
>>>
>>> data = [
>>> ["a"],
>>> ["b"],
>>> ["d"],
>>> ]
>>> column_names = ["column1"]
>>> df = spark.createDataFrame(data, column_names)
>>> df.display()
>>>
>>> [image: image.png]
>>>
>>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df_1.display()
>>> [image: image.png]
>>>
>>>
>>> final_df = df_1.join(df, expr("array_contains(data, column1)"))
>>> final_df.display()
>>>
>>> Resul:
>>> [image: image.png]
>>>
>>> But i need the result like below:
>>>
>>> [image: image.png]
>>>
>>> Why because
>>>
>>> In the df_1 i have only two records, in that first records onlly i have
>>> matching value.
>>> But both records from the df i.e *a, b* are present in the first
>>> records itself, it is returning two records as resultant, but my
>>> expectation is to return only one records means if any of the records from
>>> the df is present in the df_1 it should return only one records from the
>>> df_1.
>>>
>>> Note:
>>> 1. Here we are able to filter the duplicate records by using distinct of
>>> ID field in the resultant df, bu I am thinking that shouldn't be effective
>>> way, rather i am thinking of updating in array_contains steps itself.
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>

 This is what you want, how to join two DFs with a string column in one
 and an array of strings in the other, keeping only rows where the
 string is present in the array.

 from pyspark.sql import SparkSession
 from pyspark.sql import Row
 from pyspark.sql.functions import expr

 spark = SparkSession.builder.appName("joins").getOrCreate()

 data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]  # this one
 has a column combined_id as an array of integers
 data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
 single integers

 df1 = spark.createDataFrame(data1)
 df2 = spark.createDataFrame(data2)

 df1.printSchema()
 df2.printSchema()

 # Perform the join with array_contains. It takes two arguments: an
 array and a value. It returns True if the value exists as an element
 within the array, otherwise False.
 joined_df = df1.join(df2, 

Re: pyspark dataframe join with two different data type

2024-05-14 Thread Karthick Nk
Hi All,

Could anyone have any idea or suggestion of any alternate way to achieve
this scenario?

Thanks.

On Sat, May 11, 2024 at 6:55 AM Damien Hawes  wrote:

> Right now, with the structure of your data, it isn't possible.
>
> The rows aren't duplicates of each other. "a" and "b" both exist in the
> array. So Spark is correctly performing the join. It looks like you need to
> find another way to model this data to get what you want to achieve.
>
> Are the values of "a" and "b" related to each other in any way?
>
> - Damien
>
> Op vr 10 mei 2024 18:08 schreef Karthick Nk :
>
>> Hi Mich,
>>
>> Thanks for the solution, But I am getting duplicate result by using
>> array_contains. I have explained the scenario below, could you help me on
>> that, how we can achieve i have tried different way bu i could able to
>> achieve.
>>
>> For example
>>
>> data = [
>> ["a"],
>> ["b"],
>> ["d"],
>> ]
>> column_names = ["column1"]
>> df = spark.createDataFrame(data, column_names)
>> df.display()
>>
>> [image: image.png]
>>
>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>> df_1.display()
>> [image: image.png]
>>
>>
>> final_df = df_1.join(df, expr("array_contains(data, column1)"))
>> final_df.display()
>>
>> Resul:
>> [image: image.png]
>>
>> But i need the result like below:
>>
>> [image: image.png]
>>
>> Why because
>>
>> In the df_1 i have only two records, in that first records onlly i have
>> matching value.
>> But both records from the df i.e *a, b* are present in the first records
>> itself, it is returning two records as resultant, but my expectation is to
>> return only one records means if any of the records from the df is present
>> in the df_1 it should return only one records from the df_1.
>>
>> Note:
>> 1. Here we are able to filter the duplicate records by using distinct of
>> ID field in the resultant df, bu I am thinking that shouldn't be effective
>> way, rather i am thinking of updating in array_contains steps itself.
>>
>> Thanks.
>>
>>
>> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh 
>> wrote:
>>
>>>
>>> This is what you want, how to join two DFs with a string column in one
>>> and an array of strings in the other, keeping only rows where the
>>> string is present in the array.
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import expr
>>>
>>> spark = SparkSession.builder.appName("joins").getOrCreate()
>>>
>>> data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]  # this one
>>> has a column combined_id as an array of integers
>>> data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
>>> single integers
>>>
>>> df1 = spark.createDataFrame(data1)
>>> df2 = spark.createDataFrame(data2)
>>>
>>> df1.printSchema()
>>> df2.printSchema()
>>>
>>> # Perform the join with array_contains. It takes two arguments: an
>>> array and a value. It returns True if the value exists as an element
>>> within the array, otherwise False.
>>> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>>>
>>> # Show the result
>>> joined_df.show()
>>>
>>> root
>>>  |-- combined_id: array (nullable = true)
>>>  ||-- element: long (containsNull = true)
>>>
>>> root
>>>  |-- mr_id: long (nullable = true)
>>>
>>> +---+-+
>>> |combined_id|mr_id|
>>> +---+-+
>>> |  [1, 2, 3]|2|
>>> |  [4, 5, 6]|5|
>>> +---+-+
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:
>>>
 Hi All,

 I have two dataframe with below structure, i have to join these two
 dataframe - the scenario is one column is string in one dataframe and in
 other df join column is array of string, so we have to inner join two df
 and get the data if string value is present in any of the array of string
 value in another dataframe,


 df1 = spark.sql("""
 SELECT
 mr.id as mr_id,
 pv.id as pv_id,
 array(mr.id, pv.id) as combined_id
 FROM
 table1 mr
 INNER JOIN table2 pv ON pv.id = Mr.recordid
where
 pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
 or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
 """)

 # df1.display()

 # Your second query
 df2 = spark.sql("""
 SELECT
 

Display a warning in EMR welcome screen

2024-05-11 Thread Abhishek Basu
Hi Team, for Elastic Map Reduce (EMR) clusters, it would be great if there were a 
warning message on the welcome screen that logging should be handled carefully and 
that INFO or DEBUG should be enabled only when required. This logging issue took my 
whole day, and I finally discovered that it was flooding HDFS and preventing the 
cluster from performing the real task.
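
For anyone hitting the same thing, a minimal sketch of turning the log chatter down from application code (tuning the cluster's log4j configuration is the more complete fix):

```python
# Sketch: raise the Spark log level at runtime so INFO/DEBUG noise is not emitted.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("quiet-logs").getOrCreate()
spark.sparkContext.setLogLevel("WARN")   # only WARN and above from here on
```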

Thanks Abhishek


Sent from Yahoo Mail for iPhone


Re: [spark-graphframes]: Generating incorrect edges

2024-05-11 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi all,

The issue is solved.
I conducted a lot more testing and built checkers to verify at which size it's 
going wrong.
When checking for specific edges, I could construct successful graphs up to 
261k records.
When verifying all edges created, it breaks somewhere in the 200-250k record range.
I didn't bother finding the specific error threshold, as runs take up to 7 
minutes per slice.

I started looking at all underlying assumptions of my code along with my 
supervisor.
We located the problem in the generate_ids() function.
I had selected all distinct values to give them an ID and subsequently joined 
those results back to the main DataFrame.
I replaced this by generating a unique ID for each value occurrence, hashing 
the values with 'withColumn' rather than joining them back.
This resolved my issues and ended up being a significant performance boost as 
well.

My fixed generate_ids() code:

def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    df = df.withColumn(MAINTAINER_ID, psf.concat(psf.lit(PREFIX_M), psf.sha2(df.mnt_by, 256)))
    df = df.withColumn(PREFIX_ID, psf.concat(psf.lit(PREFIX_P), psf.sha2(df.prefix, 256)))
    df = df.withColumn(ORIGIN_ID, psf.concat(psf.lit(PREFIX_O), psf.sha2(df.origin, 256)))
    df = df.withColumn(ORGANISATION_ID, psf.concat(psf.lit(PREFIX_ORG), psf.sha2(df.descr, 256)))
    return df

I hope this email helps someone running into a similar issue in the future.

Kind regards,
Jelle




From: Mich Talebzadeh 
Sent: Wednesday, May 1, 2024 11:56 AM
To: Stephen Coy 
Cc: Nijland, J.G.W. (Jelle, Student M-CS) ; 
user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

Hi Steve,

Thanks for your statement. I tend to use uuid myself to avoid collisions. This 
built-in function generates random IDs that are highly likely to be unique 
across systems. My concerns are about edge cases, so to speak. If the Spark 
application runs for a very long time or encounters restarts, the 
monotonically_increasing_id() sequence might restart from the beginning. This 
could again cause duplicate IDs if other Spark applications are running 
concurrently or if data is processed across multiple runs of the same 
application.
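
A minimal sketch of the UUID approach in PySpark, using the built-in Spark SQL uuid() function; note that uuid() is non-deterministic, so persist or checkpoint the result if the IDs must stay stable across recomputation:

```python
# Sketch: tag each row with a collision-resistant random UUID.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("uuid-ids").getOrCreate()

df = spark.createDataFrame([("alice",), ("bob",)], ["name"])

# uuid() is evaluated per row; persist before reuse so the IDs do not change
# if the plan is recomputed.
df_with_id = df.withColumn("row_id", expr("uuid()")).persist()
df_with_id.show(truncate=False)
```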

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
 Von 
Braun)".


On Wed, 1 May 2024 at 01:22, Stephen Coy 
mailto:s...@infomedia.com.au>> wrote:
Hi Mich,

I was just reading random questions on the user list when I noticed that you 
said:

On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.


It’s my understanding that the *Spark* `monotonically_increasing_id()` function 
exists for the exact purpose of generating a collision-resistant unique id 
across nodes on different hosts.
We use it extensively for this purpose and have never encountered an issue.

Are we wrong or are you thinking of a different (not Spark) function?

Cheers,

Steve C




This email contains confidential information of and is the copyright of 
Infomedia. It must not be forwarded, amended or disclosed without consent of 
the sender. If you received this message by mistake, please advise the sender 
and delete all copies. Security of transmission on the internet cannot be 
guaranteed, could be infected, intercepted, or corrupted and you should ensure 
you have suitable antivirus protection in place. By sending us your or any 
third party personal details, you consent to (or confirm you have obtained 
consent from such third parties) to Infomedia’s privacy policy. 
http://www.infomedia.com.au/privacy-policy/


unsubscribe

2024-05-10 Thread J UDAY KIRAN
unsubscribe


Re: pyspark dataframe join with two different data type

2024-05-10 Thread Damien Hawes
Right now, with the structure of your data, it isn't possible.

The rows aren't duplicates of each other. "a" and "b" both exist in the
array. So Spark is correctly performing the join. It looks like you need to
find another way to model this data to get what you want to achieve.

Are the values of "a" and "b" related to each other in any way?

- Damien

Op vr 10 mei 2024 18:08 schreef Karthick Nk :

> Hi Mich,
>
> Thanks for the solution, But I am getting duplicate result by using
> array_contains. I have explained the scenario below, could you help me on
> that, how we can achieve i have tried different way bu i could able to
> achieve.
>
> For example
>
> data = [
> ["a"],
> ["b"],
> ["d"],
> ]
> column_names = ["column1"]
> df = spark.createDataFrame(data, column_names)
> df.display()
>
> [image: image.png]
>
> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
> df_1.display()
> [image: image.png]
>
>
> final_df = df_1.join(df, expr("array_contains(data, column1)"))
> final_df.display()
>
> Resul:
> [image: image.png]
>
> But i need the result like below:
>
> [image: image.png]
>
> Why because
>
> In the df_1 i have only two records, in that first records onlly i have
> matching value.
> But both records from the df i.e *a, b* are present in the first records
> itself, it is returning two records as resultant, but my expectation is to
> return only one records means if any of the records from the df is present
> in the df_1 it should return only one records from the df_1.
>
> Note:
> 1. Here we are able to filter the duplicate records by using distinct of
> ID field in the resultant df, bu I am thinking that shouldn't be effective
> way, rather i am thinking of updating in array_contains steps itself.
>
> Thanks.
>
>
> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh 
> wrote:
>
>>
>> This is what you want, how to join two DFs with a string column in one
>> and an array of strings in the other, keeping only rows where the string
>> is present in the array.
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql import Row
>> from pyspark.sql.functions import expr
>>
>> spark = SparkSession.builder.appName("joins").getOrCreate()
>>
>> data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]  # this one
>> has a column combined_id as an array of integers
>> data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
>> single integers
>>
>> df1 = spark.createDataFrame(data1)
>> df2 = spark.createDataFrame(data2)
>>
>> df1.printSchema()
>> df2.printSchema()
>>
>> # Perform the join with array_contains. It takes two arguments: an array
>> and a value. It returns True if the value exists as an element within
>> the array, otherwise False.
>> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>>
>> # Show the result
>> joined_df.show()
>>
>> root
>>  |-- combined_id: array (nullable = true)
>>  ||-- element: long (containsNull = true)
>>
>> root
>>  |-- mr_id: long (nullable = true)
>>
>> +---+-+
>> |combined_id|mr_id|
>> +---+-+
>> |  [1, 2, 3]|2|
>> |  [4, 5, 6]|5|
>> +---+-+
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> I have two dataframe with below structure, i have to join these two
>>> dataframe - the scenario is one column is string in one dataframe and in
>>> other df join column is array of string, so we have to inner join two df
>>> and get the data if string value is present in any of the array of string
>>> value in another dataframe,
>>>
>>>
>>> df1 = spark.sql("""
>>> SELECT
>>> mr.id as mr_id,
>>> pv.id as pv_id,
>>> array(mr.id, pv.id) as combined_id
>>> FROM
>>> table1 mr
>>> INNER JOIN table2 pv ON pv.id = Mr.recordid
>>>where
>>> pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>> or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
>>> """)
>>>
>>> # df1.display()
>>>
>>> # Your second query
>>> df2 = spark.sql("""
>>> SELECT
>>> id
>>> FROM
>>> table2
>>> WHERE
>>> id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>>
>>> """)
>>>
>>>
>>>
>>> Result data:
>>> 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this records alone is
>>> common between string and array of string value.
>>>
>>> Can you share the sample snippet, how we 

Re: pyspark dataframe join with two different data type

2024-05-10 Thread Karthick Nk
Hi Mich,

Thanks for the solution, but I am getting duplicate results by using
array_contains. I have explained the scenario below; could you help me with
how we can achieve this? I have tried different ways but have not been able
to achieve it.

For example

data = [
["a"],
["b"],
["d"],
]
column_names = ["column1"]
df = spark.createDataFrame(data, column_names)
df.display()

[image: image.png]

df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df_1.display()
[image: image.png]


final_df = df_1.join(df, expr("array_contains(data, column1)"))
final_df.display()

Resul:
[image: image.png]

But i need the result like below:

[image: image.png]

Why because

In df_1 I have only two records, and only the first record has a matching
value.
But because both records from df, i.e. *a, b*, are present in that first
record, the join returns two records as the result. My expectation is to
get only one record: if any of the records from df is present in a df_1
record, that df_1 record should be returned only once.

Note:
1. Here we are able to filter out the duplicate records by using distinct on
the ID field in the resultant df, but I think that is not an efficient way;
rather, I am thinking of handling it in the array_contains step itself.

Thanks.


On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh 
wrote:

>
> This is what you want, how to join two DFs with a string column in one and
> an array of strings in the other, keeping only rows where the string is
> present in the array.
>
> from pyspark.sql import SparkSession
> from pyspark.sql import Row
> from pyspark.sql.functions import expr
>
> spark = SparkSession.builder.appName("joins").getOrCreate()
>
> data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]  # this one
> has a column combined_id as an array of integers
> data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
> single integers
>
> df1 = spark.createDataFrame(data1)
> df2 = spark.createDataFrame(data2)
>
> df1.printSchema()
> df2.printSchema()
>
> # Perform the join with array_contains. It takes two arguments: an array
> and a value. It returns True if the value exists as an element within the
> array, otherwise False.
> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>
> # Show the result
> joined_df.show()
>
> root
>  |-- combined_id: array (nullable = true)
>  ||-- element: long (containsNull = true)
>
> root
>  |-- mr_id: long (nullable = true)
>
> +---+-+
> |combined_id|mr_id|
> +---+-+
> |  [1, 2, 3]|2|
> |  [4, 5, 6]|5|
> +---+-+
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:
>
>> Hi All,
>>
>> I have two dataframe with below structure, i have to join these two
>> dataframe - the scenario is one column is string in one dataframe and in
>> other df join column is array of string, so we have to inner join two df
>> and get the data if string value is present in any of the array of string
>> value in another dataframe,
>>
>>
>> df1 = spark.sql("""
>> SELECT
>> mr.id as mr_id,
>> pv.id as pv_id,
>> array(mr.id, pv.id) as combined_id
>> FROM
>> table1 mr
>> INNER JOIN table2 pv ON pv.id = Mr.recordid
>>where
>> pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>> or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
>> """)
>>
>> # df1.display()
>>
>> # Your second query
>> df2 = spark.sql("""
>> SELECT
>> id
>> FROM
>> table2
>> WHERE
>> id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>
>> """)
>>
>>
>>
>> Result data:
>> 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this records alone is
>> common between string and array of string value.
>>
>> Can you share the sample snippet, how we can do the join for this two
>> different datatype in the dataframe.
>>
>> if any clarification needed, pls feel free to ask.
>>
>> Thanks
>>
>>


Spark 3.5.x on Java 21?

2024-05-08 Thread Stephen Coy
Hi everyone,

We’re about to upgrade our Spark clusters from Java 11 and Spark 3.2.1 to Spark 
3.5.1.

I know that 3.5.1 is supposed to be fine on Java 17, but will it run OK on Java 
21?

Thanks,

Steve C


This email contains confidential information of and is the copyright of 
Infomedia. It must not be forwarded, amended or disclosed without consent of 
the sender. If you received this message by mistake, please advise the sender 
and delete all copies. Security of transmission on the internet cannot be 
guaranteed, could be infected, intercepted, or corrupted and you should ensure 
you have suitable antivirus protection in place. By sending us your or any 
third party personal details, you consent to (or confirm you have obtained 
consent from such third parties) to Infomedia’s privacy policy. 
http://www.infomedia.com.au/privacy-policy/


Re: [Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-08 Thread Mich Talebzadeh
you may consider

- Increase Watermark Retention: Consider increasing the watermark retention
duration. This allows keeping records for a longer period before dropping
them. However, this might increase processing latency and violate
at-least-once semantics if the watermark lags behind real-time.

OR

- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records. Try:


   - Filter: Filter out records older than the watermark in the main
   pipeline.  say

   resultC = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.timestamp").alias("timestamp") \
                   , col("parsed_value.temperature").alias("temperature"))

        """
        We work out the window and the AVG(temperature) in the window's
        timeframe below. This should return back the following DataFrame
        as a struct:

         root
         |-- window: struct (nullable = false)
         |    |-- start: timestamp (nullable = true)
         |    |-- end: timestamp (nullable = true)
         |-- avg(temperature): double (nullable = true)

        """
        resultM = resultC. \
                      withWatermark("timestamp", "5 minutes"). \
                      groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
                      avg('temperature')

   - Write to Sink: Write the filtered records (dropped records) to a
   separate Kafka topic.
   - Consume and Store: Consume the dropped records topic with another
   streaming job and store them in a Postgres table or S3 using lib
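
A rough sketch of that separate-stream idea, under the simplifying assumption that lateness is approximated by comparing event time with the current processing time (Spark does not expose the internal watermark value to the query); topic names, brokers, the schema and the 5-minute threshold are placeholders:

```python
# Sketch: route records that look too late to a side Kafka topic instead of
# letting the stateful aggregation silently drop them.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.appName("late-record-capture").getOrCreate()

parsed = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "sensor-events")
          .load()
          .selectExpr("CAST(value AS STRING) AS json")
          .selectExpr("from_json(json, 'rowkey STRING, timestamp TIMESTAMP, "
                      "temperature DOUBLE') AS p")
          .select("p.*"))

# Records older than the allowed lateness go to a side topic for later replay
late = parsed.filter(col("timestamp") < expr("current_timestamp() - INTERVAL 5 MINUTES"))

(late.selectExpr("to_json(struct(*)) AS value")
     .writeStream.format("kafka")
     .option("kafka.bootstrap.servers", "broker:9092")
     .option("topic", "sensor-events-late")
     .option("checkpointLocation", "/tmp/checkpoints/late-records")
     .start())
```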


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 8 May 2024 at 05:13, Nandha Kumar  wrote:

> Hi Team,
>We are trying to use *spark structured streaming *for our use
> case. We will be joining 2 streaming sources(from kafka topic) with
> watermarks. As time progresses, the records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some postgres table or s3.
>
> When searching, we found a similar question
> in
> StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
>


Spark not creating staging dir for insertInto partitioned table

2024-05-07 Thread Sanskar Modi
Hi Folks,

I wanted to check why spark doesn't create staging dir while doing an
insertInto on partitioned tables. I'm running below example code –
```
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val rdd = sc.parallelize(Seq((1, 5, 1), (2, 1, 2), (4, 4, 3)))
val df = spark.createDataFrame(rdd)
df.write.insertInto("testing_table") // testing table is partitioned on "_1"
```
In this scenario FileOutputCommitter considers table path as output path
and creates temporary folders like
`/testing_table/_temporary/0` and then moves them to the
partition location when the job commit happens.

But in case multiple parallel apps are inserting into the same
partition, this can cause race conditions while deleting the
`_temporary` dir. Ideally, each app should have a unique staging dir
where its job writes its output.

Is there any specific reason for this? or am i missing something here?
Thanks for your time and assistance regarding this!

Kind regards
Sanskar


[Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-07 Thread Nandha Kumar
Hi Team,
   We are trying to use *spark structured streaming *for our use case.
We will be joining 2 streaming sources(from kafka topic) with watermarks.
As time progresses, the records that are prior to the watermark timestamp
are removed from the state. For our use case, we want to *store these
dropped records* in some postgres table or s3.

When searching, we found a similar question
in
StackOverflow which is unanswered.
*We would like to know how to store these dropped records due to the
watermark.*


unsubscribe

2024-05-07 Thread Wojciech Bombik
unsubscribe


unsubscribe

2024-05-06 Thread Moise
unsubscribe


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Mich Talebzadeh
Hi Kartrick,

Unfortunately Materialised views are not available in Spark as yet. I
raised Jira [SPARK-48117] Spark Materialized Views: Improve Query
Performance and Data Management - ASF JIRA (apache.org)
 as a feature request.

Let me think about another way and revert

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 6 May 2024 at 07:54, Karthick Nk  wrote:

> Thanks Mich,
>
> can you please confirm me is my understanding correct?
>
> First, we have to create the materialized view based on the mapping
> details we have by using multiple tables as source(since we have multiple
> join condition from different tables). From the materialised view we can
> stream the view data into elastic index by using cdc?
>
> Thanks in advance.
>
> On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
> wrote:
>
>> My recommendation: using materialized views (MVs) created in Hive together
>> with Spark Structured Streaming and Change Data Capture (CDC) is a good
>> combination for efficiently streaming view data updates in your scenario.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Requirements:
>>> I am working on the data flow, which will use the view definition(view
>>> definition already defined in schema), there are multiple tables used in
>>> the view definition. Here we want to stream the view data into elastic
>>> index based on if any of the table(used in the view definition) data got
>>> changed.
>>>
>>>
>>> Current flow:
>>> 1. We are inserting ids from the tables (which are used in the view
>>> definition) into a common table.
>>> 2. From the common table, using those ids, we stream the view data (by
>>> checking if any of the incoming ids is present in the collective ids of
>>> all tables used in the view definition) with Spark Structured Streaming.
>>>
>>>
>>> Issue:
>>> 1. Here we are facing an issue - for each incoming id we run the view
>>> definition (so it reads all the data from all the tables) and check if any
>>> of the incoming ids is present in the collective ids of the view result.
>>> Due to this it takes more memory on the cluster driver and more time to
>>> process.
>>>
>>>
>>> I am expecting an alternate solution where we can avoid a full scan of the
>>> view definition every time. If you have any alternate design flow for how
>>> we can achieve the result, please suggest it.
>>>
>>>
>>> Note: also, it would be helpful if you can share the details of a
>>> community forum or platform to discuss this kind of design-related topic.
>>>
>>


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Karthick Nk
Thanks Mich,

can you please confirm whether my understanding is correct?

First, we have to create the materialized view based on the mapping details
we have, using multiple tables as the source (since we have join conditions
across different tables). From the materialized view we can then stream the
view data into the Elastic index by using CDC?

Thanks in advance.

On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
wrote:

> My recommendation: using materialized views (MVs) created in Hive together
> with Spark Structured Streaming and Change Data Capture (CDC) is a good
> combination for efficiently streaming view data updates in your scenario.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Requirements:
>> I am working on the data flow, which will use the view definition(view
>> definition already defined in schema), there are multiple tables used in
>> the view definition. Here we want to stream the view data into elastic
>> index based on if any of the table(used in the view definition) data got
>> changed.
>>
>>
>> Current flow:
>> 1. We are inserting ids from the tables (which are used in the view
>> definition) into a common table.
>> 2. From the common table, using those ids, we stream the view data (by
>> checking if any of the incoming ids is present in the collective ids of
>> all tables used in the view definition) with Spark Structured Streaming.
>>
>>
>> Issue:
>> 1. Here we are facing an issue - for each incoming id we run the view
>> definition (so it reads all the data from all the tables) and check if any
>> of the incoming ids is present in the collective ids of the view result.
>> Due to this it takes more memory on the cluster driver and more time to
>> process.
>>
>>
>> I am expecting an alternate solution where we can avoid a full scan of the
>> view definition every time. If you have any alternate design flow for how
>> we can achieve the result, please suggest it.
>>
>>
>> Note: also, it would be helpful if you can share the details of a
>> community forum or platform to discuss this kind of design-related topic.
>>
>


unsubscribe

2024-05-04 Thread chen...@birdiexx.com

unsubscribe


unsubscribe

2024-05-03 Thread Bing



 Replied Message 
| From | Wood Super |
| Date | 05/01/2024 07:49 |
| To | user  |
| Subject | unsubscribe |
unsubscribe


Spark Materialized Views: Improve Query Performance and Data Management

2024-05-03 Thread Mich Talebzadeh
Hi,

I have raised a ticket SPARK-48117
 for enhancing Spark
capabilities with Materialised Views (MV). Currently both Hive and
Databricks support this. I have added these potential benefits  to the
ticket

- *Improved Query Performance (especially for Streaming Data):*
Materialized Views can significantly improve query performance,
particularly for use cases involving Spark Structured Streaming. When
dealing with continuous data streams, materialized views can pre-compute
and store frequently accessed aggregations or transformations. Subsequent
queries on the materialized view can retrieve the results much faster
compared to continuously processing the entire streaming data. This is
crucial for real-time analytics where low latency is essential.
- *Enhancing Data Management:* They offer a way to pre-aggregate or transform
data, making complex queries more efficient.
- *Reduced Data Movement*: Materialized Views can be materialized on
specific clusters or storage locations closer to where the data will be
consumed. This minimizes data movement across the network, further
improving query performance and reducing overall processing time.
- *Simplified Workflows:* Developers and analysts can leverage pre-defined
Materialized Views that represent specific business logic or data subsets.
This simplifies data access, reduces development time for queries that rely
on these views, and fosters code reuse.

Please have a look at the ticket and add your comments.

Thanks

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".

