Re: Path style access fs.s3a.path.style.access property is not working in spark code

2020-05-04 Thread Aniruddha P Tekade
Hello Users,

I found the solution to this. If you are writing to a custom S3 URL, use
hadoop-aws-2.8.0.jar (or later), since the separate fs.s3a.path.style.access
flag was only introduced in Hadoop 2.8.0.
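
For reference, the change is roughly this: bump the dependency and keep the
same settings as before (the endpoint value below is the one from the thread;
the matching aws-java-sdk artifact also needs to be updated):

// build.sbt - replace the hadoop-aws-2.7.4 dependency
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.8.0"

// Spark side - unchanged; fs.s3a.path.style.access is honored from 2.8.0 on
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3-region0.cloudian.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")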

Best,
Aniruddha
---


On Fri, May 1, 2020 at 5:08 PM Aniruddha P Tekade 
wrote:

> Hello Users,
>
> I am using an on-premise object storage and am able to perform operations
> on different buckets using the aws-cli.
> However, when I try to use the same path from my Spark code, it fails.
> Here are the details -
>
> Added dependencies in build.sbt -
>
>    - hadoop-aws-2.7.4.jar
>    - aws-java-sdk-1.7.4.jar
>
> Spark Hadoop Configuration setup as -
>
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", ENDPOINT);
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY);
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY);
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
>
> And now I try to write data into my custom s3 endpoint as follows -
>
> val dataStreamWriter: DataStreamWriter[Row] = PM25quality.select(
>   dayofmonth(current_date()) as "day",
>   month(current_date()) as "month",
>   year(current_date()) as "year",
>   column("time"),
>   column("quality"),
>   column("PM25"))
>   .writeStream
>   .partitionBy("year", "month", "day")
>   .format("csv")
>   .outputMode("append")
>   .option("path",  "s3a://test-bucket/")
> val streamingQuery: StreamingQuery = dataStreamWriter.start()
>
>
> However, I am getting an error that AmazonHttpClient is unable to execute
> the HTTP request, and it is prepending the bucket name to the endpoint URL.
> It seems the Hadoop configuration is not being applied here -
>
>
> 20/05/01 16:51:37 INFO AmazonHttpClient: Unable to execute HTTP request:
> test-bucket.s3-region0.cloudian.com
> java.net.UnknownHostException: test-bucket.s3-region0.cloudian.com
>
>
> Is there anything that I am missing in the configuration? It seems that
> even after setting path style access to true, it is not honored.
>
> --
> Aniruddha
> ---
>


Path style access fs.s3a.path.style.access property is not working in spark code

2020-05-01 Thread Aniruddha P Tekade
Hello Users,

I am using an on-premise object storage and am able to perform operations on
different buckets using the aws-cli.
However, when I try to use the same path from my Spark code, it fails. Here
are the details -

Added dependencies in build.sbt -

   - hadoop-aws-2.7.4.jar
   - aws-java-sdk-1.7.4.jar

Spark Hadoop Configuration setup as -

spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", ENDPOINT);
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY);
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY);
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")

And now I try to write data into my custom s3 endpoint as follows -

val dataStreamWriter: DataStreamWriter[Row] = PM25quality.select(
  dayofmonth(current_date()) as "day",
  month(current_date()) as "month",
  year(current_date()) as "year",
  column("time"),
  column("quality"),
  column("PM25"))
  .writeStream
  .partitionBy("year", "month", "day")
  .format("csv")
  .outputMode("append")
  .option("path",  "s3a://test-bucket/")
val streamingQuery: StreamingQuery = dataStreamWriter.start()


However, I am getting an error that AmazonHttpClient is unable to execute
the HTTP request, and it is prepending the bucket name to the endpoint URL.
It seems the Hadoop configuration is not being applied here -


20/05/01 16:51:37 INFO AmazonHttpClient: Unable to execute HTTP request:
test-bucket.s3-region0.cloudian.com
java.net.UnknownHostException: test-bucket.s3-region0.cloudian.com


Is there anything that I am missing in the configuration? It seems that even
after setting path style access to true, it is not honored.

--
Aniruddha
---


Spark job stuck at s3a-file-system metrics system started

2020-04-29 Thread Aniruddha P Tekade
Hello,

I am trying to run a Spark job that writes data into a bucket on a custom S3
endpoint, but it is stuck at this line of output and does not move forward
at all -

20/04/29 16:03:59 INFO SharedState: Setting
hive.metastore.warehouse.dir ('null') to the value of
spark.sql.warehouse.dir
('file:/Users/abc/IdeaProjects/qct-air-detection/spark-warehouse/').
20/04/29 16:03:59 INFO SharedState: Warehouse path is
'file:/Users/abc/IdeaProjects/qct-air-detection/spark-warehouse/'.
20/04/29 16:04:01 WARN MetricsConfig: Cannot locate configuration:
tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
20/04/29 16:04:02 INFO MetricsSystemImpl: Scheduled Metric snapshot
period at 10 second(s).
20/04/29 16:04:02 INFO MetricsSystemImpl: s3a-file-system metrics system started

After a long wait it shows this -

org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on
test-bucket: com.amazonaws.SdkClientException: Unable to execute HTTP
request: Connect to s3-region0.mycloud.com:443
[s3-region0.mycloud.com/10.10.3.72] failed: Connection refused
(Connection refused): Unable to execute HTTP request: Connect to
s3-region0.mycloud.com:443 [s3-region0.mycloud.com/10.10.3.72] failed:
Connection refused (Connection refused)

However, I am able to access this bucket from the aws cli on the same
machine. I don't understand why it says it is unable to execute the HTTP
request.

I am using -

spark   3.0.0-preview2
hadoop-aws  3.2.0
aws-java-sdk-bundle 1.11.375

My spark code has following properties set for hadoop configuration -

spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", ENDPOINT);
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY);
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY);
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")

Can someone help me understand what is wrong here? Is there anything else I
need to configure? The custom S3 endpoint and its keys are valid and work
from an aws-cli profile. What is wrong with the Scala code here?

val dataStreamWriter: DataStreamWriter[Row] = PM25quality
  .select(
    dayofmonth(current_date()) as "day",
    month(current_date()) as "month",
    year(current_date()) as "year")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/Users/abc/Desktop/qct-checkpoint/")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("15 seconds"))
  .partitionBy("year", "month", "day")
  .option("path", "s3a://test-bucket")

val streamingQuery: StreamingQuery = dataStreamWriter.start()
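
For reference, two more standard S3A settings I am wondering about (I am not
sure whether they apply to this object store; the port below is only a
placeholder):

// Assumption: only relevant if the endpoint does not serve TLS on 443, or
// listens on a non-default port. Both are standard S3A properties.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3-region0.mycloud.com:9020")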

Aniruddha
---


Re: [External Email] Re: Standard practices for building dashboards for spark processed data

2020-02-26 Thread Aniruddha P Tekade
Hi Roland,

Thank you for your reply, that is quite helpful. I think I should try
InfluxDB then. But I am curious: in the case of Prometheus, would writing a
custom exporter be a good choice and serve the purpose efficiently? Grafana
is not something I want to drop.
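
To be concrete, by a custom exporter I mean a small /metrics endpoint in the
Prometheus text format that the job (or a sidecar) would serve. A rough
sketch using only the JDK's built-in HTTP server, with a made-up metric name:

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

// Rough sketch of a custom exporter: expose /metrics in the Prometheus text
// exposition format so Prometheus can scrape it. The metric name and value
// are placeholders for whatever the pipeline would actually publish.
object TinyExporter {
  def main(args: Array[String]): Unit = {
    val server = HttpServer.create(new InetSocketAddress(9091), 0)
    server.createContext("/metrics", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val body = "pm25_records_processed_total 12345\n" // placeholder metric
        exchange.getResponseHeaders.add("Content-Type", "text/plain; version=0.0.4")
        exchange.sendResponseHeaders(200, body.getBytes("UTF-8").length)
        val os = exchange.getResponseBody
        os.write(body.getBytes("UTF-8"))
        os.close()
      }
    })
    server.start() // Prometheus would scrape http://<host>:9091/metrics
  }
}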

Best,
Aniruddha
---


On Tue, Feb 25, 2020 at 11:36 PM Roland Johann 
wrote:

> Hi Ani,
>
> Prometheus is not well suited for ingesting explicit timeseries data. Its
> purpose is technical monitoring. If you want to monitor your Spark jobs
> with Prometheus, you can publish the metrics so Prometheus can scrape them.
> What you probably are looking for is a timeseries database that you can
> push metrics to.
>
> Look for an alternative to Grafana only if you find that Grafana is not
> well suited to your visualization use case.
>
> As said earlier, at a quick glance it sounds like you should look for an
> alternative to Prometheus.
>
> For timeseries you can reach for TimescaleDB or InfluxDB. Other databases,
> like plain SQL databases or Cassandra, lack up/downsampling capabilities,
> which can lead to large query responses and the need for the client to
> post-process.
>
> Kind regards,
>
> Aniruddha P Tekade  wrote on Wed, Feb 26, 2020
> at 02:23:
>
>> Hello,
>>
>> I am trying to build a data pipeline that uses Spark Structured Streaming
>> with the Delta project and runs on Kubernetes. Because of this, I get my
>> output files only in Parquet format. Since I am asked to use Prometheus
>> and Grafana for building the dashboard for this pipeline, I run another
>> small Spark job to convert the output into JSON so that I can feed it
>> into Grafana. Although I can see that this step is redundant, considering
>> the importance of the Delta Lake project I cannot write my data directly
>> as JSON. Therefore I need some help/guidelines/opinions about moving
>> forward from here.
>>
>> I would appreciate it if Spark users could suggest some practices to
>> follow with respect to the following questions -
>>
>>    1. Since I cannot get direct JSON output from Spark structured
>>    streams, is there a better way to convert Parquet into JSON? Or
>>    should I keep only Parquet?
>>    2. Will I need to write a custom exporter for Prometheus so that
>>    Grafana can read the time-series data?
>>    3. Is there a better dashboard alternative to Grafana for this
>>    requirement?
>>    4. Since the pipeline is going to run on Kubernetes, I am trying to
>>    avoid InfluxDB as the time-series database and to go with Prometheus.
>>    Is this approach correct?
>>
>> Thanks,
>> Ani
>> ---
>>
> --
> Roland Johann
> Software Developer/Data Engineer
>
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
>
> Mobile: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io
> Web: phenetic.io
>
> Commercial register: Amtsgericht Köln (HRB 92595)
> Managing directors: Roland Johann, Uwe Reimann
>


Standard practices for building dashboards for spark processed data

2020-02-25 Thread Aniruddha P Tekade
Hello,

I am trying to build a data pipeline that uses Spark Structured Streaming
with the Delta project and runs on Kubernetes. Because of this, I get my
output files only in Parquet format. Since I am asked to use Prometheus and
Grafana for building the dashboard for this pipeline, I run another small
Spark job to convert the output into JSON so that I can feed it into
Grafana. Although I can see that this step is redundant, considering the
importance of the Delta Lake project I cannot write my data directly as
JSON. Therefore I need some help/guidelines/opinions about moving forward
from here.
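
For reference, that intermediate conversion job is roughly the following
sketch (paths are placeholders):

import org.apache.spark.sql.SparkSession

// Rough sketch of the extra conversion step described above: read the
// Parquet output of the streaming job and rewrite it as JSON. Both paths
// are hypothetical.
val spark = SparkSession.builder().appName("parquet-to-json").getOrCreate()
spark.read.parquet("/data/delta-output/")
  .write.mode("overwrite").json("/data/json-for-grafana/")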

I would appreciate it if Spark users could suggest some practices to follow
with respect to the following questions -

   1. Since I cannot get direct JSON output from Spark structured streams,
   is there a better way to convert Parquet into JSON? Or should I keep
   only Parquet?
   2. Will I need to write a custom exporter for Prometheus so that Grafana
   can read the time-series data?
   3. Is there a better dashboard alternative to Grafana for this
   requirement?
   4. Since the pipeline is going to run on Kubernetes, I am trying to
   avoid InfluxDB as the time-series database and to go with Prometheus. Is
   this approach correct?

Thanks,
Ani
---


spark writeStream not working with custom S3 endpoint

2019-12-03 Thread Aniruddha P Tekade
Hello,

While working with Spark Structured Streaming (v2.4.3), I am trying to write
my streaming dataframe to a custom S3 endpoint. I have made sure that I am
able to log in and upload data to the S3 buckets manually through the UI,
and I have also set up the ACCESS_KEY and SECRET_KEY for it.

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3-region1.myObjectStore.com:443")
sc.hadoopConfiguration.set("fs.s3a.access.key", "00cce9eb2c589b1b1b5b")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "flmheKX9Gb1tTlImO6xR++9kvnUByfRKZfI7LJT8")
sc.hadoopConfiguration.set("fs.s3a.path.style.access", "true") // bucket name appended as url/bucket and not bucket.url

val writeToS3Query = stream.writeStream
  .format("csv")
  .option("sep", ",")
  .option("header", true)
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("path", "s3a://bucket0/")
  .option("checkpointLocation", "/Users/home/checkpoints/s3-checkpointing")
  .start()

However, I am getting this error -

Unable to execute HTTP request: bucket0.s3-region1.myObjectStore.com:
nodename nor servname provided, or not known

I have a mapping of the URL and IP in my /etc/hosts file, and the bucket is
accessible from other sources. Is there any other way to do this
successfully? I am really not sure why the bucket name is being prepended to
the URL when the job is executed by Spark.

Could this be because I am setting the Spark context's Hadoop configuration
after the session is created, so the settings are not effective? But then
how is it able to resolve the actual endpoint URL when the path I am
providing is only s3a://bucket0?
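
In case the ordering does matter, this is the variant I would try next: the
same settings passed when the session is built, via the spark.hadoop.*
prefix, so they are in place before any S3A filesystem is created (the app
name is arbitrary and the keys are read from environment variables here only
as placeholders):

import org.apache.spark.sql.SparkSession

// Sketch: supply the S3A settings at session construction time through the
// spark.hadoop.* prefix instead of mutating hadoopConfiguration afterwards.
val spark = SparkSession.builder()
  .appName("s3a-writestream")
  .config("spark.hadoop.fs.s3a.endpoint", "s3-region1.myObjectStore.com:443")
  .config("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("ACCESS_KEY", ""))
  .config("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("SECRET_KEY", ""))
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .getOrCreate()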

Best,
Aniruddha
---


Can spark convert String to Integer when reading using schema in structured streaming

2019-11-22 Thread Aniruddha P Tekade
Hi,

I am new to Spark and learning Spark Structured Streaming. I am specifying
the schema with the help of a case class and encoders to get the streaming
dataframe.

import java.sql.Timestamp

case class SampleLogEntry(
 dateTime: Timestamp,
 clientIp: String,
 userId: String,
 operation: String,
 bucketName: String,
 contAccUsrId: String,
 reqHeader: Integer,
 reqBody: Integer,
 respHeader: Integer,
 respBody: Integer,
 totalReqResSize: Integer,
 duration: Integer,
 objectName: String,
 httpStatus: Integer,
 s3ReqId: String,
 etag: String,
 errCode: Integer,
 srcBucket: String
   )

val sampleLogSchema = Encoders.product[SampleLogEntry].schema // using encoders

val rawData = spark
  .readStream
  .format("csv")
  .option("delimiter", "|")
  .option("header", "true")
  .schema(sampleLogSchema)
  .load("/Users/home/learning-spark/logs")

However, I am getting only null values with this schema -

---
Batch: 0
---
(every column - dateTime, IP, userId, s3Api, bucketName, accessUserId,
reqHeader, reqBody, respHeader, respBody, totalSize, duration, objectName,
httpStatus, reqestId, objectTag, errCode, srcBucket - is null in every row)
After trying multiple options, like inferring the schema from sample data
and defining an explicit schema StructType, I changed every field in this
schema to String -

case class SampleLogEntry(
   dateTime: String,
   IP: String,
   userId: String,
   s3Api: String,
   bucketName: String,
   accessUserId: String,
   reqHeader: String,
   reqBody: String,
   respHeader: String,
   respBody: String,
   totalSize: String,
   duration: String,
   objectName: String,
   httpStatus: String,
   reqestId: String,
   objectTag: String,
   errCode: String,
   srcBucket: String
 )
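
For completeness, a rough sketch of what I mean by converting String to
Integer: read everything as String using the case class above, then cast the
numeric columns afterwards (assuming the source is the same pipe-delimited
CSV):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Rough sketch: derive an all-String schema from the case class above, read
// the stream with it, then cast the numeric columns explicitly.
val stringSchema = Encoders.product[SampleLogEntry].schema

val castedData = spark.readStream
  .format("csv")
  .option("delimiter", "|")
  .option("header", "true")
  .schema(stringSchema)
  .load("/Users/home/learning-spark/logs")
  .withColumn("reqHeader", col("reqHeader").cast(IntegerType))
  .withColumn("reqBody", col("reqBody").cast(IntegerType))
  .withColumn("respHeader", col("respHeader").cast(IntegerType))
  .withColumn("respBody", col("respBody").cast(IntegerType))
  .withColumn("totalSize", col("totalSize").cast(IntegerType))
  .withColumn("duration", col("duration").cast(IntegerType))
  .withColumn("httpStatus", col("httpStatus").cast(IntegerType))
  .withColumn("errCode", col("errCode").cast(IntegerType))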

