Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Chris Coutinho
Hi Gourav,

The static table is broadcasted prior to the join so the shuffle is primarily to
avoid OOME during the UDF.

It's not quite a Cartesian product, but yes the join results in multiple records
per input record. The number of output records varies depending on the number of
duplicates in the static table for that particular non-unique key. The key is a
single column.
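
In case it helps to make the setup concrete, here is a heavily stripped-down
PySpark sketch of the pattern I'm describing (table paths, the key column, and
the UDF body are all placeholders, and the explicit repartition is the shuffle
mentioned above):

```pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# Placeholder paths and column names -- purely illustrative.
stream_df = spark.readStream.format("delta").load("/path/to/stream_source")
static_df = spark.read.format("delta").load("/path/to/static_table")

# Stream-static join on a single, non-unique key; the static side is broadcast,
# so no shuffle is needed for the join itself.
joined = stream_df.join(F.broadcast(static_df), on="key")

# Explicit shuffle into many small partitions so each task stays small enough
# for the memory-hungry Pandas UDF downstream -- this is the step we'd like to avoid.
repartitioned = joined.repartition(1000)

@pandas_udf("double")
def expensive_udf(v: pd.Series) -> pd.Series:
    return v * 2.0  # stand-in for the real per-batch computation

result = repartitioned.withColumn("score", expensive_udf(F.col("value")))

query = (result.writeStream
         .format("delta")
         .option("checkpointLocation", "/path/to/checkpoint")
         .start("/path/to/sink"))
```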

Thanks,
Chris

On Fri, 2022-02-11 at 20:10, Gourav Sengupta wrote:
> Gourav Sengupta



Re: Unable to access Google buckets using spark-submit

2022-02-12 Thread Gourav Sengupta
Hi,

I agree with Holden; I have faced quite a few issues with FUSE.

Also, I am trying to understand "spark-submit from local". Are you submitting
your Spark jobs from a local laptop, or in local mode from a GCP Dataproc
system?

If you are submitting the job from your local laptop, I would guess there will
be performance bottlenecks depending on your internet bandwidth and the volume
of data.

Regards,
Gourav


On Sat, Feb 12, 2022 at 7:12 PM Holden Karau  wrote:

> You can also put the GS access jar with your Spark jars — that’s what the
> class not found exception is pointing you towards.
>
> On Fri, Feb 11, 2022 at 11:58 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> BTW I also answered you on Stack Overflow:
>>
>>
>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
>> wrote:
>>
>>> You are trying to access a Google storage bucket gs:// from your local
>>> host.
>>>
>>> Spark does not see it because spark-submit assumes it is on the local file
>>> system of the host, which it is not.
>>>
>>> You need to mount gs:// bucket as a local file system.
>>>
>>> You can use the tool called gcsfuse
>>> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is
>>> an open source FUSE  adapter that allows
>>> you to mount Cloud Storage buckets as file systems on Linux or macOS
>>> systems. You can download gcsfuse from here
>>> 
>>>
>>>
>>> Pretty simple.
>>>
>>>
>>> It will be installed as /usr/bin/gcsfuse. You can mount the bucket by
>>> creating a local mount point such as /mnt/gs as root and giving other
>>> users permission to use it.
>>>
>>>
>>> As a normal user that needs to access gs:// bucket (not as root), use
>>> gcsfuse to mount it. For example I am mounting a gcs bucket called
>>> spark-jars-karan here
>>>
>>>
>>> Just use the bucket name itself
>>>
>>>
>>> gcsfuse spark-jars-karan /mnt/gs
>>>
>>>
>>> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>>>
>>> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
>>> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>>>
>>> HTH
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:
>>>
 Hello All,

 I'm trying to access gcp buckets while running spark-submit from local,
 and running into issues.

 I'm getting error :
 ```

 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop 
 library for your platform... using builtin-java classes where applicable
 Exception in thread "main" 
 org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for 
 scheme "gs"

 ```
 I tried adding the --conf
 spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem

 to the spark-submit command, but getting ClassNotFoundException

 Details are in stackoverflow :

 https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit

 Any ideas on how to fix this ?
 tia !

 --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Gourav Sengupta
Hi,

Did you try sorting while writing out the data? All of this engineering
may not be required in that case.




Regards,
Gourav Sengupta

On Sat, Feb 12, 2022 at 8:42 PM Chris Coutinho 
wrote:

> Setting the option in the cluster configuration solved the issue, and now
> we're able to specify the row group size based on the block size as
> intended.
>
> Thanks!
>
> On Fri, Feb 11, 2022 at 6:59 PM Adam Binford  wrote:
>
>> Writing to Delta might not support the write.option method. We set
>> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
>>
>> Adam
>>
>> On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho 
>> wrote:
>>
>>> I tried re-writing the table with the updated block size but it doesn't
>>> appear to have an effect on the row group size.
>>>
>>> ```pyspark
>>> df = spark.read.format("delta").load("/path/to/source1")
>>>
>>> df.write \
>>> .format("delta") \
>>> .mode("overwrite") \
>>> .options(**{
>>>   "parquet.block.size": "1m",
>>> }) \
>>> .partitionBy("date") \
>>> .save("/path/to/source2")
>>> ```
>>>
>>> The files created by this job are about 20m in size. Using
>>> `parquet-tools` I can inspect a single file and see the following 12m file
>>> contains a single row group - not the expected 12 based on the block size:
>>>
>>> $ parquet-tools inspect /path/to/source2/date=.../part-.parquet
>>>  file meta data 
>>> created_by: parquet-mr version 1.10.1-databricks9 (build
>>> cf6c823f85c3b69d49e1573e48e236148c709e82)
>>> num_columns: 19
>>> num_rows: 369483
>>> num_row_groups: 1
>>> format_version: 1.0
>>> serialized_size: 6364
>>>
>>>  Columns 
>>> ...
>>>
>>> Chris
>>>
>>> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:
>>>
 It should just be parquet.block.size indeed.
 spark.write.option("parquet.block.size", "16m").parquet(...)
 This is an issue in how you write, not read, the parquet.

 On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <
 chrisbcouti...@gmail.com> wrote:

> Hi Adam,
>
> Thanks for the explanation on the empty partitions.
>
> We have the freedom to adjust how the source table is written, so if
> there are any improvements we can implement on the source side we'd be
> happy to look into that.
>
> It's not yet clear to me how you can reduce the row group size of the
> parquet files. I see some mention of `parquet.block.size` online, as well
> as various MapReduce settings regarding file splitting (e.g. the SO question
> mapred-min-split-size-in-hdfs); however, I don't quite understand the link
> between the splitting settings, the row group configuration, and the
> resulting number of records when reading from a delta table.
>
> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
> storage.
>
> Best,
> Chris
>
> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford 
> wrote:
>
>> The smallest unit of work you can do on a parquet file (under the
>> delta hood) is based on the parquet row group size, which by default is
>> 128mb. If you specify maxPartitionBytes of 10mb, what that will basically
>> do is create a partition for each 10mb of a file, but whatever partition
>> covers the part of the file where the row group starts will consume the
>> entire row group. That's why you're seeing a lot of empty partitions and 
>> a
>> small number with the rest of the actual data.
>>
>> Can't think of any solution other than repartitioning (or rewriting
>> the input Delta table with a much smaller row group size which wouldn't 
>> be
>> ideal performance wise).
>>
>> Adam
>>
>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <
>> chrisbcouti...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> We have a spark structured streaming job that includes a
>>> stream-static join and a Pandas UDF, streaming to/from delta tables. The
>>> primary key of the static table is non-unique, meaning that the 
>>> streaming
>>> join results in multiple records per input record - in our case 100x
>>> increase. The Pandas UDF is then applied to the resulting stream-static
>>> join and stored in a table. To avoid OOM errors on the executors, we 
>>> need
>>> to start with very small (~10MB) partitions to account for the 
>>> expansion.
>>> Currently this only seems possible by explicitly repartitioning the 
>>> data,
>>> incurring the perf cost associated with the shuffle. Is it possible to
>>> force spark to read parquet files into 10MB partitions without 
>>> explicitly
>>> repartitioning?
>>>
>>> The documentation regarding Performance Tuning [0] suggests that it
>>> should be possible to control how spark reads files into partitions - 
>>> we're
 assuming this accounts for structured streaming jobs as well.

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Chris Coutinho
Setting the option in the cluster configuration solved the issue, and now
we're able to specify the row group size based on the block size as
intended.

Thanks!
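
For anyone who finds this thread later, a minimal sketch of the equivalent
session-level setting (we set it in the cluster's Spark config, but the key is
the same; the 1 MB value is just the one from the earlier experiment, and note
that spark.hadoop.* settings need to be in place before the SparkSession is
created):

```pyspark
from pyspark.sql import SparkSession

# Sketch only: spark.hadoop.* keys are forwarded to the underlying Hadoop /
# parquet-mr configuration, so this controls the parquet row group ("block")
# size used on write.
spark = (
    SparkSession.builder
    .config("spark.hadoop.parquet.block.size", str(1 * 1024 * 1024))  # ~1 MB row groups
    .getOrCreate()
)

(spark.read.format("delta").load("/path/to/source1")
    .write.format("delta")
    .mode("overwrite")
    .partitionBy("date")
    .save("/path/to/source2"))
```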

On Fri, Feb 11, 2022 at 6:59 PM Adam Binford  wrote:

> Writing to Delta might not support the write.option method. We set
> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
>
> Adam
>
> On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho 
> wrote:
>
>> I tried re-writing the table with the updated block size but it doesn't
>> appear to have an effect on the row group size.
>>
>> ```pyspark
>> df = spark.read.format("delta").load("/path/to/source1")
>>
>> df.write \
>> .format("delta") \
>> .mode("overwrite") \
>> .options(**{
>>   "parquet.block.size": "1m",
>> }) \
>> .partitionBy("date") \
>> .save("/path/to/source2")
>> ```
>>
>> The files created by this job are about 20m in size. Using
>> `parquet-tools` I can inspect a single file and see the following 12m file
>> contains a single row group - not the expected 12 based on the block size:
>>
>> $ parquet-tools inspect /path/to/source2/date=.../part-.parquet
>>  file meta data 
>> created_by: parquet-mr version 1.10.1-databricks9 (build
>> cf6c823f85c3b69d49e1573e48e236148c709e82)
>> num_columns: 19
>> num_rows: 369483
>> num_row_groups: 1
>> format_version: 1.0
>> serialized_size: 6364
>>
>>  Columns 
>> ...
>>
>> Chris
>>
>> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:
>>
>>> It should just be parquet.block.size indeed.
>>> spark.write.option("parquet.block.size", "16m").parquet(...)
>>> This is an issue in how you write, not read, the parquet.
>>>
>>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho 
>>> wrote:
>>>
 Hi Adam,

 Thanks for the explanation on the empty partitions.

 We have the freedom to adjust how the source table is written, so if
 there are any improvements we can implement on the source side we'd be
 happy to look into that.

 It's not yet clear to me how you can reduce the row group size of the
 parquet files. I see some mention of `parquet.block.size` online, as well
 as various MapReduce settings regarding file splitting (e.g. the SO question
 mapred-min-split-size-in-hdfs); however, I don't quite understand the link
 between the splitting settings, the row group configuration, and the
 resulting number of records when reading from a delta table.

 For more specifics: we're running Spark 3.1.2 using ADLS as cloud
 storage.

 Best,
 Chris

 On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:

> The smallest unit of work you can do on a parquet file (under the
> delta hood) is based on the parquet row group size, which by default is
> 128mb. If you specify maxPartitionBytes of 10mb, what that will basically
> do is create a partition for each 10mb of a file, but whatever partition
> covers the part of the file where the row group starts will consume the
> entire row group. That's why you're seeing a lot of empty partitions and a
> small number with the rest of the actual data.
>
> Can't think of any solution other than repartitioning (or rewriting
> the input Delta table with a much smaller row group size which wouldn't be
> ideal performance wise).
>
> Adam
>
> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <
> chrisbcouti...@gmail.com> wrote:
>
>> Hello,
>>
>> We have a spark structured streaming job that includes a
>> stream-static join and a Pandas UDF, streaming to/from delta tables. The
>> primary key of the static table is non-unique, meaning that the streaming
>> join results in multiple records per input record - in our case 100x
>> increase. The Pandas UDF is then applied to the resulting stream-static
>> join and stored in a table. To avoid OOM errors on the executors, we need
>> to start with very small (~10MB) partitions to account for the expansion.
>> Currently this only seems possible by explicitly repartitioning the data,
>> incurring the perf cost associated with the shuffle. Is it possible to
>> force spark to read parquet files into 10MB partitions without explicitly
>> repartitioning?
>>
>> The documentation regarding Performance Tuning [0] suggests that it
>> should be possible to control how spark reads files into partitions - 
>> we're
>> assuming this accounts for structured streaming jobs as well. Based on 
>> our
>> understanding of the page, we used the following to configure spark into
>> reading a stream of 10GB per trigger into 1000 partitions 10 MB each.
>>
>> spark.sql.files.openCostInBytes 128MB
>> spark.sql.files.maxPartitionBytes 10MB
>> spark.sql.files.minPartitionNum 1000
>>
>> Unfortunately we still see a l

Re: Unable to access Google buckets using spark-submit

2022-02-12 Thread Holden Karau
You can also put the GS access jar with your Spark jars — that’s what the
class not found exception is pointing you towards.
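
For example (a sketch, not tested against the poster's exact setup -- the
connector version and the key-file path are assumptions), pulling the GCS
connector in and wiring up the gs:// filesystem from PySpark looks roughly
like this; the same spark.jars.packages / spark.hadoop.* settings can equally
be passed as --packages and --conf flags to spark-submit:

```pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("gcs-access-test")
    # Ship the GCS connector to driver and executors (version is an assumption;
    # pick one matching your Hadoop build).
    .config("spark.jars.packages",
            "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2")
    # Register the gs:// filesystem implementations.
    .config("spark.hadoop.fs.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    # Hypothetical service-account key for authenticating from a laptop.
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile",
            "/path/to/service-account.json")
    .getOrCreate()
)

spark.read.parquet("gs://your-bucket/some/path").show(5)
```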

On Fri, Feb 11, 2022 at 11:58 PM Mich Talebzadeh 
wrote:

> BTW I also answered you on Stack Overflow:
>
>
> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
> wrote:
>
>> You are trying to access a Google storage bucket gs:// from your local
>> host.
>>
>> Spark does not see it because spark-submit assumes it is on the local file
>> system of the host, which it is not.
>>
>> You need to mount gs:// bucket as a local file system.
>>
>> You can use the tool called gcsfuse
>> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is
>> an open source FUSE  adapter that allows
>> you to mount Cloud Storage buckets as file systems on Linux or macOS
>> systems. You can download gcsfuse from here
>> 
>>
>>
>> Pretty simple.
>>
>>
>> It will be installed as /usr/bin/gcsfuse. You can mount the bucket by
>> creating a local mount point such as /mnt/gs as root and giving other users
>> permission to use it.
>>
>>
>> As a normal user that needs to access gs:// bucket (not as root), use
>> gcsfuse to mount it. For example I am mounting a gcs bucket called
>> spark-jars-karan here
>>
>>
>> Just use the bucket name itself
>>
>>
>> gcsfuse spark-jars-karan /mnt/gs
>>
>>
>> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>>
>> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
>> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>>
>> HTH
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:
>>
>>> Hello All,
>>>
>>> I'm trying to access gcp buckets while running spark-submit from local,
>>> and running into issues.
>>>
>>> I'm getting error :
>>> ```
>>>
>>> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop 
>>> library for your platform... using builtin-java classes where applicable
>>> Exception in thread "main" 
>>> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for 
>>> scheme "gs"
>>>
>>> ```
>>> I tried adding the --conf
>>> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>>>
>>> to the spark-submit command, but getting ClassNotFoundException
>>>
>>> Details are in stackoverflow :
>>>
>>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>>
>>> Any ideas on how to fix this ?
>>> tia !
>>>
>>> --
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9  
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


Failed to construct kafka consumer, Failed to load SSL keystore + Spark Streaming

2022-02-12 Thread joyan sil
Hi All,

I am trying to read from Kafka using Spark Streaming from spark-shell but am
getting the below error. Any suggestions to fix this are much appreciated.

I am running from spark-shell, so it is client mode, and the files are
available on the local filesystem.

I tried to access the files as shown below, but I still get the same error.
Any suggestions to make this work from spark-shell?

spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.7,org.apache.avro:avro:1.8.2,org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.8 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --files /local_dir/kafka.client.truststore.jks,/local_dir/test.kafka.client.xxx.com.jks

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1,server2")
  .option("subscribe", "wm-cth-salesstreams")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", 100)
  .option("failOnDataLoss", false)
  .option("kafka.security.protocol", "SSL")
  // .option("kafka.ssl.truststore.location", "/local_dir/kafka.client.truststore.jks")
  .option("kafka.ssl.truststore.location", "file://" + org.apache.spark.SparkFiles.get("/local_dir/kafka.client.truststore.jks"))
  .option("kafka.ssl.truststore.password", "pwd")
  .option("kafka.ssl.keystore.password", "pwd")
  // .option("kafka.ssl.keystore.location", "/local_dir/test.kafka.client.xxx.com.jks")
  .option("kafka.ssl.keystore.location", "file://" + org.apache.spark.SparkFiles.get("/local_dir/test.kafka.client.xxx.com.jks"))
  .load

Exception:
22/02/12 15:57:03 INFO org.apache.spark.sql.kafka010.KafkaMicroBatchReader:
Initial offsets:
{"wm-cth-salesstreams":{"23":167267092,"59":167276860,"50":167258479,"32":167281169,"41":167272687,"53":167256274,"17":167269072,"8":167282513,"35":167298150,"44":167244867,"26":167242913,"11":167283073,"56":167304913,"29":167307963,"38":167287380,"47":167312027,"20":167280591,"2":167248970,"5":167308945,"14":167231970,"46":167267534,"55":167275890,"58":167287699,"49":167245856,"40":167247065,"13":167249522,"4":167301468,"22":167269011,"31":167349129,"16":167266948,"7":167272315,"52":167276042,"43":167273593,"25":167232737,"34":167264787,"10":167265137,"37":167252586,"1":167312454,"19":167247237,"28":167280632,"54":167307408,"45":167280214,"27":167249248,"36":167282370,"18":167223580,"9":167223643,"57":167340670,"21":167277793,"48":167273190,"3":167294084,"12":167299093,"30":167236443,"39":167311503,"15":167274468,"42":167292272,"51":167252733,"24":167245661,"6":167241738,"33":167224273,"0":167295530}}
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:64)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
... 51 more
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:62)
... 55 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
keystore /local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
at
org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:218)
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
... 56 more
Caused by: java.io.FileNotFoundException:
/local_dir/test.kafka.client.xxx.com.jks (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:323)
... 58 more
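
For reference, the pattern I have been trying to converge on (based on
suggestions found online, not yet verified in this setup) is to ship the
keystores with --files and resolve them via SparkFiles by base name, since
SparkFiles.get() expects the file name rather than the original absolute path.
A PySpark sketch of that pattern (placeholder servers and passwords; the Scala
equivalent uses the same set of options):

```pyspark
from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumes the keystores were distributed with:
#   --files /local_dir/kafka.client.truststore.jks,/local_dir/test.kafka.client.xxx.com.jks
# SparkFiles.get() resolves the *base name* of a distributed file to its local
# copy on the driver; on YARN, files shipped with --files also land in each
# executor's working directory, so the bare file name is often used there.
truststore = SparkFiles.get("kafka.client.truststore.jks")
keystore = SparkFiles.get("test.kafka.client.xxx.com.jks")

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1,server2")
      .option("subscribe", "wm-cth-salesstreams")
      .option("startingOffsets", "latest")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", truststore)
      .option("kafka.ssl.keystore.location", keystore)
      .option("kafka.ssl.truststore.password", "pwd")
      .option("kafka.ssl.keystore.password", "pwd")
      .load())
```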


Re: Unable to access Google buckets using spark-submit

2022-02-12 Thread Mich Talebzadeh
BTW I also answered you on Stack Overflow:

https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit

HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
wrote:

> You are trying to access a Google storage bucket gs:// from your local
> host.
>
> Spark does not see it because spark-submit assumes it is on the local file
> system of the host, which it is not.
>
> You need to mount gs:// bucket as a local file system.
>
> You can use the tool called gcsfuse
> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is an
> open source FUSE  adapter that allows you
> to mount Cloud Storage buckets as file systems on Linux or macOS systems.
> You can download gcsfuse from here
> 
>
>
> Pretty simple.
>
>
> It will be installed as /usr/bin/gcsfuse. You can mount the bucket by creating
> a local mount point such as /mnt/gs as root and giving other users permission
> to use it.
>
>
> As a normal user that needs to access gs:// bucket (not as root), use
> gcsfuse to mount it. For example I am mounting a gcs bucket called
> spark-jars-karan here
>
>
> Just use the bucket name itself
>
>
> gcsfuse spark-jars-karan /mnt/gs
>
>
> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>
> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>
> HTH
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:
>
>> Hello All,
>>
>> I'm trying to access gcp buckets while running spark-submit from local,
>> and running into issues.
>>
>> I'm getting error :
>> ```
>>
>> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> Exception in thread "main" 
>> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for 
>> scheme "gs"
>>
>> ```
>> I tried adding the --conf
>> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>>
>> to the spark-submit command, but getting ClassNotFoundException
>>
>> Details are in stackoverflow :
>>
>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>
>> Any ideas on how to fix this ?
>> tia !
>>
>>


Re: Unable to access Google buckets using spark-submit

2022-02-12 Thread Mich Talebzadeh
You are trying to access a Google storage bucket gs:// from your local host.

Spark does not see it because spark-submit assumes it is on the local file
system of the host, which it is not.

You need to mount gs:// bucket as a local file system.

You can use the tool called gcsfuse
(https://cloud.google.com/storage/docs/gcs-fuse). Cloud Storage FUSE is an
open source FUSE adapter that allows you to mount Cloud Storage buckets as
file systems on Linux or macOS systems. You can download gcsfuse from there.


Pretty simple.


It will be installed as /usr/bin/gcsfuse. You can mount the bucket by creating
a local mount point such as /mnt/gs as root and giving other users permission
to use it.


As a normal user that needs to access the gs:// bucket (not as root), use
gcsfuse to mount it. For example, I am mounting a GCS bucket called
spark-jars-karan here.


Just use the bucket name itself


gcsfuse spark-jars-karan /mnt/gs


Then you can refer to it as /mnt/gs in spark-submit from the on-premise host:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 \
  --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
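
If you launch from a Python script rather than spark-submit, the equivalent
(a sketch, assuming the same gcsfuse mount point) would be roughly:

```pyspark
from pyspark.sql import SparkSession

# Sketch: reference the jar on the gcsfuse mount instead of passing --jars.
spark = (
    SparkSession.builder
    .config("spark.jars", "/mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
    .getOrCreate()
)
```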

HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:

> Hello All,
>
> I'm trying to access gcp buckets while running spark-submit from local,
> and running into issues.
>
> I'm getting error :
> ```
>
> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Exception in thread "main" 
> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
> "gs"
>
> ```
> I tried adding the --conf
> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>
> to the spark-submit command, but getting ClassNotFoundException
>
> Details are in stackoverflow :
>
> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>
> Any ideas on how to fix this ?
> tia !
>
>