Re: Structured Streaming Microbatch Semantics

2021-03-05 Thread Roland Johann
Hi Rico,

there is no way to defer records from one micro batch to the next one. So
it's guaranteed that the data and the trigger event will be processed within
the same batch.

I assume that one trigger event leads to an unknown number of actual
events pulled via HTTP. This bypasses the throughput controls of Spark
streaming. Depending on the volume of the resulting HTTP records, you might
consider splitting the pipeline into two parts (sketched below):
- process trigger event, pull data from HTTP, write to kafka
- perform structured streaming ingestion
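
A minimal sketch of the first part, assuming the trigger events arrive as strings and fetchRecords is a hypothetical helper that calls the REST endpoint (broker addresses, topic names, and the checkpoint location are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("trigger-to-kafka").getOrCreate()
import spark.implicits._

// hypothetical helper that pulls the actual records for one trigger event via HTTP
def fetchRecords(event: String): Seq[String] = ???

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "trigger-events")              // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS event")
  .as[String]
  .flatMap(fetchRecords _)                            // one trigger event expands to many records
  .toDF("value")                                      // the Kafka sink expects a `value` column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("topic", "raw-records")                     // placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/trigger-to-kafka")
  .start()

The second job then reads raw-records as a regular structured streaming source and can be sized independently of the trigger events.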

Kind regards

Dipl.-Inf. Rico Bergmann  wrote on Fri, 5 Mar 2021 at 09:06:

> Hi all!
>
> I'm using Spark structured streaming for a data ingestion pipeline.
> Basically the pipeline reads events (notifications of new available
> data) from a Kafka topic and then queries a REST endpoint to get the
> real data (within a flatMap).
>
> For one single event the pipeline creates a few thousand records (rows)
> that have to be stored. And to write the data I use foreachBatch().
>
> My question is now: Is it guaranteed by Spark that all output records of
> one event are always contained in a single batch or can the records also
> be split into multiple batches?
>
>
> Best,
>
> Rico.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Roland Johann
Data Architect/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


unsubscribe

2021-02-26 Thread Roland Johann
unsubscribe


signature.asc
Description: Message signed with OpenPGP


Unsubscribe

2021-02-24 Thread Roland Johann
unsubscribe
--
Roland Johann
Data Architect/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Re: Convert Seq[Any] to Seq[String]

2020-12-19 Thread Roland Johann
Your code looks overly complicated and the relevant parts are missing. If
possible, please post the complete snippet including the retrieval/type of the
rows so we get the complete picture and can try to help.

As a first simplification you can convert aMap to a Seq[(String, (String,
String))] and flatten the nested tuples into a Seq[String], which you then pass
to toDF via varargs expansion:

val colNames: Seq[String] = aMap.toSeq.flatMap(kv => Seq(kv._1, kv._2._1,
kv._2._2))

Depending on the actual type of aMap this may not compile as-is; we assume it
to be Map[String, (String, String)].

Best Regards

Vikas Garg  wrote on Fri, 18 Dec 2020 at 15:46:

> I am getting the table schema through Map which I have converted to Seq
> and passing to toDF
>
> On Fri, 18 Dec 2020 at 20:13, Sean Owen  wrote:
>
>> It's not really a Spark question. .toDF() takes column names.
>> atrb.head.toSeq.map(_.toString)? but it's not clear what you mean the col
>> names to be
>>
>> On Fri, Dec 18, 2020 at 8:37 AM Vikas Garg  wrote:
>>
>>> Hi,
>>>
>>> Can someone please help me how to convert Seq[Any] to Seq[String]
>>>
>>> For line
>>> val df = row.toSeq.toDF(newCol.toSeq: _*)
>>> I get that error message.
>>>
>>> I converted Map "val aMap = Map("admit" -> ("description","comments"))"
>>> to Seq
>>>
>>> var atrb = ListBuffer[(String,String,String)]()
>>>
>>> for((key,value) <- aMap){
>>>   atrb += ((key, value._1, value._2))
>>> }
>>>
>>> var newCol = atrb.head.productIterator.toList.toSeq
>>>
>>> Please someone help me on this.
>>>
>>> Thanks
>>>
>>> --
Roland Johann
Data Architect/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Re: dynamic executor scaling spark on kubernetes client mode

2020-05-12 Thread Roland Johann
Hi all,

don't want to interrupt the conversation, but I am keen to know where I can find
information regarding dynamic allocation on Kubernetes. As far as I know, the
docs just point to future work.

Thanks a lot,
Roland



> On 12.05.2020 at 09:25, Steven Stetzler  wrote:
> 
> Hi all,
> 
> I am interested in this as well. My use-case could benefit from dynamic 
> executor scaling but we are restricted to using client mode since we are only 
> using Spark shells.
> 
> Could anyone help me understand the barriers to getting dynamic executor 
> scaling to work in client mode on Kubernetes?
> 
> Thanks,
> Steven
> 
> On Sat, May 9, 2020 at 9:48 AM Pradeepta Choudhury 
> mailto:pradeeptachoudhu...@gmail.com>> wrote:
> Hiii ,
> 
> The dynamic executor scalling is working fine for spark on kubernetes (latest 
> from spark master repository ) in cluster mode . is the dynamic executor 
> scalling available for client mode ? if yes where can i find the usage doc 
> for same .
> If no is there any PR open for this ?
> 
> Thanks ,
> Pradeepta



Re: Left Join at SQL query gets planned as inner join

2020-04-30 Thread Roland Johann
Thank you, that’s absolutely right!
Getting the rows of `s` without matches in `p` is now not a problem anymore.
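
For reference, a sketch of the rewritten query with the filters on `p` moved into the join condition, following Ryan's hint quoted below (table, column, and partition names taken from the original query further down the thread):

val df = spark.sql("""
  select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
  from s
  left outer join p
    on s.event_id = p.source_event_id
    and p.year = 2020 and p.month = 4 and p.day = 29
  where
    s.year = 2020 and s.month = 4 and s.day = 29
""")

The filters on `s` stay in the WHERE clause, so the left side is still pruned, while the pruning predicates on `p` are applied before the null-extension of the outer join.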

Have a nice day
Roland


> On 30.04.2020 at 17:36, Ryan C. Kleck  wrote:
> 
> He’s saying you need to move the filters for the ‘p’ table in order to do 
> what you want. They need to be before your WHERE. The order of operations in 
> sql applies your join clause filters before the WHERE. The filters on your 
> ‘s’ table need to stay in the WHERE. It’s the only time the ordering matters 
> when you are doing OUTER JOINs.
> 
> Intuitively with your query why would you do an OUTER JOIN meaning you think 
> some things in ‘p’ will be NULL after your join. But your filter in the WHERE 
> gets rid of all the Nulls in ‘p’.  You basically force it into an Inner join 
> with that filter and the planner recognizes that.
> 
> Ryan Kleck
> Software Developer IV
> Customer Knowledge Platform
> From: Roland Johann 
> Sent: Thursday, April 30, 2020 8:30:05 AM
> To: randy clinton 
> Cc: Roland Johann ; user 
> 
> Subject: Re: Left Join at SQL query gets planned as inner join
>  
> Notice: This email is from an external sender.
>  
> Thank for quick reply.
> 
> It plans the LeftOuter as soon as the filters on the second table will be 
> removed.
> 
>> It seems like you are asking for a left join, but your filters demand the 
>> behavior of an inner join. 
> Can you explain that?
> The filters on the second table uses partition pruning that we don’t have to 
> do a full table scan to only get the data for that one day of the second 
> table. To be more precise: We want to left outer join the two tables records 
> of the same day and join on id properties where the second table must not 
> contain records matching the join condition.
> 
>> s_DF = s_DF.filter(year = 2020 and month = 4 and day = 29)
>> p_DF = p_DF.filter(year = 2020 and month = 4 and day = 29 and event_id is 
>> null)
> that points to the second question of mine: event_id is null should be done 
> after the join to get records of the first table that doesn’t match to 
> records of the first table. The plan actually prints that it pushes down that 
> filter to parquet and doesn’t select it anyway, so the entire result set is 
> empty caused by the additional inner join. The desired behavior can be 
> achieved by left anti joins but that’s not the point as the where condition 
> behaves differently that one would expect.
> 
> I hope that this doesn’t gets confusing that we talk about two different, but 
> somehow related, problems within a single thread..
> 
> Best
> Roland
> 
> 
>> On 30.04.2020 at 17:20, randy clinton <mailto:randyclin...@gmail.com> wrote:
>> 
>> Does it still plan an inner join if you remove a filter on both tables?
>> 
>> It seems like you are asking for a left join, but your filters demand the 
>> behavior of an inner join. 
>> 
>> Maybe you could do the filters on the tables first and then join them.
>> 
>> Something roughly like..
>> 
>> s_DF = s_DF.filter(year = 2020 and month = 4 and day = 29)
>> p_DF = p_DF.filter(year = 2020 and month = 4 and day = 29 and event_id is 
>> null)
>> 
>> output = s_DF.join(p_DF, event_id == source_event_id, left)
>> 
>> 
>> 
>> On Thu, Apr 30, 2020 at 11:06 AM Roland Johann 
>> > <mailto:roland.joh...@phenetic.io.invalid>> wrote:
>> Hi All,
>> 
>> 
>> we are on vanilla Spark 2.4.4 and currently experience a somehow strange 
>> behavior of the query planner/optimizer and therefore get wrong results.
>> 
>> select
>> s.event_id as search_event_id,
>> s.query_string,
>> p.event_id
>> from s
>> left outer join p on s.event_id = p.source_event_id
>> where
>> s.year = 2020 and s.month = 4 and s.day = 29
>> and p.year = 2020 and p.month = 4 and p.day = 29
>> limit 1
>> This query leads to that plan:
>> *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, 
>> event_id#12209]
>> +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, 
>> BuildLeft
>>:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
>> true]))
>>:  +- *(1) Project [event_id#12131, query_string#12178]
>>: +- *(1) Filter isnotnull(event_id#12131)
>>:+- *(1) FileScan parquet 
>> s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] 
>> Batched: true, Format: Parquet, Location: 
>> PrunedInMemoryFileIndex[hdfs:///search/year=2020/month=4/day=29/...,
>>  PartitionCount: 1, PartitionFilt

Re: Left Join at SQL query gets planned as inner join

2020-04-30 Thread Roland Johann
Thanks for the quick reply.

It plans the LeftOuter as soon as the filters on the second table are removed.

> It seems like you are asking for a left join, but your filters demand the 
> behavior of an inner join. 
Can you explain that?
The filters on the second table use partition pruning so that we don't have to do
a full table scan and only read the data for that one day of the second table. To
be more precise: we want to left outer join the records of the same day from the
two tables, joining on id properties, where the second table must not contain
records matching the join condition.

> s_DF = s_DF.filter(year = 2020 and month = 4 and day = 29)
> p_DF = p_DF.filter(year = 2020 and month = 4 and day = 29 and event_id is 
> null)
that points to the second question of mine: the `event_id is null` condition should
be applied after the join, to get the records of the first table that don't match any
records of the second table. The plan actually shows that it pushes that filter down
to parquet and doesn't select it anyway, so the entire result set is empty, caused by
the additional inner join. The desired behavior can be achieved with a left anti join,
but that's not the point, as the where condition behaves differently than one would
expect.

I hope it doesn't get confusing that we are talking about two different, but
somewhat related, problems within a single thread.

Best
Roland


> On 30.04.2020 at 17:20, randy clinton wrote:
> 
> Does it still plan an inner join if you remove a filter on both tables?
> 
> It seems like you are asking for a left join, but your filters demand the 
> behavior of an inner join. 
> 
> Maybe you could do the filters on the tables first and then join them.
> 
> Something roughly like..
> 
> s_DF = s_DF.filter(year = 2020 and month = 4 and day = 29)
> p_DF = p_DF.filter(year = 2020 and month = 4 and day = 29 and event_id is 
> null)
> 
> output = s_DF.join(p_DF, event_id == source_event_id, left)
> 
> 
> 
> On Thu, Apr 30, 2020 at 11:06 AM Roland Johann 
>  wrote:
> Hi All,
> 
> 
> we are on vanilla Spark 2.4.4 and currently experience a somehow strange 
> behavior of the query planner/optimizer and therefore get wrong results.
> 
> select
> s.event_id as search_event_id,
> s.query_string,
> p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
> s.year = 2020 and s.month = 4 and s.day = 29
> and p.year = 2020 and p.month = 4 and p.day = 29
> limit 1
> This query leads to that plan:
> *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, 
> event_id#12209]
> +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, 
> BuildLeft
>:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
> true]))
>:  +- *(1) Project [event_id#12131, query_string#12178]
>: +- *(1) Filter isnotnull(event_id#12131)
>:+- *(1) FileScan parquet 
> s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] 
> Batched: true, Format: Parquet, Location: 
> PrunedInMemoryFileIndex[hdfs:///search/year=2020/month=4/day=29/...,
>  PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), 
> isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., 
> PushedFilters: [IsNotNull(event_id)], ReadSchema: 
> struct
>+- *(2) Project [event_id#12209, source_event_id#12221]
>   +- *(2) Filter isnotnull(source_event_id#12221)
>  +- *(2) FileScan parquet 
> s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] 
> Batched: true, Format: Parquet, Location: 
> PrunedInMemoryFileIndex[hdfs:///p/year=2020/month=4/day=2..., 
> PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), 
> isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), 
> (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: 
> struct
> Without partition pruning the join gets planned as LeftOuter, with 
> SortMergeJoin but we need partition pruning in this case to prevent full 
> table scans and profit from broadcast join...
> 
> As soon as we rewrite the query with scala the plan looks fine
> val s = spark.sql("select event_id, query_string from ssi_kpi.search where 
> year = 2020 and month = 4 and day = 29")
> val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show 
> where year = 2020 and month = 4 and day = 29")
> 
> s
>   .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
>   .groupBy(s("query_string"))
>   .agg(count(s("query_string")), count(p("event_id")))
>   .show()
> 
> 
> The second thing we saw that conditions at the where clause of joined tables 
> gets pushed down t

Left Join at SQL query gets planned as inner join

2020-04-30 Thread Roland Johann
Hi All,


we are on vanilla Spark 2.4.4 and currently experience a somewhat strange 
behavior of the query planner/optimizer and therefore get wrong results.

select
s.event_id as search_event_id,
s.query_string,
p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
s.year = 2020 and s.month = 4 and s.day = 29
and p.year = 2020 and p.month = 4 and p.day = 29
limit 1
This query leads to that plan:
*(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, 
event_id#12209]
+- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, 
BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
   :  +- *(1) Project [event_id#12131, query_string#12178]
   : +- *(1) Filter isnotnull(event_id#12131)
   :+- *(1) FileScan parquet 
s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: 
true, Format: Parquet, Location: 
PrunedInMemoryFileIndex[hdfs:///search/year=2020/month=4/day=29/..., 
PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), 
isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., 
PushedFilters: [IsNotNull(event_id)], ReadSchema: 
struct
   +- *(2) Project [event_id#12209, source_event_id#12221]
  +- *(2) Filter isnotnull(source_event_id#12221)
 +- *(2) FileScan parquet 
s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] 
Batched: true, Format: Parquet, Location: 
PrunedInMemoryFileIndex[hdfs:///p/year=2020/month=4/day=2..., 
PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), 
isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., 
PushedFilters: [IsNotNull(source_event_id)], ReadSchema: 
struct
Without partition pruning the join gets planned as LeftOuter with SortMergeJoin,
but we need partition pruning in this case to prevent full table scans and to
profit from the broadcast join...

As soon as we rewrite the query with the Scala DSL, the plan looks fine:
val s = spark.sql("select event_id, query_string from ssi_kpi.search where year 
= 2020 and month = 4 and day = 29")
val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where 
year = 2020 and month = 4 and day = 29")

s
  .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
  .groupBy(s("query_string"))
  .agg(count(s("query_string")), count(p("event_id")))
  .show()


The second thing we saw is that conditions in the where clause on joined tables 
get pushed down to the parquet files and lead to wrong results, for example:
select
s.event_id as search_event_id,
s.query_string,
p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
s.year = 2020 and s.month = 4 and s.day = 29
and p.year = 2020 and p.month = 4 and p.day = 29
and p.event_id is null
Until now I assumed that string-based queries and the Scala DSL lead to the same
execution plan. Can someone point me to docs about the internals of this part of
Spark? The official docs about SQL in general are not that verbose.

Thanks in advance and stay safe!

Roland Johann

Re: [pyspark] Load a master data file to spark ecosystem

2020-04-25 Thread Roland Johann
You can read both the logs and the tree file into DataFrames and join them.
That way Spark can distribute the relevant records, or even broadcast the whole
lookup DataFrame, to optimize the execution.
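
A minimal sketch of that approach, written in Scala for brevity (the same works with the DataFrame API in PySpark); the paths, the delimiter handling, and the lookup column names are assumptions:

import org.apache.spark.sql.functions.{broadcast, col, split}

// assumption: one space-delimited log line per row, with the lookup key in the first field
val logs = spark.read.text("hdfs:///logs/log_file")
  .withColumn("fields", split(col("value"), " "))
  .withColumn("lookup_key", col("fields").getItem(0))

// assumption: the tree file has been converted to a tabular format with columns `key` and `host`
val tree = spark.read.parquet("hdfs:///lookup/tree")

// broadcast the (smaller) lookup DataFrame so every executor gets a local copy
val enriched = logs.join(broadcast(tree), logs("lookup_key") === tree("key"), "left")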

Best regards

Sonal Goyal  wrote on Sat, 25 Apr 2020 at 06:59:

> How does your tree_lookup_value function work?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
>
> On Fri, Apr 24, 2020 at 8:47 PM Arjun Chundiran 
> wrote:
>
>> Hi Team,
>>
>> I have asked this question in stack overflow
>> <https://stackoverflow.com/questions/61386719/load-a-master-data-file-to-spark-ecosystem>
>> and I didn't really get any convincing answers. Can somebody help me to
>> solve this issue?
>>
>> Below is my problem
>>
>> While building a log processing system, I came across a scenario where I
>> need to look up data from a tree file (Like a DB) for each and every log
>> line for corresponding value. What is the best approach to load an external
>> file which is very large into the spark ecosystem? The tree file is of size
>> 2GB.
>>
>> Here is my scenario
>>
>>1. I have a file contains huge number of log lines.
>>2. Each log line needs to be split by a delimiter to 70 fields
>>3. Need to lookup the data from tree file for one of the 70 fields of
>>a log line.
>>
>> I am using Apache Spark Python API and running on a 3 node cluster.
>>
>> Below is the code which I have written. But it is really slow
>>
>> def process_logline(line, tree):
>> row_dict = {}
>> line_list = line.split(" ")
>> row_dict["host"] = tree_lookup_value(tree, line_list[0])
>> new_row = Row(**row_dict)
>> return new_row
>> def run_job(vals):
>> spark.sparkContext.addFile('somefile')
>> tree_val = open(SparkFiles.get('somefile'))
>> lines = spark.sparkContext.textFile("log_file")
>> converted_lines_rdd = lines.map(lambda l: process_logline(l, tree_val))
>> log_line_rdd = spark.createDataFrame(converted_lines_rdd)
>> log_line_rdd.show()
>>
>> Basically I need some option to load the file one time in memory of workers 
>> and start using it entire job life time using Python API.
>>
>> Thanks in advance
>> Arjun
>>
>>
>>
>> --
Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Re: 30000 partitions vs 1000 partitions with Coalescing

2020-04-24 Thread Roland Johann
Hi Adnan,

coalescing moves data between executors (though without a full shuffle) and reduces
the parallelism of the write. How many executors are configured for that job?

Best regards

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann



> On 23.04.2020 at 20:41, dev nan wrote:
> 
> I would like to know why it is faster to write out an RDD that has 30,000 
> partitions as 30,000 files sized 1K-2M rather than coalescing it to 1000 
> partitions and writing out 1000 S3 files of roughly 26MB each, or even 100 
> partitions and 100 S3 files of 260MB each. 
> 
> The coalescing takes a long time.
> 
> 
> Thanks,
> 
> Adnan



Re: Standard practices for building dashboards for spark processed data

2020-02-25 Thread Roland Johann
Hi Ani,

Prometheus is not well suited for ingesting explicit timeseries data; its
purpose is technical monitoring. If you want to monitor your Spark jobs
with Prometheus you can publish the metrics so Prometheus can scrape them.
What you are probably looking for is a timeseries database that you can
push metrics to.

Look for an alternative to Grafana only if you find that Grafana is not well
suited for your visualization use case.

As said earlier, at a quick glance it sounds like you should look for an
alternative to Prometheus.

For timeseries data you can look at TimescaleDB or InfluxDB. Other databases,
such as plain SQL databases or Cassandra, lack up/downsampling capabilities,
which can lead to large query responses and force the client to post-process.

Kind regards,

Aniruddha P Tekade  wrote on Wed, 26 Feb 2020 at 02:23:

> Hello,
>
> I am trying to build a data pipeline that uses spark structured streaming
> with delta project and runs into Kubernetes. Due to this, I get my output
> files only into parquet format. Since I am asked to use the prometheus and
> grafana
> for building the dashboard for this pipeline, I run an another small spark
> job and convert output into json so that I would be able to insert them
> into Grafana. Although I can see that this step is redundant, considering
> the important of delta lake project, I can not write my data directly into
> json. Therefore I need some help/guidelines/opinions about moving forward
> from here.
>
> I would appreciate if the spark user(s) can provide me some practices to
> follow with respect to the following questions -
>
>1. Since I can not have direct json output from spark structured
>streams, is there any better way to convert parquet into json? Or should I
>keep only parquet?
>2. Will I need to write some custom exporter for prometheus so as to
>make grafana read those time-series data?
>3. Is there any better dashboard alternative than Grafana for this
>requirement?
>4. Since the pipeline is going to run into Kubernetes, I am trying to
>avoid InfluxDB as time-series database and moving with prometheus. Is this
>approach correct?
>
> Thanks,
> Ani
> ---
>
-- 
Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Structured Streaming Kafka change maxOffsetsPerTrigger won't apply

2019-11-20 Thread Roland Johann
Hi All,

changing maxOffsetsPerTrigger and restarting the job doesn't change the batch size.
This is rather bad, as we currently use a trigger duration of 5 minutes which
consumes only 100k messages while the offset lag is in the billions. Decreasing the
trigger duration also affects the micro-batch size - but then it's only a few
hundred records. The Spark version in use is 2.4.4.

I assume that Spark uses previous micro-batch sizes and runtimes to somehow
calculate the current batch size based on the trigger duration. AFAIK structured
streaming isn't back-pressure aware, so this behavior is strange on multiple
levels.
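
For reference, maxOffsetsPerTrigger is a source option on the Kafka reader; a minimal sketch of setting it (broker, topic, and the value are placeholders):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .option("maxOffsetsPerTrigger", 5000000L)           // upper bound on offsets consumed per micro batch
  .load()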

Any help appreciated.

Kind Regards
Roland

Re: Delta with intelligent upsett

2019-11-01 Thread Roland Johann
If the dataset contains a column like changed_at/created_at you can use it as a
watermark and filter out rows whose changed_at/created_at is before the
watermark.
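
A minimal sketch of that idea with the Delta Lake Scala API (the table path, the key column `id`, the `updates` DataFrame, and the one-year cut-off are assumptions):

import io.delta.tables.DeltaTable

val target = DeltaTable.forPath(spark, "/data/big_table")   // assumed table path

target.as("t")
  .merge(
    updates.as("u"),
    // the extra time predicate restricts the merge scan to the window where updates can occur
    "t.id = u.id AND t.created_at >= date_sub(current_date(), 365)"
  )
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()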

Best Regards

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann



> On 01.11.2019 at 07:52, Gourav Sengupta wrote:
> 
> should not a where clause on the partition field help with that? I am 
> obviously missing something in the question.
> 
> Regards,
> Gourav
> 
> On Thu, Oct 31, 2019 at 9:15 PM ayan guha  <mailto:guha.a...@gmail.com>> wrote:
> 
> Hi
> 
> we have a scenario where we have a large table  ie 5-6B records. The table is 
> repository of data from past N years. It is possible that some updates take 
> place on the data and thus er are using Delta table. 
> 
> As part of the business process we know updates can happen only within M 
> years of past records where M is much smaller than N. Eg the table can hold 
> 20 yrs of data but we know updates can happen only for last year not before 
> that. 
> 
> Is there some way to indicate this additional intelligence to Delta so it can 
> look into only last years data while running a merge or update? It seems to 
> be an obvious performance booster. 
> 
> Any thoughts?
> -- 
> Best Regards,
> Ayan Guha
> -- 
> Best Regards,
> Ayan Guha



Re: Need help regarding logging / log4j.properties

2019-10-31 Thread Roland Johann
Hi Debu,

you need to put Spark config properties before the jar file path in the
spark-submit command. Everything after the jar path is passed as arguments to
your application.
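
For illustration, the command from the quoted mail below with the --conf options moved in front of the jar path:

spark-submit --class com.ibm.spark.sparkkafkamapper.ServiceMapperMain \
  --master yarn --deploy-mode client \
  --packages org.apache.kafka:kafka-clients:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0 \
  --files job-configuration.properties,log4j.properties,/usr/hdp/current/spark2-client/conf/hive-site.xml \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  ./spark-job.jar --settings=job-configuration.properties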

Best Regards

Debabrata Ghosh  wrote on Thu, 31 Oct 2019 at 03:26:

> Greetings All !
>
> I needed some help in obtaining the application logs but I am really
> confused where it's currently located. Please allow me to explain my
> problem:
> 1. I am running the Spark application (written in Java) in a Hortonworks
> Data Platform Hadoop cluster
>
> 2. My spark-submit command is the following:
> spark-submit --class com.ibm.spark.sparkkafkamapper.ServiceMapperMain
> --master yarn --packages 
> org.apache.kafka:kafka-clients:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0
> --deploy-mode client --files
> job-configuration.properties,log4j.properties,/usr/hdp/current/spark2-client/conf/hive-site.xml
> ./spark-job.jar --settings=job-configuration.properties
> log4j-spark.properties --conf
> "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
> --conf
> "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
>
> 3. Content of the log4.properties is the following:
> # Set everything to be logged to the console
> log4j.rootCategory=ALL, console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.target=System.err
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
> %c{1}: %m%n
>
>  # Set the default spark-shell log level to WARN. When running the
> spark-shell, the
> # log level for this class is used to overwrite the root logger's log
> level, so that
> # the user can have different defaults for the shell and regular Spark
> apps.
> log4j.logger.org.apache.spark.repl.Main=WARN
>
> # Settings to quiet third party logs that are too verbose
> log4j.logger.org.spark_project.jetty=ALL
> log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
> log4j.logger.org.apache.parquet=ERROR
> log4j.logger.parquet=ERROR
>
>  # SPARK-9183: Settings to avoid annoying messages when looking up
> nonexistent UDFs in SparkSQL with Hive support
> log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
> log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
>
> #Any custom class debug
> log4j.logger.com.vmeg.code=DEBUG
>
> 4. Inside the spark application, I am having plenty of logger functions
> and one example is like this:
> logger.info("No dataset recieved for this batch");
>
> Question: My question is where is the application log files being written
> ? I have checked in the yarn logs but couldn't find the messages I have
> written in the java file.
> Request your help please as I am little confused and know that there is
> something very silly which I am missing.
>
> Thanks in advance !
>
> Debu
>
-- 


*Roland Johann*Software Developer/Data Engineer

*phenetic GmbH*
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Re: Spark job fails because of timeout to Driver

2019-10-04 Thread Roland Johann
Hi Jochen,

Can you create a small EMR cluster with all defaults and run the job there?
This way we can ensure that the issue is not related to infrastructure or YARN
configuration.

Kind regards

Jochen Hebbrecht  wrote on Fri, 4 Oct 2019 at 19:27:

> Hi Roland,
>
> I switched to the default security groups, ran my job again but the same
> exception pops up :-( ...
> All traffic is open on the security groups now.
>
> Jochen
>
> On Fri, 4 Oct 2019 at 17:37, Roland Johann <roland.joh...@phenetic.io> wrote:
>
>> This are dynamic port ranges and dependa on configuration of your
>> cluster. Per job there is a separate application master so there can‘t be
>> just one port.
>> If I remeber correctly the default EMR setup creates worker security
>> groups with unrestricted traffic within the group, e.g. Between the worker
>> nodes.
>> Depending on your security requirements I suggest that you start with a
>>  default like setup and determine ports and port ranges from the docs
>> afterwards to further restrict traffic between the nodes.
>>
>> Kind regards
>>
>> Jochen Hebbrecht  wrote on Fri, 4 Oct 2019 at 17:16:
>>
>>> Hi Roland,
>>>
>>> We have indeed custom security groups. Can you tell me where exactly I
>>> need to be able to access what?
>>> For example, is it from the master instance to the driver instance? And
>>> which port should be open?
>>>
>>> Jochen
>>>
>>> On Fri, 4 Oct 2019 at 17:14, Roland Johann <roland.joh...@phenetic.io> wrote:
>>>
>>>> Ho Jochen,
>>>>
>>>> did you setup the EMR cluster with custom security groups? Can you
>>>> confirm that the relevant EC2 instances can connect through relevant ports?
>>>>
>>>> Best regards
>>>>
>>>> Jochen Hebbrecht  wrote on Fri, 4 Oct 2019 at 17:09:
>>>>
>>>>> Hi Jeff,
>>>>>
>>>>> Thanks! Just tried that, but the same timeout occurs :-( ...
>>>>>
>>>>> Jochen
>>>>>
>>>>> On Fri, 4 Oct 2019 at 16:37, Jeff Zhang wrote:
>>>>>
>>>>>> You can try to increase property spark.yarn.am.waitTime (by default
>>>>>> it is 100s)
>>>>>> Maybe you are doing some very time consuming operation when
>>>>>> initializing SparkContext, which cause timeout.
>>>>>>
>>>>>> See this property here
>>>>>> http://spark.apache.org/docs/latest/running-on-yarn.html
>>>>>>
>>>>>>
>>>>>> Jochen Hebbrecht  wrote on Friday, 4 Oct 2019 at 10:08 PM:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm using Spark 2.4.2 on AWS EMR 5.24.0. I'm trying to send a Spark
>>>>>>> job towards the cluster. Thhe job gets accepted, but the YARN 
>>>>>>> application
>>>>>>> fails with:
>>>>>>>
>>>>>>>
>>>>>>> {code}
>>>>>>> 19/09/27 14:33:35 ERROR ApplicationMaster: Uncaught exception:
>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after
>>>>>>> [10 milliseconds]
>>>>>>> at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>>>>>> at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>>>>>>> at
>>>>>>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>>>>>>> at
>>>>>>> org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
>>>>>>> at org.apache.spark.deploy.yarn.ApplicationMaster.org
>>>>>>> $apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
>>>>>>> at
>>>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
>>>>>>> at
>>>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>>>>> at
>>>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>>>>> at
>>>>>>> org.apache.s

Re: Spark job fails because of timeout to Driver

2019-10-04 Thread Roland Johann
These are dynamic port ranges and depend on the configuration of your cluster.
Per job there is a separate application master, so there can't be just one
port.
If I remember correctly, the default EMR setup creates worker security groups
with unrestricted traffic within the group, e.g. between the worker nodes.
Depending on your security requirements, I suggest that you start with a
default-like setup and determine ports and port ranges from the docs
afterwards to further restrict traffic between the nodes.

Kind regards

Jochen Hebbrecht  wrote on Fri, 4 Oct 2019 at 17:16:

> Hi Roland,
>
> We have indeed custom security groups. Can you tell me where exactly I
> need to be able to access what?
> For example, is it from the master instance to the driver instance? And
> which port should be open?
>
> Jochen
>
> On Fri, 4 Oct 2019 at 17:14, Roland Johann <roland.joh...@phenetic.io> wrote:
>
>> Ho Jochen,
>>
>> did you setup the EMR cluster with custom security groups? Can you
>> confirm that the relevant EC2 instances can connect through relevant ports?
>>
>> Best regards
>>
>> Jochen Hebbrecht  wrote on Fri, 4 Oct 2019 at 17:09:
>>
>>> Hi Jeff,
>>>
>>> Thanks! Just tried that, but the same timeout occurs :-( ...
>>>
>>> Jochen
>>>
>>> On Fri, 4 Oct 2019 at 16:37, Jeff Zhang wrote:
>>>
>>>> You can try to increase property spark.yarn.am.waitTime (by default it
>>>> is 100s)
>>>> Maybe you are doing some very time consuming operation when
>>>> initializing SparkContext, which cause timeout.
>>>>
>>>> See this property here
>>>> http://spark.apache.org/docs/latest/running-on-yarn.html
>>>>
>>>>
>>>> Jochen Hebbrecht  wrote on Friday, 4 Oct 2019 at 10:08 PM:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm using Spark 2.4.2 on AWS EMR 5.24.0. I'm trying to send a Spark
>>>>> job towards the cluster. Thhe job gets accepted, but the YARN application
>>>>> fails with:
>>>>>
>>>>>
>>>>> {code}
>>>>> 19/09/27 14:33:35 ERROR ApplicationMaster: Uncaught exception:
>>>>> java.util.concurrent.TimeoutException: Futures timed out after [10
>>>>> milliseconds]
>>>>> at
>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>>>> at
>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>>>>> at
>>>>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
>>>>> at org.apache.spark.deploy.yarn.ApplicationMaster.org
>>>>> $apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>>>>> 19/09/27 14:33:35 INFO ApplicationMaster: Final app status: FAILED,
>>>>> exitCode: 13, (reason: Uncaught exception:
>>>>> java.util.concurrent.TimeoutException: Futures timed out after [10
>>>>> milliseconds]
>>>>> at
>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>>>> at
>>>>> scala

Re: Spark job fails because of timeout to Driver

2019-10-04 Thread Roland Johann
Hi Jochen,

did you set up the EMR cluster with custom security groups? Can you confirm
that the relevant EC2 instances can connect through the relevant ports?

Best regards

Jochen Hebbrecht  wrote on Fri, 4 Oct 2019 at 17:09:

> Hi Jeff,
>
> Thanks! Just tried that, but the same timeout occurs :-( ...
>
> Jochen
>
> On Fri, 4 Oct 2019 at 16:37, Jeff Zhang wrote:
>
>> You can try to increase property spark.yarn.am.waitTime (by default it
>> is 100s)
>> Maybe you are doing some very time consuming operation when initializing
>> SparkContext, which cause timeout.
>>
>> See this property here
>> http://spark.apache.org/docs/latest/running-on-yarn.html
>>
>>
>> Jochen Hebbrecht  wrote on Friday, 4 Oct 2019 at 10:08 PM:
>>
>>> Hi,
>>>
>>> I'm using Spark 2.4.2 on AWS EMR 5.24.0. I'm trying to send a Spark job
>>> towards the cluster. Thhe job gets accepted, but the YARN application fails
>>> with:
>>>
>>>
>>> {code}
>>> 19/09/27 14:33:35 ERROR ApplicationMaster: Uncaught exception:
>>> java.util.concurrent.TimeoutException: Futures timed out after [10
>>> milliseconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>>> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
>>> at org.apache.spark.deploy.yarn.ApplicationMaster.org
>>> $apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>>> 19/09/27 14:33:35 INFO ApplicationMaster: Final app status: FAILED,
>>> exitCode: 13, (reason: Uncaught exception:
>>> java.util.concurrent.TimeoutException: Futures timed out after [10
>>> milliseconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>>> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
>>> at org.apache.spark.deploy.yarn.ApplicationMaster.org
>>> $apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>>> {code}
>

Re: PGP Encrypt using spark Scala

2019-08-26 Thread Roland Johann
I want to add that the major hadoop distributions also offer additional 
encryption possibilities (for example Ranger from Hortonworks)

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann



> On 26.08.2019 at 23:09, Roland Johann  wrote:
> 
> Hi all,
> 
> instead of handling encryption explicit at application level, I suggest that 
> you investigate into the topic „encryption at rest“, for example encryption 
> at HDFS level 
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html
>  
> <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html>
>  - obviously if you don’t have to use PGP. Using encryption at the storage 
> layer simplifies your application and architecture and you don’t need to 
> reinvent the wheel.
> 
> Kind Regards
> 
> Roland Johann
> Software Developer/Data Engineer
> 
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
> 
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io <mailto:roland.joh...@phenetic.io>
> Web: phenetic.io <http://phenetic.io/>
> 
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
> 
> 
> 
>> Am 26.08.2019 um 17:47 schrieb Sachit Murarka > <mailto:connectsac...@gmail.com>>:
>> 
>> Hi Deepak,
>> 
>> Thanks for reply.
>> 
>> Yes. That is the option I am considering now because even apache camel needs 
>> data in local.  I might need to copy data from hdfs to local if I want 
>> apache camel ( to get rid of shell).
>> 
>> Thanks
>> Sachit
>> 
>> On Mon, 26 Aug 2019, 21:11 Deepak Sharma, > <mailto:deepakmc...@gmail.com>> wrote:
>> Hi Schit
>> PGP Encrypt is something that is not inbuilt with spark.
>> I would suggest writing a shell script that would do pgp encrypt and use it 
>> in spark scala program , which would run from driver.
>> 
>> Thanks
>> Deepak
>> 
>> On Mon, Aug 26, 2019 at 8:10 PM Sachit Murarka > <mailto:connectsac...@gmail.com>> wrote:
>> Hi All,
>> 
>> I want to encrypt my files available at HDFS location using PGP Encryption
>> How can I do it in spark. I saw Apache Camel  but it seems camel is used 
>> when source files are in Local location rather than HDFS.
>> 
>> Kind Regards,
>> Sachit Murarka
>> 
>> 
>> -- 
>> Thanks
>> Deepak
>> www.bigdatabig.com <http://www.bigdatabig.com/>
>> www.keosha.net <http://www.keosha.net/>



Re: PGP Encrypt using spark Scala

2019-08-26 Thread Roland Johann
Hi all,

instead of handling encryption explicitly at the application level, I suggest that
you look into the topic of "encryption at rest", for example encryption at the
HDFS level
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html
- obviously only if you don't have to use PGP. Using encryption at the storage
layer simplifies your application and architecture and you don't need to
reinvent the wheel.
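
For illustration, a minimal sketch of creating an HDFS encryption zone (the key name and path are placeholders, and a Hadoop KMS must already be configured):

hadoop key create mykey
hdfs dfs -mkdir /secure
hdfs crypto -createZone -keyName mykey -path /secure

Files written below /secure are then encrypted transparently, so the Spark job itself does not need to change.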

Kind Regards

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann



> On 26.08.2019 at 17:47, Sachit Murarka wrote:
> 
> Hi Deepak,
> 
> Thanks for reply.
> 
> Yes. That is the option I am considering now because even apache camel needs 
> data in local.  I might need to copy data from hdfs to local if I want apache 
> camel ( to get rid of shell).
> 
> Thanks
> Sachit
> 
> On Mon, 26 Aug 2019, 21:11 Deepak Sharma,  <mailto:deepakmc...@gmail.com>> wrote:
> Hi Schit
> PGP Encrypt is something that is not inbuilt with spark.
> I would suggest writing a shell script that would do pgp encrypt and use it 
> in spark scala program , which would run from driver.
> 
> Thanks
> Deepak
> 
> On Mon, Aug 26, 2019 at 8:10 PM Sachit Murarka  <mailto:connectsac...@gmail.com>> wrote:
> Hi All,
> 
> I want to encrypt my files available at HDFS location using PGP Encryption
> How can I do it in spark. I saw Apache Camel  but it seems camel is used when 
> source files are in Local location rather than HDFS.
> 
> Kind Regards,
> Sachit Murarka
> 
> 
> -- 
> Thanks
> Deepak
> www.bigdatabig.com <http://www.bigdatabig.com/>
> www.keosha.net <http://www.keosha.net/>


Re: [External]Re: error while connecting to azure blob storage

2019-08-23 Thread Roland Johann
https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
Try to add the azure dependency via `--packages
org.apache.hadoop:hadoop-azure:2.7.7`, assuming you use Hadoop 2.7.7.
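
For illustration, a sketch of the submit command using --packages instead of --jars (options must come before the script; transitive dependencies such as the matching azure-storage version are then resolved automatically):

./spark-submit --packages org.apache.hadoop:hadoop-azure:2.7.7 stg.py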

Best Regards

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann



> On 23.08.2019 at 10:08, Krishna Chandran Nair wrote:
> 
> Please find the attached error
>  
> From: Roland Johann  
> Sent: 23 August 2019 10:51 AM
> To: Krishna Chandran Nair 
> Cc: user@spark.apache.org
> Subject: [External]Re: error while connecting to azure blob storage
>  
> Hi Krishna,
>  
> there seems to be no attachment.
> In addition, you should NEVER post private credentials to public forums. 
> Please renew the credentials of your storage account as soon as possible!
>  
> Best Regards
> 
> Roland Johann
> Software Developer/Data Engineer
> 
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
> 
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io <mailto:roland.joh...@phenetic.io>
> Web: phenetic.io <http://phenetic.io/>
> 
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
>  
>  
> 
> 
> On 23.08.2019 at 08:33, Krishna Chandran Nair <mailto:kcn...@qatarairways.com.qa> wrote:
>  
>  
>  
> Hi Team,
>  
> I have written a small code to connect to azure blob storage but go error. I 
> have attached the error log.  Please help
>  
> Calling command -- ./spark-submit stg.py --jars 
> /home/citus/spark/spark-2.3.3-bin-hadoop2.7/jars/hadoop-azure-3.2.0.jar,/home/citus/spark/spark-2.3.3-bin-hadoop2.7/jars/azure-storage-8.4.0.jar
>  
> Code
>  
>  
> vi ~/spark/spark-2.3.3-bin-hadoop2.7/bin/stg.py
>  
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql import DataFrameReader
> from pyspark.sql import SparkSession
>  
> session = SparkSession.builder.getOrCreate()
>  
>  
> #session.conf.set("fs.azure", 
> "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
> #session.conf.set("fs.hdfs.impl", 
> "org.apache.hadoop.hdfs.DistributedFileSystem")
> #session.conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
>  
>  
>  
> #session.conf.set(
> #   "fs.azure.sas.snowflakestrg.blob.core.windows.net/test",
>   #  
> "?sv=2018-03-28=bfqt=sco=rwdlacup=2020-01-01T16:37:05Z=2019-08-13T08:37:05Z=https=BgTl8mibE%2B%2BTTIMG4dKR17NnGinMWEVTtn888MD8PT4%3D"
> #)
>  
> session.conf.set(
>   "fs.azure.account.key.snowflakestrg.blob.core.windows.net",
> 
> "LIWCYzrJOS4hs0DiQH6fAzjuBnuj/F8myVmJImomEqOqlAV4pSt7KWfr24mj0saaOTVNZkGTKUn41k4e9hqKSA==")
>  
> df=session.read.csv("wasbs://t...@snowflakestrg.blob.core.windows.net/users.csv
>  ")
>  
> df.show(5)
>  
>  
>  
> 



Re: error while connecting to azure blob storage

2019-08-23 Thread Roland Johann
Hi Krishna,

there seems to be no attachment.
In addition, you should NEVER post private credentials to public forums. Please 
renew the credentials of your storage account as soon as possible!

Best Regards

Roland Johann
Software Developer/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann



> On 23.08.2019 at 08:33, Krishna Chandran Nair wrote:
> 
>  
>  
> Hi Team,
>  
> I have written a small code to connect to azure blob storage but go error. I 
> have attached the error log.  Please help
>  
> Calling command -- ./spark-submit stg.py --jars 
> /home/citus/spark/spark-2.3.3-bin-hadoop2.7/jars/hadoop-azure-3.2.0.jar,/home/citus/spark/spark-2.3.3-bin-hadoop2.7/jars/azure-storage-8.4.0.jar
>  
> Code
>  
>  
> vi ~/spark/spark-2.3.3-bin-hadoop2.7/bin/stg.py
>  
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql import DataFrameReader
> from pyspark.sql import SparkSession
>  
> session = SparkSession.builder.getOrCreate()
>  
>  
> #session.conf.set("fs.azure", 
> "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
> #session.conf.set("fs.hdfs.impl", 
> "org.apache.hadoop.hdfs.DistributedFileSystem")
> #session.conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
>  
>  
>  
> #session.conf.set(
> #   "fs.azure.sas.snowflakestrg.blob.core.windows.net/test",
>   #  
> "?sv=2018-03-28=bfqt=sco=rwdlacup=2020-01-01T16:37:05Z=2019-08-13T08:37:05Z=https=BgTl8mibE%2B%2BTTIMG4dKR17NnGinMWEVTtn888MD8PT4%3D"
> #)
>  
> session.conf.set(
>   "fs.azure.account.key.snowflakestrg.blob.core.windows.net",
> 
> "LIWCYzrJOS4hs0DiQH6fAzjuBnuj/F8myVmJImomEqOqlAV4pSt7KWfr24mj0saaOTVNZkGTKUn41k4e9hqKSA==")
>  
> df=session.read.csv("wasbs://t...@snowflakestrg.blob.core.windows.net/users.csv
>  ")
>  
> df.show(5)
>  
>  