Spark Streaming - DStreams - Removing RDD

2020-07-27 Thread forece85
We are using Spark Streaming (DStreams) with Kinesis and a batch interval of
10 seconds. Every so often, a random batch takes very long to process. While
checking the logs, we found the log lines below whenever we see a spike in
processing time:

 

Processing Time:

 

We don't have any code that manually removes RDDs. How can we get rid of these?
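For reference, the setup described above looks roughly like the following untested
sketch (Java; the app, stream, region, and endpoint names are placeholders, and it
uses the older KinesisUtils receiver API):

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

public class KinesisDStreamSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("kinesis-dstream-sketch");
    // 10-second batch interval, as described above
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    JavaReceiverInputDStream<byte[]> records = KinesisUtils.createStream(
        jssc, "myApp", "myStream", "https://kinesis.us-east-1.amazonaws.com",
        "us-east-1", InitialPositionInStream.LATEST,
        Durations.seconds(10), StorageLevel.MEMORY_AND_DISK_2());

    records.count().print();  // trivial per-batch action, stands in for the real job

    jssc.start();
    jssc.awaitTermination();
  }
}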

Thanks in Advance.






Re: Lazy Spark Structured Streaming

2020-07-27 Thread Jungtaek Lim
I'm not sure what exactly your problem is, but given you've mentioned
window and OutputMode.Append, keep in mind that append mode doesn't produce
the output of an aggregation until the watermark "passes by" the end of the
window. It's expected behavior if you're seeing lazier output with
OutputMode.Append compared to OutputMode.Update.

Unfortunately there's no mechanism in SSS to move the watermark forward
without actual input, so if you want to test some behavior with
OutputMode.Append you would need to add a dummy record to move the watermark
forward.
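
For illustration, here is a minimal, untested sketch (in Java; the servers, topic,
and column names are made up) of a windowed count where Append only emits a
window's result after the watermark passes the end of that window:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;

public class AppendModeSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("append-sketch").getOrCreate();

    Dataset<Row> events = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
        .option("subscribe", "events")                         // placeholder topic
        .load()
        .selectExpr("CAST(value AS STRING) AS key", "timestamp");

    // The watermark is what allows Append mode to finalize (and emit) a window.
    Dataset<Row> counts = events
        .withWatermark("timestamp", "10 minutes")
        .groupBy(window(col("timestamp"), "5 minutes"), col("key"))
        .count();

    // Append emits a window's count only after the watermark passes its end,
    // i.e. only once newer records have arrived; Update would emit partial
    // counts for the same window on every batch.
    counts.writeStream()
        .outputMode(OutputMode.Append())
        .format("console")
        .start()
        .awaitTermination();
  }
}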

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Jul 27, 2020 at 8:10 PM Phillip Henry 
wrote:

> Sorry, should have mentioned that Spark only seems reluctant to take the
> last windowed, groupBy batch from Kafka when using OutputMode.Append.
>
> I've asked on StackOverflow:
>
> https://stackoverflow.com/questions/62915922/spark-structured-streaming-wont-pull-the-final-batch-from-kafka
> but am still struggling. Can anybody please help?
>
> How do people test their SSS code if you have to put a message on Kafka to
> get Spark to consume a batch?
>
> Kind regards,
>
> Phillip
>
>
> On Sun, Jul 12, 2020 at 4:55 PM Phillip Henry 
> wrote:
>
>> Hi, folks.
>>
>> I noticed that SSS won't process a waiting batch if there are no batches
>> after that. To put it another way, Spark must always leave one batch on
>> Kafka waiting to be consumed.
>>
>> There is a JIRA for this at:
>>
>> https://issues.apache.org/jira/browse/SPARK-24156
>>
>> that says it's resolved in 2.4.0 but my code
>> 
>> is using 2.4.2 yet I still see Spark reluctant to consume another batch
>> from Kafka if it means there is nothing else waiting to be processed in the
>> topic.
>>
>> Do I have to do something special to exploit the behaviour that
>> SPARK-24156 says it has addressed?
>>
>> Regards,
>>
>> Phillip
>>
>>
>>
>>


How to map Dataset row to Struct in Java?

2020-07-27 Thread anuragDada
When I try to use the code below, I get this error:
*Exception in thread "main" org.apache.spark.sql.AnalysisException:
Generators are not supported when it's nested in expressions, but got:
generatorouter(explode(generatorouter(explode(json))))*


StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("AckedState", DataTypes.StringType, true),
    DataTypes.createStructField("ConfirmedState", DataTypes.StringType, true),
    DataTypes.createStructField("Time", DataTypes.DoubleType, true),
    DataTypes.createStructField("ActiveState", DataTypes.StringType, true),
    DataTypes.createStructField("SourceNode", DataTypes.StringType, true),
    DataTypes.createStructField("SourceName", DataTypes.StringType, true),
    DataTypes.createStructField("Message", DataTypes.StringType, true),
    DataTypes.createStructField("Severity", DataTypes.IntegerType, true) });

Dataset<Row> df = sparkSession.readStream().format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.ssl.keystore.location", keystore)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.ssl.keystore.password", "abc")
    .option("kafka.ssl.truststore.location", truststore)
    .option("kafka.ssl.truststore.password", "abc")
    .option("kafka.sasl.jaas.config", jaasCfg)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("group.id", consumerGroupId)
    .option("value.serializer", serializer)
    .option("key.serializer", serializer)
    .option("session.timeout.ms", "3")
    .option("subscribe", "alerts5")
    .load()
    .selectExpr("CAST(value AS STRING) as json");

df.printSchema();

Column col = new Column("json");

Dataset<Row> data = df.select(functions.from_json(col, schema).as("json"));
data.printSchema();

// This nested explode_outer is what triggers the AnalysisException above.
Dataset<Row> results = data.select(
    functions.explode_outer(functions.explode_outer(new Column("json"))));
results.printSchema();
results.show();

results.writeStream().format("console").option("truncate", false)
    .start().awaitTermination();

I am using functions.from_json.






Spark memory distribution

2020-07-27 Thread dben
Hi,
I'm running a computation on top of a big, dynamic model that constantly
receives changes / online updates. I therefore thought that batch (stateless)
mode, which requires shipping the heavy model to Spark for every job, would be
less appropriate than stream mode.
I was able to run the computations in stream mode, with the model living in
Spark and receiving the live updates it needs.
However, the main motivation for using Spark was complexity reduction / compute
speedup via a distributed algorithm, and state management is evidently
challenging in terms of memory resources, which I don't want to overwhelm or
let disrupt the computation.
My main question is: as effective as Spark may be computation-wise (using a
corresponding distributed algorithm), does the user have any degree of control
over its memory footprint?
For example: is splitting the memory between the workers feasible, or is all
memory eventually centralized on the Spark master (sometimes the driver,
depending on deploy mode: client or cluster)?
I'm basically looking for a way to scale out memory-wise and not just
compute-wise.
Would transforming a centralized data structure into RDDs (which I'm already
using for compute) also relieve/distribute the memory footprint? For example,
can I split my main in-memory data structure into a set of partitions assigned
to each worker, as in the rough sketch below?
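Something like this untested sketch (Java; the entry representation and
partition count are made up) is what I have in mind: splitting the model into
RDD partitions so each executor only holds its share.

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public class DistributeModelSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("distribute-model"));

    // Pretend the big model is a list of (key, value) entries on the driver.
    // Note: parallelize still builds the collection on the driver once;
    // loading the model directly from distributed storage would avoid that.
    List<Tuple2<String, double[]>> entries = new ArrayList<>();

    // Split into 200 partitions; each executor holds only the partitions assigned to it.
    JavaRDD<Tuple2<String, double[]>> distributedModel = sc.parallelize(entries, 200);

    // Keep it in executor memory (spilling to disk if needed) across computations.
    distributedModel.persist(StorageLevel.MEMORY_AND_DISK());

    long size = distributedModel.count();  // forces materialization on the executors
    System.out.println("model entries: " + size);

    sc.stop();
  }
}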
thanks a lot,
David






Secrets in Spark apps

2020-07-27 Thread Dávid Szakállas
Hi folks,

Do you know what’s the best method of passing secrets to Spark operations, e.g.
for doing encryption, salting with a secret before hashing, etc.?
I have multiple ideas off the top of my head.

The secret's source:
- environment variable
- config property
- remote service accessed through an API.

Passing to the executors:
1. Driver resolves the secret
   a. it passes it to the encryption function as an argument, which ends up as 
an argument to a UDF/gets interpolated in the expression’s generated code.
   b. it passes it to the encryption function as a literal expression. For 
security, I can create a SecretLiteral expression that redacts it from the 
pretty printed and SQL versions. Are there any other concerns here?

2. Executors resolve the secret
   a. e.g. reads it from an env/config/service, only the env var name/property 
name/path/URI is passed as part of the plan. I need to cache the secret on the 
executor to prevent a performance hit especially in the remote service case.
   b. Similarly to (1.b), I can create an expression that resolves the secret 
during execution.

In (1) the secret will be passed as part of the plan, so the RPC connections 
have to be encrypted if an attacker can sniff the network for secrets. 1.b 
and 2.b are superior for composing with existing expressions, e.g. 
`sha1(concat(colToMask, secretLit("mySecret")))` for masking a column 
deterministically using a cryptographic hash function and a secret salt. (2) 
might involve a more complicated design than (1).
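
For concreteness, here is an untested sketch (Java) of the plain-literal
baseline for option (1): the driver resolves the secret (from a hypothetical
environment variable) and composes it with standard Spark functions. It has
none of the redaction a SecretLiteral (1.b) would add, so the secret is
visible in the plan.

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SecretSaltSketch {
  public static Dataset<Row> maskColumn(Dataset<Row> df, String colName) {
    // Driver-side resolution; the secret ends up in the plan as a literal,
    // so plan logs/UI and RPC traffic must be protected (see caveat above).
    String secret = System.getenv("MASKING_SALT");   // hypothetical variable name
    Column masked = sha1(concat(col(colName), lit(secret)));
    return df.withColumn(colName, masked);
  }
}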

If you can point me to existing work in this space it would be a great help!

Thanks in advance,
David Szakallas






Re: Lazy Spark Structured Streaming

2020-07-27 Thread Phillip Henry
Sorry, should have mentioned that Spark only seems reluctant to take the
last windowed, groupBy batch from Kafka when using OutputMode.Append.

I've asked on StackOverflow:
https://stackoverflow.com/questions/62915922/spark-structured-streaming-wont-pull-the-final-batch-from-kafka
but am still struggling. Can anybody please help?

How do people test their SSS code if you have to put a message on Kafka to
get Spark to consume a batch?

Kind regards,

Phillip


On Sun, Jul 12, 2020 at 4:55 PM Phillip Henry 
wrote:

> Hi, folks.
>
> I noticed that SSS won't process a waiting batch if there are no batches
> after that. To put it another way, Spark must always leave one batch on
> Kafka waiting to be consumed.
>
> There is a JIRA for this at:
>
> https://issues.apache.org/jira/browse/SPARK-24156
>
> that says it's resolved in 2.4.0 but my code
> 
> is using 2.4.2 yet I still see Spark reluctant to consume another batch
> from Kafka if it means there is nothing else waiting to be processed in the
> topic.
>
> Do I have to do something special to exploit the behaviour that
> SPARK-24156 says it has addressed?
>
> Regards,
>
> Phillip
>
>
>
>


Re: test

2020-07-27 Thread Ashley Hoff
Yes, your emails are getting through.

On Mon, Jul 27, 2020 at 6:31 PM Suat Toksöz  wrote:

> user@spark.apache.org
>
> --
>
> Best regards,
>
> *Suat Toksoz*
>


-- 
Kustoms On Silver 


test

2020-07-27 Thread Suat Toksöz
user@spark.apache.org

-- 

Best regards,

*Suat Toksoz*


Re: Apache Spark- Help with email library

2020-07-27 Thread Suat Toksöz
Why am I not able to send my question to the Spark email list?
Thanks

On Mon, Jul 27, 2020 at 10:31 AM tianlangstudio
 wrote:

> I use Simple Java Mail (http://www.simplejavamail.org/#/features) for sending
> email and parsing email files. It is awesome and may help you.
>
> TianlangStudio 
> Some of the biggest lies: I will start tomorrow/Others are better than
> me/I am not good enough/I don't have time/This is the way I am
> 
>
>
> --
> From: sn.noufal 
> Sent: Monday, July 27, 2020 08:01
> To: user 
> Subject: Apache Spark- Help with email library
>
> Hi,
>
> I am looking to send a DataFrame as an email. How do I do that? Do you have
> any library with a sample? Appreciate your response.
>
> Regards,
> Mohamed
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best regards,

*Suat Toksoz*


Apache Spark + Python + PySpark + Koalas

2020-07-27 Thread Suat Toksöz
Hi everyone, I want to ask for guidance on my log analyzer platform idea.
I have an Elasticsearch system which collects logs from different
platforms and creates alerts. The system writes the alerts to an index on
ES. My alerts are also stored in a folder as JSON (multi-line format).

The Goals:

   1. Read the JSON folder or ES index as a stream (pick up new entries
   within 5 minutes)
   2. Select only the alerts that I want to work on (alert.id = 100,
   status=true, ...)
   3. Create a DataFrame + window over a 10-minute period
   4. Run a query on that DataFrame, grouping by IP (if the same IP gets 3
   alerts, show me the result)
   5. All the coding should be in Python


The idea is something like this; my question is how I should approach this
task. What technologies should I use? A rough sketch of goals 1-4 is below.

Can *Apache Spark + Python + PySpark + Koalas* handle this?
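
A rough, untested sketch of goals 1-4 (shown in Java here; the same DataFrame
calls exist in PySpark for goal 5; the schema, column names, and path are made up):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class AlertCorrelationSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("alert-correlation").getOrCreate();

    // Goal 1: streaming file sources need an explicit schema (made up here).
    StructType schema = new StructType()
        .add("alert", new StructType().add("id", DataTypes.IntegerType))
        .add("status", DataTypes.BooleanType)
        .add("ip", DataTypes.StringType)
        .add("timestamp", DataTypes.TimestampType);

    Dataset<Row> alerts = spark.readStream()
        .format("json")
        .option("multiLine", true)        // alerts are stored as multi-line JSON
        .schema(schema)
        .load("/data/alerts");            // placeholder folder

    // Goal 2: keep only the alerts of interest.
    Dataset<Row> selected = alerts.filter(
        col("alert.id").equalTo(100).and(col("status").equalTo(true)));

    // Goals 3-4: 10-minute window per IP, keep IPs with 3 or more alerts.
    Dataset<Row> repeated = selected
        .withWatermark("timestamp", "10 minutes")
        .groupBy(window(col("timestamp"), "10 minutes"), col("ip"))
        .count()
        .filter(col("count").geq(3));

    repeated.writeStream()
        .outputMode("update")
        .format("console")
        .start()
        .awaitTermination();
  }
}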

Best regards,

*Suat Toksoz*


Re: Apache Spark- Help with email library

2020-07-27 Thread tianlangstudio
I use Simple Java Mail (http://www.simplejavamail.org/#/features) for sending email 
and parsing email files. It is awesome and may help you.
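
For example, a rough, untested sketch of mailing a small DataFrame's contents
with Simple Java Mail (server, addresses, and credentials are placeholders;
import paths may differ slightly between library versions; collecting to the
driver is only reasonable for small results):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.simplejavamail.api.email.Email;
import org.simplejavamail.email.EmailBuilder;
import org.simplejavamail.mailer.MailerBuilder;

public class DataFrameMailSketch {
  public static void send(Dataset<Row> df) {
    // Collect a small result to the driver and render it as plain text.
    String body = String.join("\n", df.toJSON().collectAsList());

    Email email = EmailBuilder.startingBlank()
        .from("reports@example.com")        // placeholder sender
        .to("recipient@example.com")        // placeholder recipient
        .withSubject("DataFrame result")
        .withPlainText(body)
        .buildEmail();

    MailerBuilder
        .withSMTPServer("smtp.example.com", 587, "user", "password")  // placeholders
        .buildMailer()
        .sendMail(email);
  }
}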

 
TianlangStudio
Some of the biggest lies: I will start tomorrow/Others are better than me/I am 
not good enough/I don't have time/This is the way I am
 


--
From: sn.noufal 
Sent: Monday, July 27, 2020 08:01
To: user 
Subject: Apache Spark- Help with email library

Hi,
I am looking to send a DataFrame as an email. How do I do that? Do you have any 
library with a sample? Appreciate your response.

Regards,
Mohamed


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Guidance

2020-07-27 Thread Suat Toksöz
Hi everyone, I want to ask for guidance on my log analyzer platform idea.
I have an Elasticsearch system which collects logs from different
platforms and creates alerts. The system writes the alerts to an index on
ES. My alerts are also stored in a folder as JSON (multi-line format).

The Goals:

   1. Read the JSON folder or ES index as a stream (pick up new entries
   within 5 minutes)
   2. Select only the alerts that I want to work on (alert.id = 100,
   status=true, ...)
   3. Create a DataFrame + window over a 10-minute period
   4. Run a query on that DataFrame, grouping by IP (if the same IP gets 3
   alerts, show me the result)
   5. All the coding should be in Python


The idea is something like this; my question is how I should approach this
task. What technologies should I use?

Can *Apache Spark + Python + PySpark + Koalas* handle this?

-- 

Best regards,

*Suat Toksoz*