Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Amin Mosayyebzadeh
I am reading from a single file:
df = spark.read.text("s3a://test-bucket/testfile.csv")



On Fri, May 31, 2024 at 5:26 AM Mich Talebzadeh 
wrote:

> Tell Spark to read from a single file
>
> data = spark.read.text("s3a://test-bucket/testfile.csv")
>
> This makes it explicit to Spark that you are dealing with a single file and
> avoids any directory-style interpretation of the path.
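>
> If the single-file read still comes back empty, a quick way to make the
> outcome explicit (a sketch, using the same path and session as above) is to
> count and print the raw lines:
>
> data = spark.read.text("s3a://test-bucket/testfile.csv")
> print(data.count())            # expected: the number of lines in testfile.csv
> data.show(truncate=False)      # prints the raw lines if the object is actually read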
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
> wrote:
>
>> I will work on the first two possible causes.
>> For the third one, which I guess is the real problem, Spark treats the
>> testfile.csv object (URL s3a://test-bucket/testfile.csv) as a directory and
>> tries to access _spark_metadata at
>> s3a://test-bucket/testfile.csv/_spark_metadata.
>> testfile.csv is an object and should not be treated as a directory, but I am
>> not sure how to prevent Spark from doing that.
>>
>


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
I will work on the first two possible causes.
For the third one, which I guess is the real problem, Spark treats the
testfile.csv object (URL s3a://test-bucket/testfile.csv) as a directory and
tries to access _spark_metadata at
s3a://test-bucket/testfile.csv/_spark_metadata.
testfile.csv is an object and should not be treated as a directory, but I am
not sure how to prevent Spark from doing that.
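
One thing worth checking (a sketch I have not run yet, using boto3 with the
endpoint and the masked credentials from earlier in the thread): S3A generally
decides that a path is a "directory" by listing keys under the corresponding
prefix, so listing everything under testfile.csv/ should show whether some
stray object such as testfile.csv/_spark_metadata/... is what makes Spark take
the streaming-sink code path. Deleting any such keys should stop it.

import boto3

# Hypothetical diagnostic; the http scheme and the credential values are
# assumptions based on the masked config in this thread.
s3 = boto3.client(
    "s3",
    endpoint_url="http://192.168.52.63:8000",
    aws_access_key_id="R*6",
    aws_secret_access_key="1***e",
)
resp = s3.list_objects_v2(Bucket="test-bucket", Prefix="testfile.csv/")
for obj in resp.get("Contents", []):
    # any key printed here would explain the directory treatment
    print(obj["Key"], obj["Size"])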


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
The code should read the testfile.csv file from S3 and print its content, but
it only prints an empty result although the file has content.
I have also checked our custom S3 storage (Ceph-based) logs and I see only
LIST operations coming from Spark; there is no GET object operation for
testfile.csv.
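
To force a plain GET outside Spark (a sketch, again assuming a plain-HTTP
endpoint and using the masked credentials above), the following should both
print the 33-byte content and show up as a GET operation in the Ceph logs,
which would rule out credential or endpoint problems:

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://192.168.52.63:8000",  # assumption: plain-HTTP Ceph RGW endpoint
    aws_access_key_id="R*6",
    aws_secret_access_key="1***e",
)
obj = s3.get_object(Bucket="test-bucket", Key="testfile.csv")
print(obj["ContentLength"])           # should be 33
print(obj["Body"].read().decode())    # should print the space-separated rows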

The only error I see in the DEBUG output is these lines:

=
24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
(auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
java.lang.Exception
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at
org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
at
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:311)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:352)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:64)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
at
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
at
org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:52)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:201)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at
org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

===
I am not sure whether this is related, since Spark can see and list the bucket
(it also returns the correct object size, which is 33 bytes).
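
One way to tell whether it is related: as far as I can tell, the trace is the
DataFrameReader resolving the path, and it only builds a MetadataLogFileIndex
when the filesystem reports a _spark_metadata directory under the path; an
empty sink log would then list no data files, which would also explain why
only LIST operations appear and the result is empty. Asking the S3A connector
directly how it classifies the path should settle it (a sketch using the JVM
Hadoop API through py4j, from the same PySpark session):

# Sketch: ask the S3A filesystem how it sees the object and the _spark_metadata path
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
Path = jvm.org.apache.hadoop.fs.Path

data_path = Path("s3a://test-bucket/testfile.csv")
meta_path = Path("s3a://test-bucket/testfile.csv/_spark_metadata")
fs = data_path.getFileSystem(hadoop_conf)

status = fs.getFileStatus(data_path)
print("isDirectory:", status.isDirectory(), "length:", status.getLen())
print("_spark_metadata exists:", fs.exists(meta_path))
# isDirectory=True or exists=True would confirm the object is being reported
# as a directory with streaming metadata underneath it.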

Best,
Amin


On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh 
wrote:

> Hello,
>
> Overall, the exit code of 0 suggests a successful run of your Spark job.
> Check the output (and the Spark UI) against the intended purpose of your code
> for further confirmation.
>
> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
> exitCode 0.
>
> what to check
>
>
>    1. Verify Output: If your Spark job was intended to read data from S3
>    and process it, verify the output to ensure the data was handled
>    correctly. This might involve checking whether any results were written
>    to a designated location or whether any transformations were applied
>    successfully (see the sketch after this list).
>    2. Review Code:
>    3. Check Spark UI:
>
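> A minimal version of step 1, assuming the df returned by your reader (a
> sketch; adjust the path and options as needed):
>
> row_count = df.count()
> print(f"rows read: {row_count}")
> # zero rows from a non-empty object means the scan found no files to read,
> # not that the credentials or the endpoint are wrong
> df.printSchema()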
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

Re: [s3a] Spark is not reading s3 object content

2024-05-29 Thread Amin Mosayyebzadeh
24/05/30 01:23:43 INFO FileSourceStrategy: Pushed Filters:
24/05/30 01:23:43 INFO FileSourceStrategy: Post-Scan Filters:
24/05/30 01:23:43 INFO CodeGenerator: Code generated in 188.932153 ms
24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 201.3 KiB, free 2.8 GiB)
24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 34.8 KiB, free 2.8 GiB)
24/05/30 01:23:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on MOC-R4PAC08U33-S1C:39343 (size: 34.8 KiB, free: 2.8 GiB)
24/05/30 01:23:43 INFO SparkContext: Created broadcast 0 from showString at
NativeMethodAccessorImpl.java:0
24/05/30 01:23:43 INFO FileSourceScanExec: Planning scan with bin packing,
max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
+----+----+----+
|name|int1|int2|
+----+----+----+
+----+----+----+

24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode
0.
24/05/30 01:23:43 INFO SparkUI: Stopped Spark web UI at
http://MOC-R4PAC08U33-S1C:4040
24/05/30 01:23:43 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
24/05/30 01:23:43 INFO MemoryStore: MemoryStore cleared
24/05/30 01:23:43 INFO BlockManager: BlockManager stopped
24/05/30 01:23:43 INFO BlockManagerMaster: BlockManagerMaster stopped
24/05/30 01:23:43 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
24/05/30 01:23:43 INFO SparkContext: Successfully stopped SparkContext
24/05/30 01:23:44 INFO ShutdownHookManager: Shutdown hook called
24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
/tmp/spark-f26cd915-aeb6-4efc-8960-56ca51ac1a7d
24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
/tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6
24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
/tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6/pyspark-bcca58a9-38dc-4359-85d1-81b728d6cf82


Best,
Amin



On Thu, May 23, 2024 at 4:20 PM Mich Talebzadeh 
wrote:

> Could be a number of reasons.
>
> First, test reading the file with the AWS CLI (note the plain s3:// scheme;
> with a custom Ceph endpoint you will also need to pass --endpoint-url):
>
> aws s3 cp s3://input/testfile.csv .
> cat testfile.csv
>
>
> Try this code with debug option to diagnose the problem
>
> from pyspark.sql import SparkSession
> from pyspark.sql.utils import AnalysisException
>
> spark = None
> try:
>     # Initialize Spark session
>     spark = SparkSession.builder \
>         .appName("S3ReadTest") \
>         .config("spark.jars.packages",
>                 "org.apache.hadoop:hadoop-aws:3.3.6") \
>         .config("spark.hadoop.fs.s3a.access.key", "R*6") \
>         .config("spark.hadoop.fs.s3a.secret.key", "1***e") \
>         .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
>         .config("spark.hadoop.fs.s3a.path.style.access", "true") \
>         .config("spark.hadoop.fs.s3a.impl",
>                 "org.apache.hadoop.fs.s3a.S3AFileSystem") \
>         .getOrCreate()
>
>     # Read the CSV file from S3; the delimiter is a single space
>     df = spark.read \
>         .option("header", "true") \
>         .option("inferSchema", "true") \
>         .option("delimiter", " ") \
>         .csv("s3a://input/testfile.csv")
>
>     # Show the data
>     df.show(n=1)
>
> except AnalysisException as e:
>     print(f"AnalysisException: {e}")
> except Exception as e:
>     print(f"Error: {e}")
> finally:
>     # Stop the Spark session (if it was created)
>     if spark is not None:
>         spark.stop()
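>
> As a side note (an assumption to verify, not something established in this
> thread): the hadoop-aws artifact pulled via spark.jars.packages should match
> the Hadoop client version bundled with your Spark build. You can print what
> the driver actually loaded:
>
> # Spark 3.5.x distributions typically bundle Hadoop 3.3.4; if this prints
> # something other than 3.3.6, align the hadoop-aws package version with it.
> print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())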
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh 
> wrote:
>
>> I am trying to read an s3 object from a local S3 storage (Ceph based)
>> using Spark 3.5.1. I see it can access the bucket and list the files (I
>> have verified it on Ceph side by checking its logs), even returning the
>> correct size of the object. But the content is not read.
>>
>> The object url is:
>> s3a://input/testfile.csv (I have also tested a nested bucket:
>> s3a://test1/test2/test3/testfile.csv)

[s3a] Spark is not reading s3 object content

2024-05-23 Thread Amin Mosayyebzadeh
 I am trying to read an s3 object from a local S3 storage (Ceph based)
using Spark 3.5.1. I see it can access the bucket and list the files (I
have verified it on Ceph side by checking its logs), even returning the
correct size of the object. But the content is not read.

The object url is:
s3a://input/testfile.csv (I have also tested a nested bucket:
s3a://test1/test2/test3/testfile.csv)


Object's content:

=
name int1 int2
first 1 2
second 3 4
=


Here is the config I have set so far:

("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
("spark.hadoop.fs.s3a.access.key", "R*6")
("spark.hadoop.fs.s3a.secret.key", "1***e")
("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
("spark.hadoop.fs.s3a.path.style.access", "true")
("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")


The output of the following PySpark application:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("s3a://input/testfile.csv", sep=' ')

df.show(n=1)
==
24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
+----+----+----+
|name|int1|int2|
+----+----+----+
+----+----+----+
24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
=

Am I missing something here?

P.S. I see OP_IS_DIRECTORY is set to 1. Is that correct behavior?


Thanks in advance!