Re: Structured Streaming Microbatch Semantics
Hi Rico,

there is no way to defer records from one micro-batch to the next one. So it is guaranteed that the data and the trigger event will be processed within the same batch.

I assume that one trigger event leads to an unknown number of actual events pulled via HTTP. This bypasses the throughput properties of Spark streaming. Depending on the number of resulting HTTP records, you might consider splitting the pipeline into two parts:
- process the trigger event, pull the data via HTTP, write to Kafka
- perform the structured streaming ingestion

Kind regards

Dipl.-Inf. Rico Bergmann wrote on Fri, 5 Mar 2021 at 09:06:

> Hi all!
>
> I'm using Spark structured streaming for a data ingestion pipeline.
> Basically the pipeline reads events (notifications of new available
> data) from a Kafka topic and then queries a REST endpoint to get the
> real data (within a flatMap).
>
> For one single event the pipeline creates a few thousand records (rows)
> that have to be stored. And to write the data I use foreachBatch().
>
> My question is now: Is it guaranteed by Spark that all output records of
> one event are always contained in a single batch or can the records also
> be split into multiple batches?
>
> Best,
>
> Rico.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Roland Johann
Data Architect/Data Engineer
phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany
Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io
Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann
Re: Convert Seq[Any] to Seq[String]
Your code looks overly complicated and the relevant parts are missing. If possible, please post the complete snippet, including the retrieval and type of the rows, so we get the complete picture and can try to help.

As a first simplification you can convert aMap to Seq[(String, (String, String))] and then flatten each nested tuple into a Seq[String], which you then pass to toDF via varargs expansion.

val colNames: Seq[String] = aMap.toSeq.flatMap(kv => Seq(kv._1, kv._2._1, kv._2._2))

Depending on the type of aMap this leads to problems, as we assume it to be Map[String, (String, String)].

Best Regards

Vikas Garg wrote on Fri, 18 Dec 2020 at 15:46:

> I am getting the table schema through Map which I have converted to Seq
> and passing to toDF
>
> On Fri, 18 Dec 2020 at 20:13, Sean Owen wrote:
>
>> It's not really a Spark question. .toDF() takes column names.
>> atrb.head.toSeq.map(_.toString)? but it's not clear what you mean the col
>> names to be
>>
>> On Fri, Dec 18, 2020 at 8:37 AM Vikas Garg wrote:
>>
>>> Hi,
>>>
>>> Can someone please help me how to convert Seq[Any] to Seq[String]
>>>
>>> For line
>>> val df = row.toSeq.toDF(newCol.toSeq: _*)
>>> I get that error message.
>>>
>>> I converted Map "val aMap = Map("admit" -> ("description","comments"))"
>>> to Seq
>>>
>>> var atrb = ListBuffer[(String,String,String)]()
>>>
>>> for((key,value) <- aMap){
>>>   atrb += ((key, value._1, value._2))
>>> }
>>>
>>> var newCol = atrb.head.productIterator.toList.toSeq
>>>
>>> Please someone help me on this.
>>>
>>> Thanks

--
Roland Johann
Re: dynamic executor scaling spark on kubernetes client mode
Hi all,

I don't want to interrupt the conversation, but I am keen to know where I can find information about dynamic allocation on Kubernetes. As far as I know, the docs just point to future work.

Thanks a lot,
Roland

> On 12 May 2020, at 09:25, Steven Stetzler wrote:
>
> Hi all,
>
> I am interested in this as well. My use-case could benefit from dynamic
> executor scaling but we are restricted to using client mode since we are only
> using Spark shells.
>
> Could anyone help me understand the barriers to getting dynamic executor
> scaling to work in client mode on Kubernetes?
>
> Thanks,
> Steven
>
> On Sat, May 9, 2020 at 9:48 AM Pradeepta Choudhury
> <pradeeptachoudhu...@gmail.com> wrote:
> Hi,
>
> Dynamic executor scaling is working fine for Spark on Kubernetes (latest
> from the Spark master repository) in cluster mode. Is dynamic executor
> scaling available for client mode? If yes, where can I find the usage doc
> for it?
> If not, is there any PR open for this?
>
> Thanks,
> Pradeepta
Re: Left Join at SQL query gets planned as inner join
Thank you, that's absolutely right! Getting the rows of `s` without matches in `p` is no longer a problem.

Have a nice day
Roland

> On 30 Apr 2020, at 17:36, Ryan C. Kleck wrote:
>
> He's saying you need to move the filters for the 'p' table in order to do
> what you want. They need to be before your WHERE. The order of operations in
> SQL applies your join clause filters before the WHERE. The filters on your
> 's' table need to stay in the WHERE. It's the only time the ordering matters
> when you are doing OUTER JOINs.
>
> Intuitively with your query why would you do an OUTER JOIN meaning you think
> some things in 'p' will be NULL after your join. But your filter in the WHERE
> gets rid of all the NULLs in 'p'. You basically force it into an inner join
> with that filter and the planner recognizes that.
>
> Ryan Kleck
> Software Developer IV
> Customer Knowledge Platform
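Ryan's point about where the filters on `p` live can be checked without a cluster. The following is a minimal sketch that uses SQLite from the Python standard library as a stand-in for Spark SQL; the tables, the rows and the single `day` column (instead of year/month/day) are made up for illustration. It relies only on standard outer-join semantics, not on anything specific to Spark's planner.

```python
import sqlite3

# In-memory stand-in for the two tables; the rows are invented for this sketch.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE s (event_id TEXT, query_string TEXT, day INTEGER);
    CREATE TABLE p (event_id TEXT, source_event_id TEXT, day INTEGER);
    INSERT INTO s VALUES ('s1', 'shoes', 29), ('s2', 'hats', 29);
    INSERT INTO p VALUES ('p1', 's1', 29);
""")

# Filter on p in WHERE: for the unmatched row s2, p.day is NULL after the
# join, so the predicate p.day = 29 drops it. The result equals an inner join.
where_filter = conn.execute("""
    SELECT s.event_id, p.event_id
    FROM s LEFT OUTER JOIN p ON s.event_id = p.source_event_id
    WHERE s.day = 29 AND p.day = 29
    ORDER BY s.event_id
""").fetchall()

# Filter on p in ON: it restricts which p rows can match, but every s row
# that passes the WHERE clause is kept.
on_filter = conn.execute("""
    SELECT s.event_id, p.event_id
    FROM s LEFT OUTER JOIN p
      ON s.event_id = p.source_event_id AND p.day = 29
    WHERE s.day = 29
    ORDER BY s.event_id
""").fetchall()

# With the p filter in ON, "p.event_id IS NULL" in WHERE selects exactly the
# s rows without a match.
unmatched = conn.execute("""
    SELECT s.event_id
    FROM s LEFT OUTER JOIN p
      ON s.event_id = p.source_event_id AND p.day = 29
    WHERE s.day = 29 AND p.event_id IS NULL
""").fetchall()

print(where_filter)  # [('s1', 'p1')]
print(on_filter)     # [('s1', 'p1'), ('s2', None)]
print(unmatched)     # [('s2',)]
```

The same rewrite applied to the query from this thread would move the `p.year`, `p.month` and `p.day` predicates into the `on` clause and leave the `s` predicates in `where`.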
Re: Left Join at SQL query gets planned as inner join
Thanks for the quick reply.

It plans the LeftOuter as soon as the filters on the second table are removed.

> It seems like you are asking for a left join, but your filters demand the
> behavior of an inner join.

Can you explain that?
The filters on the second table use partition pruning so that we don't have to do a full table scan to get only the data for that one day of the second table. To be more precise: we want to left outer join the records of the two tables for the same day on their id properties, where the second table must not contain records matching the join condition.

> s_DF = s_DF.filter(year = 2020 and month = 4 and day = 29)
> p_DF = p_DF.filter(year = 2020 and month = 4 and day = 29 and event_id is null)

That points to my second question: `event_id is null` should be applied after the join, to get the records of the first table that don't match any records of the second table. The plan actually shows that it pushes that filter down to Parquet and doesn't select it anyway, so the entire result set is empty because of the additional inner join. The desired behavior can be achieved with left anti joins, but that's not the point, as the where condition behaves differently than one would expect.

I hope it doesn't get confusing that we are talking about two different, but somehow related, problems within a single thread.

Best
Roland

> On 30 Apr 2020, at 17:20, randy clinton wrote:
>
> Does it still plan an inner join if you remove a filter on both tables?
>
> It seems like you are asking for a left join, but your filters demand the
> behavior of an inner join.
>
> Maybe you could do the filters on the tables first and then join them.
>
> Something roughly like..
>
> s_DF = s_DF.filter(year = 2020 and month = 4 and day = 29)
> p_DF = p_DF.filter(year = 2020 and month = 4 and day = 29 and event_id is null)
>
> output = s_DF.join(p_DF, event_id == source_event_id, left)
Left Join at SQL query gets planned as inner join
Hi All,

we are on vanilla Spark 2.4.4 and currently see somewhat strange behavior of the query planner/optimizer, and therefore get wrong results.

    select
      s.event_id as search_event_id,
      s.query_string,
      p.event_id
    from s
    left outer join p on s.event_id = p.source_event_id
    where
      s.year = 2020 and s.month = 4 and s.day = 29
      and p.year = 2020 and p.month = 4 and p.day = 29
    limit 1

This query leads to the following plan:

    *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
    +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
       :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
       :  +- *(1) Project [event_id#12131, query_string#12178]
       :     +- *(1) Filter isnotnull(event_id#12131)
       :        +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs:///search/year=2020/month=4/day=29/..., PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: struct
       +- *(2) Project [event_id#12209, source_event_id#12221]
          +- *(2) Filter isnotnull(source_event_id#12221)
             +- *(2) FileScan parquet s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs:///p/year=2020/month=4/day=2..., PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: struct

Without partition pruning the join gets planned as LeftOuter with SortMergeJoin, but we need partition pruning in this case to prevent full table scans and to profit from the broadcast join...

As soon as we rewrite the query in Scala, the plan looks fine:

    val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
    val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")

    s
      .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
      .groupBy(s("query_string"))
      .agg(count(s("query_string")), count(p("event_id")))
      .show()

The second thing we saw is that conditions in the where clause on joined tables get pushed down to the Parquet files and lead to wrong results, for example:

    select
      s.event_id as search_event_id,
      s.query_string,
      p.event_id
    from s
    left outer join p on s.event_id = p.source_event_id
    where
      s.year = 2020 and s.month = 4 and s.day = 29
      and p.year = 2020 and p.month = 4 and p.day = 29
      and p.event_id is null

Until now I assumed that string-based queries and the Scala DSL lead to the same execution plan. Can someone point me to docs about the internals of this part of Spark? The official docs about SQL in general are not that verbose.

Thanks in advance and stay safe!

Roland Johann
Re: [pyspark] Load a master data file to spark ecosystem
You can read both the logs and the tree file into DataFrames and join them. That way Spark can distribute the relevant records, or even the whole DataFrame via broadcast, to optimize the execution.

Best regards

Sonal Goyal wrote on Sat, 25 Apr 2020 at 06:59:

> How does your tree_lookup_value function work?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
> On Fri, Apr 24, 2020 at 8:47 PM Arjun Chundiran wrote:
>
>> Hi Team,
>>
>> I have asked this question in stack overflow
>> <https://stackoverflow.com/questions/61386719/load-a-master-data-file-to-spark-ecosystem>
>> and I didn't really get any convincing answers. Can somebody help me to
>> solve this issue?
>>
>> Below is my problem
>>
>> While building a log processing system, I came across a scenario where I
>> need to look up data from a tree file (Like a DB) for each and every log
>> line for corresponding value. What is the best approach to load an external
>> file which is very large into the spark ecosystem? The tree file is of size
>> 2GB.
>>
>> Here is my scenario
>>
>>    1. I have a file contains huge number of log lines.
>>    2. Each log line needs to be split by a delimiter to 70 fields
>>    3. Need to lookup the data from tree file for one of the 70 fields of
>>       a log line.
>>
>> I am using Apache Spark Python API and running on a 3 node cluster.
>>
>> Below is the code which I have written. But it is really slow
>>
>> def process_logline(line, tree):
>>     row_dict = {}
>>     line_list = line.split(" ")
>>     row_dict["host"] = tree_lookup_value(tree, line_list[0])
>>     new_row = Row(**row_dict)
>>     return new_row
>>
>> def run_job(vals):
>>     spark.sparkContext.addFile('somefile')
>>     tree_val = open(SparkFiles.get('somefile'))
>>     lines = spark.sparkContext.textFile("log_file")
>>     converted_lines_rdd = lines.map(lambda l: process_logline(l, tree_val))
>>     log_line_rdd = spark.createDataFrame(converted_lines_rdd)
>>     log_line_rdd.show()
>>
>> Basically I need some option to load the file one time in memory of workers
>> and start using it entire job life time using Python API.
>>
>> Thanks in advance
>> Arjun

--
Roland Johann
Software Developer/Data Engineer
phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany
Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io
Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann
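If the lookup cannot be expressed as a join, the "load it once per worker" idea from the question can be approximated by loading the tree once per partition instead of once per line. The following is only a sketch of that pattern, not the approach suggested in the reply above: `make_partition_processor` and `load_tree` are hypothetical names, and a plain dict stands in for the real tree and for `tree_lookup_value`. The returned function has the shape that `RDD.mapPartitions` expects (an iterator in, an iterator out), so it can be tried locally without Spark.

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple


def make_partition_processor(
    load_tree: Callable[[], Dict[str, str]],  # hypothetical loader for the tree file
    delimiter: str = " ",
) -> Callable[[Iterable[str]], Iterator[Tuple[str, str]]]:
    """Build a function that loads the lookup data once per partition."""

    def process_partition(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
        tree = load_tree()  # runs once per partition, not once per log line
        for line in lines:
            key = line.split(delimiter)[0]
            # A dict lookup stands in for the real tree_lookup_value(tree, key).
            yield key, tree.get(key, "unknown")

    return process_partition


# Local check with a fake loader that counts how often it is called.
load_calls = []


def fake_load_tree() -> Dict[str, str]:
    load_calls.append(1)
    return {"10.0.0.1": "web-1"}


process = make_partition_processor(fake_load_tree)
result = list(process(["10.0.0.1 GET /", "10.0.0.2 GET /x"]))
print(result)           # [('10.0.0.1', 'web-1'), ('10.0.0.2', 'unknown')]
print(len(load_calls))  # 1
```

In a job this would be used roughly as `lines.mapPartitions(make_partition_processor(load_tree))`, with `load_tree` reading the file distributed via `addFile` on the executor. Whether this beats a broadcast join depends on the structure of the tree and is not something the thread settles.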
Re: 30000 partitions vs 1000 partitions with Coalescing
Hi Adnan,

coalescing involves network shuffle to other executors. How many executors are configured for that job?

Best regards
Roland Johann

> On 23 Apr 2020, at 20:41, dev nan wrote:
>
> I would like to know why it is faster to write out an RDD that has 30,000
> partitions as 30,000 files sized 1K-2M rather than coalescing it to 1000
> partitions and writing out 1000 S3 files of roughly 26MB each, or even 100
> partitions and 100 S3 files of 260MB each.
>
> The coalescing takes a long time.
>
> Thanks,
>
> Adnan
Re: Standard practices for building dashboards for spark processed data
Hi Ani,

Prometheus is not well suited for ingesting explicit time series data. Its purpose is technical monitoring. If you want to monitor your Spark jobs with Prometheus, you can publish the metrics so that Prometheus can scrape them. What you are probably looking for is a time series database that you can push metrics to.

You should look for an alternative to Grafana only if you find that Grafana is not well suited for your visualization use case. As said above, at a quick glance it sounds like you should look for an alternative to Prometheus instead.

For time series you can look at TimescaleDB or InfluxDB. Other databases, such as ordinary SQL databases or Cassandra, lack up/downsampling capabilities, which can lead to large query responses and the need for post-processing on the client.

Kind regards,

Aniruddha P Tekade wrote on Wed, 26 Feb 2020 at 02:23:

> Hello,
>
> I am trying to build a data pipeline that uses spark structured streaming
> with delta project and runs into Kubernetes. Due to this, I get my output
> files only into parquet format. Since I am asked to use the prometheus and
> grafana
> for building the dashboard for this pipeline, I run an another small spark
> job and convert output into json so that I would be able to insert them
> into Grafana. Although I can see that this step is redundant, considering
> the importance of delta lake project, I can not write my data directly into
> json. Therefore I need some help/guidelines/opinions about moving forward
> from here.
>
> I would appreciate if the spark user(s) can provide me some practices to
> follow with respect to the following questions -
>
>    1. Since I can not have direct json output from spark structured
>       streams, is there any better way to convert parquet into json? Or should I
>       keep only parquet?
>    2. Will I need to write some custom exporter for prometheus so as to
>       make grafana read those time-series data?
>    3. Is there any better dashboard alternative than Grafana for this
>       requirement?
>    4. Since the pipeline is going to run into Kubernetes, I am trying to
>       avoid InfluxDB as time-series database and moving with prometheus. Is this
>       approach correct?
>
> Thanks,
> Ani

--
Roland Johann
Structured Streaming Kafka change maxOffsetsPerTrigger won't apply
Hi All,

changing maxOffsetsPerTrigger and restarting the job does not change the batch size. This is a problem, as we currently use a trigger duration of 5 minutes, which consumes only 100k messages while the offset lag is in the billions. Decreasing the trigger duration also affects the micro-batch size, but it is then only a few hundred messages.

The Spark version in use is 2.4.4.

I assume that Spark uses previous micro-batch sizes and runtimes to somehow calculate the current batch size based on the trigger duration. AFAIK structured streaming isn't back-pressure aware, so this behavior is strange on multiple levels.

Any help appreciated.

Kind Regards
Roland
Re: Delta with intelligent upsert
If the dataset contains a column like changed_at/created_at, you can use it as a watermark and filter out rows whose changed_at/created_at lies before the watermark.

Best Regards
Roland Johann

> On 1 Nov 2019, at 07:52, Gourav Sengupta wrote:
>
> should not a where clause on the partition field help with that? I am
> obviously missing something in the question.
>
> Regards,
> Gourav
>
> On Thu, Oct 31, 2019 at 9:15 PM ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> we have a scenario where we have a large table ie 5-6B records. The table is
> repository of data from past N years. It is possible that some updates take
> place on the data and thus we are using Delta table.
>
> As part of the business process we know updates can happen only within M
> years of past records where M is much smaller than N. Eg the table can hold
> 20 yrs of data but we know updates can happen only for last year not before
> that.
>
> Is there some way to indicate this additional intelligence to Delta so it can
> look into only last years data while running a merge or update? It seems to
> be an obvious performance booster.
>
> Any thoughts?
> --
> Best Regards,
> Ayan Guha
Re: Need help regarding logging / log4j.properties
Hi Debu,

you need to define the Spark config properties before the jar file path in spark-submit. Everything after the jar path will be passed as arguments to your application.

Best Regards

Debabrata Ghosh wrote on Thu, 31 Oct 2019 at 03:26:

> Greetings All !
>
> I needed some help in obtaining the application logs but I am really
> confused where it's currently located. Please allow me to explain my
> problem:
> 1. I am running the Spark application (written in Java) in a Hortonworks
> Data Platform Hadoop cluster
>
> 2. My spark-submit command is the following:
> spark-submit --class com.ibm.spark.sparkkafkamapper.ServiceMapperMain
> --master yarn --packages
> org.apache.kafka:kafka-clients:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0
> --deploy-mode client --files
> job-configuration.properties,log4j.properties,/usr/hdp/current/spark2-client/conf/hive-site.xml
> ./spark-job.jar --settings=job-configuration.properties
> log4j-spark.properties --conf
> "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
> --conf
> "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
>
> 3. Content of the log4j.properties is the following:
> # Set everything to be logged to the console
> log4j.rootCategory=ALL, console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.target=System.err
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
>
> # Set the default spark-shell log level to WARN. When running the
> # spark-shell, the log level for this class is used to overwrite the root
> # logger's log level, so that the user can have different defaults for the
> # shell and regular Spark apps.
> log4j.logger.org.apache.spark.repl.Main=WARN
>
> # Settings to quiet third party logs that are too verbose
> log4j.logger.org.spark_project.jetty=ALL
> log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
> log4j.logger.org.apache.parquet=ERROR
> log4j.logger.parquet=ERROR
>
> # SPARK-9183: Settings to avoid annoying messages when looking up
> # nonexistent UDFs in SparkSQL with Hive support
> log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
> log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
>
> #Any custom class debug
> log4j.logger.com.vmeg.code=DEBUG
>
> 4. Inside the spark application, I am having plenty of logger functions
> and one example is like this:
> logger.info("No dataset recieved for this batch");
>
> Question: My question is where is the application log files being written
> ? I have checked in the yarn logs but couldn't find the messages I have
> written in the java file.
> Request your help please as I am little confused and know that there is
> something very silly which I am missing.
>
> Thanks in advance !
>
> Debu

--
Roland Johann
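The ordering rule from the reply can be made explicit by assembling the command from two separate lists, one for spark-submit itself and one for the application. This is a small sketch with a hypothetical helper, `build_spark_submit`; the class name, jar path and options are taken from the command in the question, shortened by leaving out `--packages` and the `hive-site.xml` entry.

```python
import shlex
from typing import List


def build_spark_submit(
    main_class: str,
    jar: str,
    spark_options: List[str],
    app_args: List[str],
) -> List[str]:
    """Place spark-submit options before the jar and application arguments after it."""
    return ["spark-submit", "--class", main_class, *spark_options, jar, *app_args]


cmd = build_spark_submit(
    main_class="com.ibm.spark.sparkkafkamapper.ServiceMapperMain",
    jar="./spark-job.jar",
    spark_options=[
        "--master", "yarn",
        "--deploy-mode", "client",
        "--files", "job-configuration.properties,log4j.properties",
        # The --conf entries must come before the jar path to be seen by spark-submit.
        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties",
        "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties",
    ],
    # Everything listed here is handed to the application's main method.
    app_args=["--settings=job-configuration.properties", "log4j-spark.properties"],
)

# Print the command as a single shell-quoted line.
print(shlex.join(cmd))
```

Keeping the two lists apart makes it harder to append a `--conf` after the jar by accident, which is what happened in the original command.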
Re: Spark job fails because of timeout to Driver
Hi Jochen,

can you create a small EMR cluster with all defaults and run the job there? This way we can make sure that the issue is not related to the infrastructure or the YARN configuration.

Kind regards

Jochen Hebbrecht wrote on Fri, 4 Oct 2019 at 19:27:

> Hi Roland,
>
> I switched to the default security groups, ran my job again but the same
> exception pops up :-( ...
> All traffic is open on the security groups now.
>
> Jochen
Re: Spark job fails because of timeout to Driver
These are dynamic port ranges and depend on the configuration of your cluster. There is a separate application master per job, so there can't be just one port.
If I remember correctly, the default EMR setup creates worker security groups with unrestricted traffic within the group, e.g. between the worker nodes.
Depending on your security requirements, I suggest that you start with a default-like setup and afterwards determine the ports and port ranges from the docs to further restrict traffic between the nodes.

Kind regards

Jochen Hebbrecht wrote on Fri, Oct 4, 2019 at 17:16:
> Hi Roland,
>
> We do indeed have custom security groups. Can you tell me where exactly I need to be able to access what?
> For example, is it from the master instance to the driver instance? And which port should be open?
>
> Jochen
Re: Spark job fails because of timeout to Driver
Hi Jochen,

did you set up the EMR cluster with custom security groups? Can you confirm that the relevant EC2 instances can connect through the relevant ports?

Best regards

Jochen Hebbrecht wrote on Fri, Oct 4, 2019 at 17:09:
> Hi Jeff,
>
> Thanks! Just tried that, but the same timeout occurs :-( ...
>
> Jochen
>
> On Fri, Oct 4, 2019 at 16:37, Jeff Zhang wrote:
>> You can try to increase the property spark.yarn.am.waitTime (100s by default).
>> Maybe you are doing some very time-consuming operation when initializing SparkContext, which causes the timeout.
>>
>> See this property here:
>> http://spark.apache.org/docs/latest/running-on-yarn.html
>>
>> On Fri, Oct 4, 2019 at 10:08 PM, Jochen Hebbrecht wrote:
>>> Hi,
>>>
>>> I'm using Spark 2.4.2 on AWS EMR 5.24.0. I'm trying to submit a Spark job to the cluster. The job gets accepted, but the YARN application fails with:
>>>
>>> {code}
>>> 19/09/27 14:33:35 ERROR ApplicationMaster: Uncaught exception:
>>> java.util.concurrent.TimeoutException: Futures timed out after [10 milliseconds]
>>>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>>>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>>> 19/09/27 14:33:35 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [10 milliseconds]
>>>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>>>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
>>>     at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>>> {code}
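Jeff's suggestion in this thread amounts to passing a larger spark.yarn.am.waitTime at submit time. The following is only a minimal sketch of how such a command could be assembled, assuming YARN cluster deploy mode (the property is only used there). The helper name `build_submit_command`, the script name `job.py`, and the value `300s` are illustrative assumptions and do not come from the thread. Note that in this thread raising the value did not resolve the timeout, so this is a first thing to rule out rather than a fix.

```python
from typing import Dict, List


def build_submit_command(app: str, conf: Dict[str, str]) -> List[str]:
    """Build a spark-submit argument list for YARN cluster mode.

    All options are placed before the application file, because
    spark-submit hands everything after the application file to the
    application itself as arguments.
    """
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster"]
    for key, value in sorted(conf.items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)
    return cmd


# Hypothetical values: raise the ApplicationMaster wait time from the
# default of 100s to 300s for a script called job.py.
command = build_submit_command("job.py", {"spark.yarn.am.waitTime": "300s"})
print(" ".join(command))
```

If the timeout persists with a larger value, as it did here, connectivity between the nodes is the more likely suspect, which is where the rest of the thread goes.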
Re: PGP Encrypt using spark Scala
I want to add that the major Hadoop distributions also offer additional encryption options (for example Ranger from Hortonworks).

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobile: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Commercial register: Amtsgericht Köln (HRB 92595)
Managing directors: Roland Johann, Uwe Reimann

> On 26.08.2019 at 23:09, Roland Johann wrote:
>
> Hi all,
>
> instead of handling encryption explicitly at the application level, I suggest that you look into "encryption at rest", for example encryption at the HDFS level:
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html
> This applies, of course, only if you don't have to use PGP. Using encryption at the storage layer simplifies your application and architecture, and you don't need to reinvent the wheel.
>
> Kind Regards
>
> Roland Johann
Re: PGP Encrypt using spark Scala
Hi all,

instead of handling encryption explicitly at the application level, I suggest that you look into "encryption at rest", for example encryption at the HDFS level:
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html
This applies, of course, only if you don't have to use PGP. Using encryption at the storage layer simplifies your application and architecture, and you don't need to reinvent the wheel.

Kind Regards

Roland Johann

> On 26.08.2019 at 17:47, Sachit Murarka wrote:
>
> Hi Deepak,
>
> Thanks for the reply.
>
> Yes. That is the option I am considering now, because even Apache Camel needs the data to be local. I might need to copy the data from HDFS to the local file system if I want to use Apache Camel (to get rid of the shell script).
>
> Thanks
> Sachit
>
> On Mon, 26 Aug 2019, 21:11 Deepak Sharma wrote:
>> Hi Sachit,
>>
>> PGP encryption is not built into Spark.
>> I would suggest writing a shell script that does the PGP encryption and calling it from your Spark Scala program, where it would run on the driver.
>>
>> Thanks
>> Deepak
>>
>> On Mon, Aug 26, 2019 at 8:10 PM Sachit Murarka wrote:
>>> Hi All,
>>>
>>> I want to encrypt my files at an HDFS location using PGP encryption. How can I do this in Spark? I saw Apache Camel, but it seems Camel is meant for source files on the local file system rather than on HDFS.
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
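Deepak's shell-script suggestion in this thread could be sketched as a streaming pipeline that pipes a file out of HDFS, through gpg, and back into HDFS, which avoids the local copy Sachit was worried about. This is only a sketch under assumptions: it presumes the `hdfs` and `gpg` command-line tools are installed on the driver host and that the recipient's public key is already imported and trusted there. The function names, paths, and recipient address are hypothetical and not from the thread.

```python
import shlex
import subprocess
from typing import List


def build_pgp_pipeline(src: str, dst: str, recipient: str) -> List[List[str]]:
    """Return the three commands of an HDFS -> gpg -> HDFS pipeline."""
    return [
        # Stream the source file from HDFS to stdout.
        ["hdfs", "dfs", "-cat", src],
        # Encrypt stdin for the recipient and write the result to stdout.
        ["gpg", "--batch", "--encrypt", "--recipient", recipient],
        # Write stdin to the destination path in HDFS ("-" means stdin).
        ["hdfs", "dfs", "-put", "-", dst],
    ]


def as_shell(pipeline: List[List[str]]) -> str:
    """Render the pipeline as a single shell command line for inspection."""
    return " | ".join(" ".join(shlex.quote(arg) for arg in cmd) for cmd in pipeline)


def run_pipeline(pipeline: List[List[str]]) -> List[int]:
    """Run the commands connected by pipes and return their exit codes."""
    procs = []
    upstream = None
    for i, cmd in enumerate(pipeline):
        is_last = i == len(pipeline) - 1
        proc = subprocess.Popen(
            cmd, stdin=upstream, stdout=None if is_last else subprocess.PIPE
        )
        if upstream is not None:
            upstream.close()  # the child process now owns this end of the pipe
        upstream = proc.stdout
        procs.append(proc)
    return [p.wait() for p in procs]


# Hypothetical paths and recipient, printed only; run_pipeline is not called here.
pipeline = build_pgp_pipeline(
    "/data/in/report.csv", "/data/out/report.csv.gpg", "ops@example.com"
)
print(as_shell(pipeline))
```

Calling `run_pipeline(pipeline)` from the driver would execute the three commands; every exit code should be checked, since a failing `gpg` step can otherwise leave an empty or partial file in HDFS.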
Re: [External]Re: error while connecting to azure blob storage
https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

Try adding the Azure dependency via `--packages org.apache.hadoop:hadoop-azure:2.7.7`, assuming you use Hadoop 2.7.7.

Best Regards

Roland Johann

> On 23.08.2019 at 10:08, Krishna Chandran Nair wrote:
>
> Please find the attached error
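One thing worth checking in the calling command quoted in this thread is the position of `--jars`: spark-submit expects its options before the application file, and anything after the file is handed to the application as arguments. The sketch below is a rough heuristic check for that, not part of Spark; the function name `options_after_app` and the rule used to find the application file are assumptions made for illustration. The jar names are shortened from the thread, and the corrected command uses the `--packages` coordinate Roland suggests.

```python
from typing import List


def options_after_app(argv: List[str]) -> List[str]:
    """Return '--' options that appear after the application file.

    Heuristic (an assumption of this sketch): the application file is the
    first argument ending in .py or .jar that does not directly follow an
    option name such as --jars.
    """
    app_index = None
    for i, arg in enumerate(argv[1:], start=1):
        follows_option = argv[i - 1].startswith("--")
        if arg.endswith((".py", ".jar")) and not follows_option:
            app_index = i
            break
    if app_index is None:
        return []
    return [a for a in argv[app_index + 1:] if a.startswith("--")]


# Shape of the command from the thread: --jars comes after stg.py.
original = [
    "./spark-submit", "stg.py",
    "--jars", "hadoop-azure-3.2.0.jar,azure-storage-8.4.0.jar",
]

# Reordered command using the suggested --packages coordinate.
fixed = [
    "./spark-submit",
    "--packages", "org.apache.hadoop:hadoop-azure:2.7.7",
    "stg.py",
]

print(options_after_app(original))
print(options_after_app(fixed))
```

For the original command the check reports `--jars` as misplaced, and for the reordered one it reports nothing. Whether this ordering was the actual cause of the error cannot be confirmed from the thread, since the error log was never attached.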
Re: error while connecting to azure blob storage
Hi Krishna,

there seems to be no attachment.
In addition, you should NEVER post private credentials to public forums. Please renew the credentials of your storage account as soon as possible!

Best Regards

Roland Johann

> On 23.08.2019 at 08:33, Krishna Chandran Nair wrote:
>
> Hi Team,
>
> I have written a small piece of code to connect to Azure Blob Storage, but I got an error. I have attached the error log. Please help.
>
> Calling command:
>
> ./spark-submit stg.py --jars /home/citus/spark/spark-2.3.3-bin-hadoop2.7/jars/hadoop-azure-3.2.0.jar,/home/citus/spark/spark-2.3.3-bin-hadoop2.7/jars/azure-storage-8.4.0.jar
>
> Code:
>
> vi ~/spark/spark-2.3.3-bin-hadoop2.7/bin/stg.py
>
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql import DataFrameReader
> from pyspark.sql import SparkSession
>
> session = SparkSession.builder.getOrCreate()
>
> #session.conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
> #session.conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
> #session.conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
>
> #session.conf.set(
> #    "fs.azure.sas.snowflakestrg.blob.core.windows.net/test",
> #    "<redacted>"
> #)
>
> session.conf.set(
>     "fs.azure.account.key.snowflakestrg.blob.core.windows.net",
>     "<redacted>")
>
> df=session.read.csv("wasbs://t...@snowflakestrg.blob.core.windows.net/users.csv")
>
> df.show(5)
>
> Qatar Airways - Going Places Together
>
> Disclaimer:- This message (including attachments) is intended solely for the addressee named above. It may be confidential, privileged, subject to copyright, trade secret, or other legal rules and may not be forwarded without the author's permission. If you are not the addressee you must not read, copy or disseminate this message. If you have received it in error please notify the sender immediately and delete the message from all storage devices. Any opinions expressed in this message do not necessarily represent the official positions of Qatar Airways. Any agreements (including any warranties, representations, or offers) concluded with Qatar Airways by using electronic correspondence shall only come into existence if an authorized representative of Qatar Airways has explicitly approved such contract formation. To the fullest extent permissible by law, Qatar Airways disclaim all liability for loss or damage to person or property arising from this message being infected by computer virus or other contamination.
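Following Roland's warning about credentials, one way to keep the storage key out of the script (and out of any mail that quotes it) is to read it from the environment at run time. The sketch below only builds the configuration key and the `wasbs://` URL in the same form the quoted script uses. The environment variable name `AZURE_STORAGE_KEY`, the function name `wasb_settings`, and the account, container, and file names are hypothetical.

```python
import os
from typing import Mapping, Optional, Tuple


def wasb_settings(
    account: str,
    container: str,
    path: str,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str, str]:
    """Return (conf_key, account_key, url) for reading a blob via wasbs.

    The account key is taken from the AZURE_STORAGE_KEY environment
    variable (a name chosen for this sketch) instead of the source code.
    """
    env = os.environ if env is None else env
    account_key = env.get("AZURE_STORAGE_KEY")
    if not account_key:
        raise RuntimeError("AZURE_STORAGE_KEY is not set")
    host = f"{account}.blob.core.windows.net"
    conf_key = f"fs.azure.account.key.{host}"
    url = f"wasbs://{container}@{host}/{path.lstrip('/')}"
    return conf_key, account_key, url


# Demonstration with a dummy environment so the sketch runs anywhere.
conf_key, account_key, url = wasb_settings(
    "exampleaccount", "data", "users.csv", env={"AZURE_STORAGE_KEY": "dummy-key"}
)
print(conf_key)
print(url)

# In the PySpark script the values would then be used like this:
#   session.conf.set(conf_key, account_key)
#   df = session.read.csv(url)
```

Omitting the `env` argument makes the function read from the real process environment, so the key only needs to be exported in the shell that runs spark-submit.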