Re: Path style access fs.s3a.path.style.access property is not working in spark code
Hello User,

I got the solution to this. If you are writing to a custom S3 URL, use hadoop-aws-2.8.0.jar, since the separate flag that enables path-style access was only introduced in that release.

Best,
Aniruddha

On Fri, May 1, 2020 at 5:08 PM Aniruddha P Tekade wrote:

> Hello Users,
>
> I am using on-premise object storage and am able to perform operations on
> different buckets using the aws-cli. However, when I try to use the same
> path from my Spark code, it fails. Here are the details -
>
> Added dependencies in build.sbt -
>
>   - hadoop-aws-2.7.4.jar
>   - aws-java-sdk-1.7.4.jar
>
> Spark Hadoop configuration is set up as -
>
>   spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>   spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", ENDPOINT)
>   spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
>   spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)
>   spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
>
> Now I try to write data into my custom S3 endpoint as follows -
>
>   val dataStreamWriter: DataStreamWriter[Row] = PM25quality.select(
>       dayofmonth(current_date()) as "day",
>       month(current_date()) as "month",
>       year(current_date()) as "year",
>       column("time"),
>       column("quality"),
>       column("PM25"))
>     .writeStream
>     .partitionBy("year", "month", "day")
>     .format("csv")
>     .outputMode("append")
>     .option("path", "s3a://test-bucket/")
>
>   val streamingQuery: StreamingQuery = dataStreamWriter.start()
>
> However, I am getting an error that AmazonHttpClient is not able to execute
> the HTTP request, and the bucket name is being prepended to the URL. It
> seems the Hadoop configuration is not being resolved here -
>
>   20/05/01 16:51:37 INFO AmazonHttpClient: Unable to execute HTTP request: test-bucket.s3-region0.cloudian.com
>   java.net.UnknownHostException: test-bucket.s3-region0.cloudian.com
>
> Is there anything I am missing in the configuration? It seems that even
> after setting path-style access to true, it is not working.
>
> --
> Aniruddha
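A note on the dependency change suggested above: in build.sbt it amounts to swapping the pinned versions. This is only a sketch; the aws-java-sdk-s3 version shown is assumed from what hadoop-aws 2.8.0 was built against, so verify the pairing against the hadoop-aws POM before relying on it.

  // build.sbt (sketch) -- assumes the runtime classpath is on Hadoop 2.8.x;
  // hadoop-aws 2.8.0 understands fs.s3a.path.style.access, while 2.7.x does not
  libraryDependencies ++= Seq(
    "org.apache.hadoop" % "hadoop-aws"      % "2.8.0",
    "com.amazonaws"     % "aws-java-sdk-s3" % "1.10.6"  // pairing assumed; check the hadoop-aws 2.8.0 POM
  )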
Path style access fs.s3a.path.style.access property is not working in spark code
Hello Users,

I am using on-premise object storage and am able to perform operations on different buckets using the aws-cli. However, when I try to use the same path from my Spark code, it fails. Here are the details -

Added dependencies in build.sbt -

  - hadoop-aws-2.7.4.jar
  - aws-java-sdk-1.7.4.jar

Spark Hadoop configuration is set up as -

  spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", ENDPOINT)
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")

Now I try to write data into my custom S3 endpoint as follows -

  val dataStreamWriter: DataStreamWriter[Row] = PM25quality.select(
      dayofmonth(current_date()) as "day",
      month(current_date()) as "month",
      year(current_date()) as "year",
      column("time"),
      column("quality"),
      column("PM25"))
    .writeStream
    .partitionBy("year", "month", "day")
    .format("csv")
    .outputMode("append")
    .option("path", "s3a://test-bucket/")

  val streamingQuery: StreamingQuery = dataStreamWriter.start()

However, I am getting an error that AmazonHttpClient is not able to execute the HTTP request, and the bucket name is being prepended to the URL. It seems the Hadoop configuration is not being resolved here -

  20/05/01 16:51:37 INFO AmazonHttpClient: Unable to execute HTTP request: test-bucket.s3-region0.cloudian.com
  java.net.UnknownHostException: test-bucket.s3-region0.cloudian.com

Is there anything I am missing in the configuration? It seems that even after setting path-style access to true, it is not working.

--
Aniruddha
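A quick way to see whether the flag is actually being honoured, as a sketch only: "test-bucket" and the Cloudian endpoint are the placeholders from the post, and hadoop-aws 2.7.x simply ignores fs.s3a.path.style.access, so the property can read back "true" and still have no effect.

  import java.net.URI
  import org.apache.hadoop.fs.{FileSystem, Path}

  // read the flag back out of the live Hadoop configuration
  val conf = spark.sparkContext.hadoopConfiguration
  println(conf.get("fs.s3a.path.style.access"))        // expect "true"

  // probe the bucket directly; with virtual-host addressing this fails with the
  // same UnknownHostException the streaming query reports
  val fs = FileSystem.get(new URI("s3a://test-bucket/"), conf)
  println(fs.exists(new Path("s3a://test-bucket/")))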
Spark job stuck at s3a-file-system metrics system started
Hello,

I am trying to run a Spark job that writes data into a bucket on a custom S3 endpoint, but the job is stuck at this line of output and does not move forward at all -

  20/04/29 16:03:59 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/abc/IdeaProjects/qct-air-detection/spark-warehouse/').
  20/04/29 16:03:59 INFO SharedState: Warehouse path is 'file:/Users/abc/IdeaProjects/qct-air-detection/spark-warehouse/'.
  20/04/29 16:04:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
  20/04/29 16:04:02 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
  20/04/29 16:04:02 INFO MetricsSystemImpl: s3a-file-system metrics system started

After a long wait it shows this -

  org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on test-bucket:
  com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3-region0.mycloud.com:443 [s3-region0.mycloud.com/10.10.3.72] failed: Connection refused (Connection refused): Unable to execute HTTP request: Connect to s3-region0.mycloud.com:443 [s3-region0.mycloud.com/10.10.3.72] failed: Connection refused (Connection refused)

However, I am able to access this bucket with the aws cli from the same machine, so I don't understand why it says it cannot execute the HTTP request.

I am using -

  - spark 3.0.0-preview2
  - hadoop-aws 3.2.0
  - aws-java-sdk-bundle 1.11.375

My Spark code sets the following Hadoop configuration properties -

  spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", ENDPOINT)
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")

Can someone help me understand what is wrong here? Is there anything else I need to configure? The custom S3 endpoint and its keys are valid and working from an aws cli profile. Is anything wrong with the Scala code here?

  val dataStreamWriter: DataStreamWriter[Row] = PM25quality.select(
      dayofmonth(current_date()) as "day",
      month(current_date()) as "month",
      year(current_date()) as "year")
    .writeStream
    .format("parquet")
    .option("checkpointLocation", "/Users/abc/Desktop/qct-checkpoint/")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("15 seconds"))
    .partitionBy("year", "month", "day")
    .option("path", "s3a://test-bucket")

  val streamingQuery: StreamingQuery = dataStreamWriter.start()

Aniruddha
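One thing worth ruling out when a custom endpoint refuses connections on 443 is whether it actually serves TLS on that port at all. A sketch of S3A connection properties to experiment with: the property names exist in hadoop-aws 3.2.0, but the plain-HTTP assumption and the suggested values are guesses about this particular store.

  val hc = spark.sparkContext.hadoopConfiguration
  hc.set("fs.s3a.endpoint", "s3-region0.mycloud.com")      // or host:port if the store listens elsewhere
  hc.set("fs.s3a.connection.ssl.enabled", "false")         // only if the endpoint speaks plain HTTP
  hc.set("fs.s3a.connection.establish.timeout", "5000")    // fail fast instead of hanging for minutes
  hc.set("fs.s3a.attempts.maximum", "3")                   // fewer client-side retries while debugging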
Re: [External Email] Re: Standard practices for building dashboards for spark processed data
Hi Roland,

Thank you for your reply. That's quite helpful. I think I should try InfluxDB then. But I am curious: in the case of Prometheus, would writing a custom exporter be a good choice and solve the purpose efficiently? Grafana is not something I want to drop.

Best,
Aniruddha

On Tue, Feb 25, 2020 at 11:36 PM Roland Johann wrote:

> Hi Ani,
>
> Prometheus is not well suited for ingesting explicit time-series data; its
> purpose is technical monitoring. If you want to monitor your Spark jobs with
> Prometheus, you can publish the metrics so Prometheus can scrape them. What
> you probably are looking for is a time-series database that you can push
> metrics to.
>
> Looking for an alternative to Grafana should be done only if you find that
> Grafana is not well suited to your use case regarding visualization.
>
> As said earlier, at a quick glance it sounds like you should look for an
> alternative to Prometheus.
>
> For time series you can reach out to TimescaleDB or InfluxDB. Other
> databases, like ordinary SQL databases or Cassandra, lack up/downsampling
> capabilities, which can lead to large query responses and the need for the
> client to post-process.
>
> Kind regards,
>
> Aniruddha P Tekade schrieb am Mi. 26. Feb. 2020 um 02:23:
>
>> Hello,
>>
>> I am trying to build a data pipeline that uses Spark Structured Streaming
>> with the Delta project and runs in Kubernetes. Because of this, I get my
>> output files only in parquet format. Since I am asked to use Prometheus
>> and Grafana for building the dashboard for this pipeline, I run another
>> small Spark job that converts the output into JSON so that I can get it
>> into Grafana. I can see that this step is redundant, but given the
>> importance of the Delta Lake project I cannot write my data directly as
>> JSON. Therefore I need some help/guidelines/opinions about moving forward
>> from here.
>>
>> I would appreciate it if the Spark user(s) could suggest some practices to
>> follow with respect to the following questions -
>>
>>   1. Since I cannot get direct JSON output from Spark structured streams,
>>      is there any better way to convert parquet into JSON? Or should I
>>      keep only parquet?
>>   2. Will I need to write a custom exporter for Prometheus so that Grafana
>>      can read that time-series data?
>>   3. Is there any better dashboard alternative to Grafana for this
>>      requirement?
>>   4. Since the pipeline is going to run in Kubernetes, I am trying to
>>      avoid InfluxDB as the time-series database and move ahead with
>>      Prometheus. Is this approach correct?
>>
>> Thanks,
>> Ani
>
> --
> Roland Johann
> Software Developer/Data Engineer
>
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
>
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io
> Web: phenetic.io
>
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
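For the push-to-a-TSDB route suggested in the reply, a rough sketch of one way to feed InfluxDB from a streaming query using foreachBatch and the influxdb-java client. The connection details, database and measurement names, and the aggregatedStream DataFrame (assumed to be a small per-batch aggregate) are all hypothetical placeholders; collecting to the driver only makes sense because each batch is small.

  import java.util.concurrent.TimeUnit
  import org.apache.spark.sql.DataFrame
  import org.influxdb.InfluxDBFactory
  import org.influxdb.dto.Point

  // driver-side client; foreachBatch bodies run on the driver
  val influx = InfluxDBFactory.connect("http://influxdb:8086", "admin", "admin")
  influx.setDatabase("pipeline_metrics")

  // write each micro-batch of (small) aggregates as InfluxDB points
  val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
    batch.collect().foreach { row =>
      influx.write(
        Point.measurement("pm_readings")
          .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
          .tag("batch", batchId.toString)
          .addField("value", row.getAs[Double]("value"))  // hypothetical aggregate column
          .build())
    }
  }

  val query = aggregatedStream.writeStream   // aggregatedStream is a hypothetical streaming DataFrame
    .outputMode("update")
    .foreachBatch(writeBatch)
    .start()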
Standard practices for building dashboards for spark processed data
Hello,

I am trying to build a data pipeline that uses Spark Structured Streaming with the Delta project and runs in Kubernetes. Because of this, I get my output files only in parquet format. Since I am asked to use Prometheus and Grafana for building the dashboard for this pipeline, I run another small Spark job that converts the output into JSON so that I can get it into Grafana. I can see that this step is redundant, but given the importance of the Delta Lake project I cannot write my data directly as JSON. Therefore I need some help/guidelines/opinions about moving forward from here.

I would appreciate it if the Spark user(s) could suggest some practices to follow with respect to the following questions -

  1. Since I cannot get direct JSON output from Spark structured streams, is
     there any better way to convert parquet into JSON? Or should I keep only
     parquet?
  2. Will I need to write a custom exporter for Prometheus so that Grafana
     can read that time-series data?
  3. Is there any better dashboard alternative to Grafana for this
     requirement?
  4. Since the pipeline is going to run in Kubernetes, I am trying to avoid
     InfluxDB as the time-series database and move ahead with Prometheus. Is
     this approach correct?

Thanks,
Ani
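On question 1 above, the intermediate conversion job described in the post is small enough to be a plain batch job; a sketch, with both paths as hypothetical placeholders for wherever the streaming query writes its parquet output:

  import org.apache.spark.sql.SparkSession

  // one-off batch job: read the parquet output of the streaming pipeline and
  // re-write it as JSON lines for downstream ingestion
  val spark = SparkSession.builder().appName("parquet-to-json").getOrCreate()
  spark.read.parquet("/data/pipeline-output/parquet")
    .write
    .mode("overwrite")
    .json("/data/pipeline-output/json")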
spark writeStream not working with custom S3 endpoint
Hello,

While working with Spark Structured Streaming (v2.4.3), I am trying to write my streaming dataframe to a custom S3. I have made sure that I am able to log in and upload data to S3 buckets manually through the UI, and I have also set up the ACCESS_KEY and SECRET_KEY for it.

  val sc = spark.sparkContext
  sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3-region1.myObjectStore.com:443")
  sc.hadoopConfiguration.set("fs.s3a.access.key", "00cce9eb2c589b1b1b5b")
  sc.hadoopConfiguration.set("fs.s3a.secrete.key", "flmheKX9Gb1tTlImO6xR++9kvnUByfRKZfI7LJT8")
  sc.hadoopConfiguration.set("fs.s3a.path.style.access", "true") // bucket name appended as url/bucket and not bucket.url

  val writeToS3Query = stream.writeStream
    .format("csv")
    .option("sep", ",")
    .option("header", true)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .option("path", "s3a://bucket0/")
    .option("checkpointLocation", "/Users/home/checkpoints/s3-checkpointing")
    .start()

However, I am getting this error -

  Unable to execute HTTP request: bucket0.s3-region1.myObjectStore.com: nodename nor servname provided, or not known

I have a mapping of the URL and IP in my /etc/hosts file, and the bucket is accessible from other sources. Is there any other way to do this successfully? I am really not sure why the bucket name is being prepended to the URL when Spark executes the query. Can this be because I am setting the Spark context's Hadoop configuration after the session is created, so the settings are not effective? But then how is it able to resolve the actual URL when I provide the path as s3a://bucket0?

Best,
Aniruddha
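One way to take the "set after the session is created" question out of the picture is to pass the S3A settings as spark.hadoop.* keys when the session is built, so they are in the Hadoop configuration before any S3A FileSystem is instantiated and cached. A sketch only: the endpoint and bucket are the values from the post, the environment variable names are made up, and note that the standard property name is fs.s3a.secret.key.

  import org.apache.spark.sql.SparkSession

  // every "spark.hadoop.x.y" entry is copied into the Hadoop Configuration at session start
  val spark = SparkSession.builder()
    .appName("s3a-writestream")
    .config("spark.hadoop.fs.s3a.endpoint", "s3-region1.myObjectStore.com:443")
    .config("spark.hadoop.fs.s3a.access.key", sys.env("S3_ACCESS_KEY"))   // hypothetical env vars
    .config("spark.hadoop.fs.s3a.secret.key", sys.env("S3_SECRET_KEY"))   // "secret", not "secrete"
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()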
Can spark convert String to Integer when reading using schema in structured streaming
Hi,

I am new to Spark and learning Spark Structured Streaming. I am using structured streaming with the schema specified with the help of a case class and encoders to get the streaming dataframe.

  case class SampleLogEntry(
    dateTime: Timestamp,
    clientIp: String,
    userId: String,
    operation: String,
    bucketName: String,
    contAccUsrId: String,
    reqHeader: Integer,
    reqBody: Integer,
    respHeader: Integer,
    respBody: Integer,
    totalReqResSize: Integer,
    duration: Integer,
    objectName: String,
    httpStatus: Integer,
    s3ReqId: String,
    etag: String,
    errCode: Integer,
    srcBucket: String
  )

  val sampleLogSchema = Encoders.product[SampleLogEntry].schema // using encoders

  val rawData = spark
    .readStream
    .format("")
    .option("delimiter", "|")
    .option("header", "true")
    .schema(sampleLogSchema)
    .load("/Users/home/learning-spark/logs")

However, I am getting only null values with this schema -

  -------------------------------------------
  Batch: 0
  -------------------------------------------
  +--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
  |dateTime|  IP|userId|s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|reqestId|objectTag|errCode|srcBucket|
  +--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
  |    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
  |    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
  |    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
  |    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
  +--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+

After trying multiple options, like deriving the schema from sample data and defining the schema as a StructType, I changed every field in this schema to String -

  case class SampleLogEntry(
    dateTime: String,
    IP: String,
    userId: String,
    s3Api: String,
    bucketName: String,
    accessUserId: String,
    reqHeader: String,
    reqBody: String,
    respHeader: String,
    respBody: String,
    totalSize: String,
    duration: String,
    objectName: String,
    httpStatus: String,
    reqestId: String,
    objectTag: String,
    errCode: String,
    srcBucket: String
  )
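A sketch of one way to see why the typed schema comes back all null: keep failed rows in a corrupt-record column and give the timestamp column an explicit parse pattern. The "csv" format name, the timestamp pattern, and the log path are assumptions (the post above leaves the format string blank); typically every column turns null when a row fails to parse into the typed schema, for example because the dateTime string does not match the expected timestamp format.

  import org.apache.spark.sql.types.StringType

  // add a catch-all column so unparseable lines become visible instead of silently null
  val debugSchema = sampleLogSchema.add("_corrupt_record", StringType)

  val rawData = spark
    .readStream
    .format("csv")                                       // assumed source format
    .option("delimiter", "|")
    .option("header", "true")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")    // assumed pattern for dateTime
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(debugSchema)
    .load("/Users/home/learning-spark/logs")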