Re: [Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-08 Thread Mich Talebzadeh
you may consider

- Increase Watermark Retention: Consider increasing the watermark retention
duration. This allows keeping records for a longer period before dropping
them. However, this might increase processing latency and violate
at-least-once semantics if the watermark lags behind real-time.

OR

- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records. Try:


   - Filter: filter out records older than the watermark in the main
   pipeline, for example:

   resultC = streamingDataFrame.select( \
         col("parsed_value.rowkey").alias("rowkey") \
       , col("parsed_value.timestamp").alias("timestamp") \
       , col("parsed_value.temperature").alias("temperature"))

"""
We work out the window and the AVG(temperature) in the window's
timeframe below
This should return back the following Dataframe as struct

 root
 |-- window: struct (nullable = false)
 ||-- start: timestamp (nullable = true)
 ||-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)

"""
resultM = resultC. \
     withWatermark("timestamp", "5 minutes"). \
     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
     avg('temperature')

   - Write to Sink: write the filtered (dropped) records to a
   separate Kafka topic.
   - Consume and Store: consume the dropped-records topic with another
   streaming job and store the records in a Postgres table or S3 using a
   suitable library (a minimal sketch of the filter-and-write step follows below).
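
A minimal PySpark sketch of that filter-and-write step, reusing resultC from
above; the 5-minute threshold, broker address, topic name and checkpoint path
are placeholders, and note that the true watermark is tracked internally by
Spark, so comparing against processing time minus the delay is only an
approximation:

from pyspark.sql.functions import col, current_timestamp, expr, to_json, struct

# Approximate "older than the watermark" as older than (processing time - 5 minutes).
# The real watermark is derived from the max event time seen, not the wall clock.
late_records = resultC.where(
    col("timestamp") < (current_timestamp() - expr("INTERVAL 5 MINUTES"))
)

# The Kafka sink expects a string or binary 'value' column.
late_to_kafka = (late_records
    .select(to_json(struct("rowkey", "timestamp", "temperature")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")      # placeholder broker
    .option("topic", "dropped_records")                     # placeholder topic
    .option("checkpointLocation", "/tmp/checkpoints/dropped_records")
    .start())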


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 8 May 2024 at 05:13, Nandha Kumar  wrote:

> Hi Team,
>We are trying to use *spark structured streaming *for our use
> case. We will be joining 2 streaming sources(from kafka topic) with
> watermarks. As time progresses, the records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some postgres table or s3.
>
> When searching, we found a similar question
> in
> StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
>


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Mich Talebzadeh
Hi Kartrick,

Unfortunately, materialised views are not available in Spark as yet. I
raised Jira [SPARK-48117] Spark Materialized Views: Improve Query
Performance and Data Management - ASF JIRA (apache.org)
as a feature request.

Let me think about another way and revert

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Mon, 6 May 2024 at 07:54, Karthick Nk  wrote:

> Thanks Mich,
>
> can you please confirm me is my understanding correct?
>
> First, we have to create the materialized view based on the mapping
> details we have by using multiple tables as source(since we have multiple
> join condition from different tables). From the materialised view we can
> stream the view data into elastic index by using cdc?
>
> Thanks in advance.
>
> On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
> wrote:
>
>> My recommendation! is using materialized views (MVs) created in Hive with
>> Spark Structured Streaming and Change Data Capture (CDC) is a good
>> combination for efficiently streaming view data updates in your scenario.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Requirements:
>>> I am working on the data flow, which will use the view definition(view
>>> definition already defined in schema), there are multiple tables used in
>>> the view definition. Here we want to stream the view data into elastic
>>> index based on if any of the table(used in the view definition) data got
>>> changed.
>>>
>>>
>>> Current flow:
>>> 1. we are inserting id's from the table(which used in the view
>>> definition) into the common table.
>>> 2. From the common table by using the id, we will be streaming the view
>>> data (by using if any of the incomming id is present in the collective id
>>> of all tables used from view definition) by using spark structured
>>> streaming.
>>>
>>>
>>> Issue:
>>> 1. Here we are facing issue - For each incomming id here we running view
>>> definition(so it will read all the data from all the data) and check if any
>>> of the incomming id is present in the collective id's of view result, Due
>>> to which it is taking more memory in the cluster driver and taking more
>>> time to process.
>>>
>>>
>>> I am epxpecting an alternate solution, if we can avoid full scan of view
>>> definition every time, If you have any alternate deisgn flow how we can
>>> achieve the result, please suggest for the same.
>>>
>>>
>>> Note: Also, it will be helpfull, if you can share the details like
>>> community forum or platform to discuss this kind of deisgn related topics,
>>> it will be more helpfull.
>>>
>>


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Karthick Nk
Thanks Mich,

Can you please confirm whether my understanding is correct?

First, we have to create the materialized view based on the mapping details
we have, using multiple tables as the source (since we have join conditions
across different tables). From the materialised view, we can then
stream the view data into the elastic index by using CDC?

Thanks in advance.

On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
wrote:

> My recommendation! is using materialized views (MVs) created in Hive with
> Spark Structured Streaming and Change Data Capture (CDC) is a good
> combination for efficiently streaming view data updates in your scenario.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Requirements:
>> I am working on the data flow, which will use the view definition(view
>> definition already defined in schema), there are multiple tables used in
>> the view definition. Here we want to stream the view data into elastic
>> index based on if any of the table(used in the view definition) data got
>> changed.
>>
>>
>> Current flow:
>> 1. we are inserting id's from the table(which used in the view
>> definition) into the common table.
>> 2. From the common table by using the id, we will be streaming the view
>> data (by using if any of the incomming id is present in the collective id
>> of all tables used from view definition) by using spark structured
>> streaming.
>>
>>
>> Issue:
>> 1. Here we are facing issue - For each incomming id here we running view
>> definition(so it will read all the data from all the data) and check if any
>> of the incomming id is present in the collective id's of view result, Due
>> to which it is taking more memory in the cluster driver and taking more
>> time to process.
>>
>>
>> I am epxpecting an alternate solution, if we can avoid full scan of view
>> definition every time, If you have any alternate deisgn flow how we can
>> achieve the result, please suggest for the same.
>>
>>
>> Note: Also, it will be helpfull, if you can share the details like
>> community forum or platform to discuss this kind of deisgn related topics,
>> it will be more helpfull.
>>
>


Re: ********Spark streaming issue to Elastic data**********

2024-05-03 Thread Mich Talebzadeh
My recommendation: using materialized views (MVs) created in Hive together
with Spark Structured Streaming and Change Data Capture (CDC) is a good
combination for efficiently streaming view data updates in your scenario.
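
As an illustration only (Spark does not provide this out of the box), a
minimal PySpark sketch of the second half of that pipeline: picking up change
rows for the view and upserting them into an Elasticsearch index with
foreachBatch. It assumes the elasticsearch-spark (es-hadoop) connector is on
the classpath, and the table, column, index and host names are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("view_cdc_to_es").getOrCreate()

# Hypothetical streaming source: a change table fed by your CDC process for the MV.
changes = spark.readStream.table("mv_schema.customer_view_changes")

def upsert_to_es(batch_df, batch_id):
    # Write each micro-batch to Elasticsearch, keyed on the row id so that
    # repeated changes to the same row overwrite (upsert) the same document.
    (batch_df.write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "es-host:9200")        # hypothetical ES endpoint
        .option("es.mapping.id", "row_id")         # hypothetical key column
        .option("es.write.operation", "upsert")
        .mode("append")
        .save("customer_view_index"))

(changes.writeStream
    .foreachBatch(upsert_to_es)
    .option("checkpointLocation", "/tmp/checkpoints/view_cdc_to_es")
    .start())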

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:

> Hi All,
>
> Requirements:
> I am working on the data flow, which will use the view definition(view
> definition already defined in schema), there are multiple tables used in
> the view definition. Here we want to stream the view data into elastic
> index based on if any of the table(used in the view definition) data got
> changed.
>
>
> Current flow:
> 1. we are inserting id's from the table(which used in the view definition)
> into the common table.
> 2. From the common table by using the id, we will be streaming the view
> data (by using if any of the incomming id is present in the collective id
> of all tables used from view definition) by using spark structured
> streaming.
>
>
> Issue:
> 1. Here we are facing issue - For each incomming id here we running view
> definition(so it will read all the data from all the data) and check if any
> of the incomming id is present in the collective id's of view result, Due
> to which it is taking more memory in the cluster driver and taking more
> time to process.
>
>
> I am epxpecting an alternate solution, if we can avoid full scan of view
> definition every time, If you have any alternate deisgn flow how we can
> achieve the result, please suggest for the same.
>
>
> Note: Also, it will be helpfull, if you can share the details like
> community forum or platform to discuss this kind of deisgn related topics,
> it will be more helpfull.
>


Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Thanks, Mich for your reply.

I agree, it is not so scalable and efficient. But it works correctly for
kafka transactions, and there is no problem with committing offsets to kafka
asynchronously for now.

Let me give you some more details about my streaming job.
CustomReceiver does not receive anything from outside; it just forwards a
notice message to wake up an executor in which the kafka consumer will run.
See my CustomReceiver below.

private static class CustomReceiver extends Receiver<String> {

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        // Kick off a thread that stores a single notice message,
        // which wakes up the foreachRDD task on the executor.
        new Thread(this::receive).start();
    }

    private void receive() {
        String input = "receiver input " + UUID.randomUUID().toString();
        store(input);
    }

    @Override
    public void onStop() {
    }
}


Actually, just one Kafka consumer will be run, which consumes committed
messages from kafka directly (which is not so scalable, I think).
But the main point of this approach for me is that the spark
session needs to be used to save the rdd (the parallelized consumed messages)
to an iceberg table.
Consumed messages are converted to a spark rdd, which is then saved to the
iceberg table using the spark session.

I have tested this spark streaming job with transactional producers which
send several million messages. They were all consumed and saved to the iceberg
tables correctly.
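
In PySpark terms (the job above is Java, so this is only a sketch of the idea
rather than the author's code), turning a batch of consumed records into a
DataFrame and appending it to an Iceberg table could look like this; the
catalog and table names are hypothetical and an Iceberg catalog is assumed to
be configured on the Spark session:

from pyspark.sql import SparkSession, Row

# Assumes an Iceberg catalog (here called my_catalog) is configured on the session.
spark = SparkSession.builder.appName("kafka_to_iceberg_sketch").getOrCreate()

def save_batch_to_iceberg(consumed):
    """consumed: a list of (key, value, offset) tuples collected from the raw Kafka consumer."""
    rows = [Row(key=k, value=v, offset=o) for (k, v, o) in consumed]
    df = spark.createDataFrame(rows)
    # Append the micro-batch to the Iceberg table through the V2 writer API.
    df.writeTo("my_catalog.db.events").append()    # hypothetical catalog/table name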

- Kidong.



On Sun, 14 Apr 2024 at 23:05, Mich Talebzadeh wrote:

> Interesting
>
> My concern is infinite Loop in* foreachRDD*: The *while(true)* loop
> within foreachRDD creates an infinite loop within each Spark executor. This
> might not be the most efficient approach, especially since offsets are
> committed asynchronously.?
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:
>
>>
>> Because spark streaming for kafk transaction does not work correctly to
>> suit my need, I moved to another approach using raw kafka consumer which
>> handles read_committed messages from kafka correctly.
>>
>> My codes look like the following.
>>
>> JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // 
>> CustomReceiver does nothing special except awaking foreach task.
>>
>> stream.foreachRDD(rdd -> {
>>
>>   KafkaConsumer consumer = new 
>> KafkaConsumer<>(consumerProperties);
>>
>>   consumer.subscribe(Arrays.asList(topic));
>>
>>   while(true){
>>
>> ConsumerRecords records =
>> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>>
>> Map offsetMap = new HashMap<>();
>>
>> List someList = new ArrayList<>();
>>
>> for (ConsumerRecord consumerRecord : records) {
>>
>>   // add something to list.
>>
>>   // put offset to offsetMap.
>>
>> }
>>
>> // process someList.
>>
>> // commit offset.
>>
>> consumer.commitAsync(offsetMap, null);
>>
>>   }
>>
>> });
>>
>>
>> In addition, I increased max.poll.records to 10.
>>
>> Even if this raw kafka consumer approach is not so scalable, it consumes
>> read_committed messages from kafka correctly and is enough for me at the
>> moment.
>>
>> - Kidong.
>>
>>
>>
>> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성:
>>
>>> Hi,
>>>
>>> I have a kafka producer which sends messages transactionally to kafka
>>> and spark streaming job which should consume read_committed messages from
>>> kafka.
>>> But there is a problem for spark streaming to consume read_committed
>>> messages.
>>> The count of messages sent by kafka producer transactionally is not the
>>> same to the count of the read_committed messages consumed by spark
>>> streaming.
>>>
>>> Some consumer properties of my spark streaming job are as follows.
>>>
>>> auto.offset.reset=earliest
>>> enable.auto.commit=false
>>> isolation.level=read_committed
>>>
>>>
>>> I also added the following spark streaming configuration.
>>>
>>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>>> 60 * 1000));
>>>
>>>
>>> My spark streaming is using DirectStream like this.
>>>
>>> JavaInputDStream> stream =
>>> KafkaUtils.createDirectStream(
>>> ssc,
>>> LocationStrategies.PreferConsistent(),
>>> ConsumerStrategies.>> GenericRecord>Subscribe(topics, kafkaParams)
>>> );
>>>
>>>
>>> stream.foreachRDD(rdd -> O
>>>
>>> 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Mich Talebzadeh
Interesting.

My concern is the infinite loop in *foreachRDD*: the *while(true)* loop within
foreachRDD creates an infinite loop within each Spark executor. This might
not be the most efficient approach, especially since offsets are committed
asynchronously.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:

>
> Because spark streaming for kafk transaction does not work correctly to
> suit my need, I moved to another approach using raw kafka consumer which
> handles read_committed messages from kafka correctly.
>
> My codes look like the following.
>
> JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // 
> CustomReceiver does nothing special except awaking foreach task.
>
> stream.foreachRDD(rdd -> {
>
>   KafkaConsumer consumer = new 
> KafkaConsumer<>(consumerProperties);
>
>   consumer.subscribe(Arrays.asList(topic));
>
>   while(true){
>
> ConsumerRecords records =
> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>
> Map offsetMap = new HashMap<>();
>
> List someList = new ArrayList<>();
>
> for (ConsumerRecord consumerRecord : records) {
>
>   // add something to list.
>
>   // put offset to offsetMap.
>
> }
>
> // process someList.
>
> // commit offset.
>
> consumer.commitAsync(offsetMap, null);
>
>   }
>
> });
>
>
> In addition, I increased max.poll.records to 10.
>
> Even if this raw kafka consumer approach is not so scalable, it consumes
> read_committed messages from kafka correctly and is enough for me at the
> moment.
>
> - Kidong.
>
>
>
> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka and
>> spark streaming job which should consume read_committed messages from kafka.
>> But there is a problem for spark streaming to consume read_committed
>> messages.
>> The count of messages sent by kafka producer transactionally is not the
>> same to the count of the read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream> stream =
>> KafkaUtils.createDirectStream(
>> ssc,
>> LocationStrategies.PreferConsistent(),
>> ConsumerStrategies.Subscribe(topics, 
>> kafkaParams)
>> );
>>
>>
>> stream.foreachRDD(rdd -> O
>>
>>// get offset ranges.
>>
>>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
>> rdd.rdd()).offsetRanges();
>>
>>// process something.
>>
>>
>>// commit offset.
>>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>
>> }
>> );
>>
>>
>>
>> I have tested with a kafka consumer written with raw kafka-clients jar
>> library without problem that it consumes read_committed messages correctly,
>> and the count of consumed read_committed messages is equal to the count of
>> messages sent by kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 12
>>
>> at scala.Predef$.require(Predef.scala:281)
>>
>> at
>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>>
>>
>> I have experienced spark streaming job which works fine with kafka
>> messages which are non-transactional, and I 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Because spark streaming for kafka transactions does not work correctly to
suit my need, I moved to another approach using a raw kafka consumer which
handles read_committed messages from kafka correctly.

My codes look like the following.

JavaDStream stream = ssc.receiverStream(new CustomReceiver());
// CustomReceiver does nothing special except awaking foreach task.

stream.foreachRDD(rdd -> {

  KafkaConsumer consumer = new
KafkaConsumer<>(consumerProperties);

  consumer.subscribe(Arrays.asList(topic));

  while(true){

ConsumerRecords records =
consumer.poll(java.time.Duration.ofMillis(intervalMs));

Map offsetMap = new HashMap<>();

List someList = new ArrayList<>();

for (ConsumerRecord consumerRecord : records) {

  // add something to list.

  // put offset to offsetMap.

}

// process someList.

// commit offset.

consumer.commitAsync(offsetMap, null);

  }

});


In addition, I increased max.poll.records to 10.

Even if this raw kafka consumer approach is not so scalable, it consumes
read_committed messages from kafka correctly and is enough for me at the
moment.

- Kidong.



On Fri, 12 Apr 2024 at 21:19, Kidong Lee wrote:

> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka and
> spark streaming job which should consume read_committed messages from kafka.
> But there is a problem for spark streaming to consume read_committed
> messages.
> The count of messages sent by kafka producer transactionally is not the
> same to the count of the read_committed messages consumed by spark
> streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 
> * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> ssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(topics, 
> kafkaParams)
> );
>
>
> stream.foreachRDD(rdd -> O
>
>// get offset ranges.
>
>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
> rdd.rdd()).offsetRanges();
>
>// process something.
>
>
>// commit offset.
>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested with a kafka consumer written with raw kafka-clients jar
> library without problem that it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by kafka producer.
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 12
>
> at scala.Predef$.require(Predef.scala:281)
>
> at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
>
>
> I have experienced spark streaming job which works fine with kafka
> messages which are non-transactional, and I never encountered the
> exceptions like above.
> It seems that spark streaming for kafka transaction does not handle such
> as kafka consumer properties like isolation.level=read_committed and
> enable.auto.commit=false correctly.
>
> Any help appreciated.
>
> - Kidong.
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
> 
>


-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297



Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-13 Thread Kidong Lee
Thank you Mich for your reply.

Actually, I have already tried most of your suggestions.

When spark.streaming.kafka.allowNonConsecutiveOffsets=false, I got the
following error.

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 1.0 (TID 3)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Got wrong record
for spark-executor-school-student-group school-student-7 even after seeking
to offset 11206961 got offset 11206962 instead. If this is a compacted
topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets

at scala.Predef$.require(Predef.scala:281)

at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:155)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)


And I tried to increase spark.streaming.kafka.consumer.poll.ms to avoid the
exceptions, but it did not help.


- Kidong.




On Sun, 14 Apr 2024 at 04:25, Mich Talebzadeh wrote:

> Hi Kidong,
>
> There may be few potential reasons why the message counts from your Kafka
> producer and Spark Streaming consumer might not match, especially with
> transactional messages and read_committed isolation level.
>
> 1) Just ensure that both your Spark Streaming job and the Kafka consumer
> written with raw kafka-clients use the same consumer group. Messages are
> delivered to specific consumer groups, and if they differ, Spark Streaming
> might miss messages consumed by the raw consumer.
> 2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
> uses *commitAsync manually*. However, I noted
> *spark.streaming.kafka.allowNonConsecutiveOffsets=true* which may be
> causing the problem. This setting allows Spark Streaming to read offsets
> that are not strictly increasing, which can happen with transactional
> reads. Generally recommended to set this to* false *for transactional
> reads to ensure Spark Streaming only reads committed messages.
> 3) Missed messages, in transactional messages, Kafka guarantees *delivery
> only after the transaction commits successfully. *There could be a slight
> delay between the producer sending the message and it becoming visible to
> consumers under read_committed isolation level. Spark Streaming could
> potentially miss messages during this window.
> 4) The exception Lost task 0.0 in stage 324.0, suggests a problem fetching
> records for a specific topic partition. Review your code handling of
> potential exceptions during rdd.foreachRDD processing. Ensure retries or
> appropriate error handling if encountering issues with specific partitions.
> 5) Try different configurations for *spark.streaming.kafka.consumer.poll.ms
> * to adjust polling
> frequency and potentially improve visibility into committed messages.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Fri, 12 Apr 2024 at 21:38, Kidong Lee  wrote:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka and
>> spark streaming job which should consume read_committed messages from kafka.
>> But there is a problem for spark streaming to consume read_committed
>> messages.
>> The count of messages sent by kafka producer transactionally is not the
>> same to the count of the read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream> stream =
>> KafkaUtils.createDirectStream(
>> ssc,
>> LocationStrategies.PreferConsistent(),
>> ConsumerStrategies.Subscribe(topics, 
>> kafkaParams)
>> );
>>
>>
>> stream.foreachRDD(rdd -> O
>>
>>// get offset ranges.
>>
>> 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-13 Thread Mich Talebzadeh
Hi Kidong,

There may be a few potential reasons why the message counts from your Kafka
producer and Spark Streaming consumer might not match, especially with
transactional messages and the read_committed isolation level.

1) Ensure that both your Spark Streaming job and the Kafka consumer written
with the raw kafka-clients use the same consumer group. Messages are
delivered to specific consumer groups, and if they differ, Spark Streaming
might miss messages consumed by the raw consumer.
2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
uses *commitAsync manually*. However, I noted
*spark.streaming.kafka.allowNonConsecutiveOffsets=true*, which may be
causing the problem. This setting allows Spark Streaming to read offsets
that are not strictly increasing, which can happen with transactional
reads. It is generally recommended to set this to *false* for transactional
reads to ensure Spark Streaming only reads committed messages.
3) Missed messages: with transactional messages, Kafka guarantees *delivery
only after the transaction commits successfully*. There could be a slight
delay between the producer sending the message and it becoming visible to
consumers under the read_committed isolation level. Spark Streaming could
potentially miss messages during this window.
4) The exception "Lost task 0.0 in stage 324.0" suggests a problem fetching
records for a specific topic partition. Review how your code handles
potential exceptions during rdd.foreachRDD processing. Ensure retries or
appropriate error handling if you encounter issues with specific partitions.
5) Try different values for *spark.streaming.kafka.consumer.poll.ms* to adjust
the polling frequency and potentially improve visibility into committed
messages (a minimal sketch of a read_committed setup follows below).
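
For comparison only (not the DStream code used in this thread), a minimal
PySpark Structured Streaming sketch that reads only committed transactional
messages; the broker, topic and checkpoint path are placeholders, and it
assumes the standard spark-sql-kafka-0-10 source, which forwards
"kafka."-prefixed options to the underlying consumer:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read_committed_sketch").getOrCreate()

df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")     # placeholder broker
    .option("subscribe", "some_topic")                     # placeholder topic
    .option("startingOffsets", "earliest")
    .option("kafka.isolation.level", "read_committed")     # skip uncommitted/aborted records
    .load())

# The source tracks its own offsets in the checkpoint; no manual commitAsync is needed.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/read_committed_sketch")
    .start())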

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Fri, 12 Apr 2024 at 21:38, Kidong Lee  wrote:

> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka and
> spark streaming job which should consume read_committed messages from kafka.
> But there is a problem for spark streaming to consume read_committed
> messages.
> The count of messages sent by kafka producer transactionally is not the
> same to the count of the read_committed messages consumed by spark
> streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 
> * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> ssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(topics, 
> kafkaParams)
> );
>
>
> stream.foreachRDD(rdd -> O
>
>// get offset ranges.
>
>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
> rdd.rdd()).offsetRanges();
>
>// process something.
>
>
>// commit offset.
>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested with a kafka consumer written with raw kafka-clients jar
> library without problem that it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by kafka producer.
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 12
>
> at scala.Predef$.require(Predef.scala:281)
>
> at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>
> at
> 

Re: [Spark streaming]: Microbatch id in logs

2023-06-26 Thread Mich Talebzadeh
In SSS (Spark Structured Streaming):

writeStream. \
    outputMode('append'). \
    option("truncate", "false"). \
    foreachBatch(SendToBigQuery). \
    option('checkpointLocation', checkpoint_path). \

so this writeStream will call foreachBatch()

    """
    "foreachBatch" performs custom write logic on each
    micro-batch through the SendToBigQuery function.
    foreachBatch(SendToBigQuery) expects 2 parameters, first: the
    micro-batch as a DataFrame or Dataset, and second: a unique id for each batch.
    Using foreachBatch, we write each micro-batch to storage
    defined in our custom logic. In this case, we store the output of our
    streaming application in a Google BigQuery table.
    """

that does this

def SendToBigQuery(df, batchId):
    if len(df.take(1)) > 0:
        print(batchId)
        # do your logic
    else:
        print("DataFrame is empty")

You should also have the checkpoint location set:

    option('checkpointLocation', checkpoint_path)
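
On the original question of getting the micro-batch id into the logs: as far
as I know Spark does not tag application log lines with the batch id
automatically, so the foreachBatch callback is the natural place to add it.
A variation of SendToBigQuery above (the logger name and log format are just
illustrative assumptions):

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("my_streaming_app")    # hypothetical logger name

def SendToBigQuery(df, batchId):
    # Tag every log line produced for this micro-batch with its id so the
    # logs can be filtered per batch afterwards.
    if len(df.take(1)) > 0:
        log.info("batchId=%s: processing %d rows", batchId, df.count())
        # ... write df to BigQuery here ...
        log.info("batchId=%s: finished", batchId)
    else:
        log.info("batchId=%s: DataFrame is empty", batchId)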

See this article on mine
Processing Change Data Capture with Spark Structured Streaming


HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 06:01, Anil Dasari  wrote:

> Hi,
> I am using spark 3.3.1 distribution and spark stream in my application. Is
> there a way to add a microbatch id to all logs generated by spark and spark
> applications ?
>
> Thanks.
>


Re: Re: spark streaming and kinesis integration

2023-04-12 Thread Mich Talebzadeh
Hi Lingzhe Sun,

Thanks for your comments. I am afraid I won't be able to take part in this
project and contribute.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 12 Apr 2023 at 02:55, Lingzhe Sun  wrote:

> Hi Mich,
>
> FYI we're using spark operator(
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
> stateful structured streaming on k8s for a year. Haven't test it using
> non-operator way.
>
> Besides that, the main contributor of the spark operator, Yinan Li, has
> been inactive for quite long time. Kind of worried that this project might
> finally become outdated as k8s is evolving. So if anyone is interested,
> please support the project.
>
> --
> Lingzhe Sun
> Hirain Technologies
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-04-11 02:06
> *To:* Rajesh Katkar 
> *CC:* user 
> *Subject:* Re: spark streaming and kinesis integration
> What I said was this
> "In so far as I know k8s does not support spark structured streaming?"
>
> So it is an open question. I just recalled it. I have not tested myself. I
> know structured streaming works on Google Dataproc cluster but I have not
> seen any official link that says Spark Structured Streaming is supported on
> k8s.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
> wrote:
>
>> Do you have any link or ticket which justifies that k8s does not support
>> spark streaming ?
>>
>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you have a high level diagram of the proposed solution?
>>>
>>> In so far as I know k8s does not support spark structured streaming?
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>> wrote:
>>>
>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>> Officially I could not find the connector or reader for kinesis from
>>>> spark like it has for kafka.
>>>>
>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>
>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Rajesh,
>>>>>
>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>> Which use case it concerns
>>>>>
>>>>> https://aws.amazon.com/kinesis/
>>>>>
>>>>> Can you use something else instead?
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Archi

Re: Re: spark streaming and kinesis integration

2023-04-12 Thread 孙令哲
Hi Rajesh,


It's working fine, at least for now. But you'll need to build your own spark 
image using later versions.


Lingzhe Sun
Hirain Technologies


Original:
From: Rajesh Katkar
Date: 2023-04-12 21:36:52
To: Lingzhe Sun
Cc: Mich Talebzadeh, user
Subject: Re: Re: spark streaming and kinesis integration

Hi Lingzhe,

We are also started using this operator.
Do you see any issues with it? 




On Wed, 12 Apr, 2023, 7:25 am Lingzhe Sun,  wrote:

Hi Mich,


FYI we're using spark 
operator(https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build 
stateful structured streaming on k8s for a year. Haven't test it using 
non-operator way.


Besides that, the main contributor of the spark operator, Yinan Li, has been 
inactive for quite long time. Kind of worried that this project might finally 
become outdated as k8s is evolving. So if anyone is interested, please support 
the project.


Lingzhe Sun
Hirain Technologies

 
From: Mich Talebzadeh
Date: 2023-04-11 02:06
To: Rajesh Katkar
CC: user
Subject: Re: spark streaming and kinesis integration


What I said was this"In so far as I know k8s does not support spark structured 
streaming?"


So it is an open question. I just recalled it. I have not tested myself. I know 
structured streaming works on Google Dataproc cluster but I have not seen any 
official link that says Spark Structured Streaming is supported on k8s.


HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom




   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 










On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar  wrote:

Do you have any link or ticket which justifies that k8s does not support spark 
streaming ?

On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh,  wrote:

Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom




   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 










On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar  wrote:

Use case is , we want to read/write to kinesis streams using k8sOfficially I 
could not find the connector or reader for kinesis from spark like it has for 
kafka.


Checking here if anyone used kinesis and spark streaming combination ?


On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh,  wrote:

Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which use 
case it concerns


https://aws.amazon.com/kinesis/



Can you use something else instead?


HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom




   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 










On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar  wrote:

Hi Spark Team,
We need to read/write the kinesis streams using spark streaming.
 We checked the official documentation - 
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
It does not mention kinesis connector. Alternative is - 
https://github.com/qubole/kinesis-sql which is not active now.  This is now 
handed over here - https://github.com/roncemer/spark-sql-kinesis
Also according to SPARK-18165 , Spark officially do not have any kinesis 
connector 
We have few below questions , It would be great if you can answer 
Does Spark provides officially any kinesis connector which have 
readstream/writestream and endorse any connector for production use cases ?  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html This 
documentation does not mention how to write to kinesis. This method has default 
dynamodb as checkpoint, can we override it ?We have rocksdb as a state store 
but when we ran an application using official  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html rocksdb 
configura

Re: Re: spark streaming and kinesis integration

2023-04-12 Thread Yi Huang
unsubscribe

On Wed, Apr 12, 2023 at 3:59 PM Rajesh Katkar 
wrote:

> Hi Lingzhe,
>
> We are also started using this operator.
> Do you see any issues with it?
>
>
> On Wed, 12 Apr, 2023, 7:25 am Lingzhe Sun,  wrote:
>
>> Hi Mich,
>>
>> FYI we're using spark operator(
>> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
>> stateful structured streaming on k8s for a year. Haven't test it using
>> non-operator way.
>>
>> Besides that, the main contributor of the spark operator, Yinan Li, has
>> been inactive for quite long time. Kind of worried that this project might
>> finally become outdated as k8s is evolving. So if anyone is interested,
>> please support the project.
>>
>> --
>> Lingzhe Sun
>> Hirain Technologies
>>
>>
>> *From:* Mich Talebzadeh 
>> *Date:* 2023-04-11 02:06
>> *To:* Rajesh Katkar 
>> *CC:* user 
>> *Subject:* Re: spark streaming and kinesis integration
>> What I said was this
>> "In so far as I know k8s does not support spark structured streaming?"
>>
>> So it is an open question. I just recalled it. I have not tested myself.
>> I know structured streaming works on Google Dataproc cluster but I have not
>> seen any official link that says Spark Structured Streaming is supported on
>> k8s.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
>> wrote:
>>
>>> Do you have any link or ticket which justifies that k8s does not support
>>> spark streaming ?
>>>
>>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>>> wrote:
>>>
>>>> Do you have a high level diagram of the proposed solution?
>>>>
>>>> In so far as I know k8s does not support spark structured streaming?
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>>> wrote:
>>>>
>>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>>> Officially I could not find the connector or reader for kinesis from
>>>>> spark like it has for kafka.
>>>>>
>>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>>
>>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Hi Rajesh,
>>>>>>
>>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>>> Which use case it concerns
>>>>>>
>>>>>> https://aws.amazon.com/kinesis/
>>>>>>
>>>>>> Can you use something else instead?
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>> Palantir Tec

Re: Re: spark streaming and kinesis integration

2023-04-12 Thread Rajesh Katkar
Hi Lingzhe,

We have also started using this operator.
Do you see any issues with it?


On Wed, 12 Apr, 2023, 7:25 am Lingzhe Sun,  wrote:

> Hi Mich,
>
> FYI we're using spark operator(
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
> stateful structured streaming on k8s for a year. Haven't test it using
> non-operator way.
>
> Besides that, the main contributor of the spark operator, Yinan Li, has
> been inactive for quite long time. Kind of worried that this project might
> finally become outdated as k8s is evolving. So if anyone is interested,
> please support the project.
>
> --
> Lingzhe Sun
> Hirain Technologies
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-04-11 02:06
> *To:* Rajesh Katkar 
> *CC:* user 
> *Subject:* Re: spark streaming and kinesis integration
> What I said was this
> "In so far as I know k8s does not support spark structured streaming?"
>
> So it is an open question. I just recalled it. I have not tested myself. I
> know structured streaming works on Google Dataproc cluster but I have not
> seen any official link that says Spark Structured Streaming is supported on
> k8s.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
> wrote:
>
>> Do you have any link or ticket which justifies that k8s does not support
>> spark streaming ?
>>
>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you have a high level diagram of the proposed solution?
>>>
>>> In so far as I know k8s does not support spark structured streaming?
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>> wrote:
>>>
>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>> Officially I could not find the connector or reader for kinesis from
>>>> spark like it has for kafka.
>>>>
>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>
>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Rajesh,
>>>>>
>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>> Which use case it concerns
>>>>>
>>>>> https://aws.amazon.com/kinesis/
>>>>>
>>>>> Can you use something else instead?
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or an

Re: Re: spark streaming and kinesis integration

2023-04-11 Thread Lingzhe Sun
Hi Mich,

FYI we have been using the spark operator
(https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
stateful structured streaming on k8s for a year. We haven't tested it using
the non-operator way.

Besides that, the main contributor of the spark operator, Yinan Li, has been
inactive for quite a long time. We are somewhat worried that this project
might eventually become outdated as k8s is evolving. So if anyone is
interested, please support the project.



Lingzhe Sun
Hirain Technologies
 
From: Mich Talebzadeh
Date: 2023-04-11 02:06
To: Rajesh Katkar
CC: user
Subject: Re: spark streaming and kinesis integration
What I said was this
"In so far as I know k8s does not support spark structured streaming?"

So it is an open question. I just recalled it. I have not tested myself. I know 
structured streaming works on Google Dataproc cluster but I have not seen any 
official link that says Spark Structured Streaming is supported on k8s.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar  wrote:
Do you have any link or ticket which justifies that k8s does not support spark 
streaming ?

On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh,  wrote:
Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar  wrote:
Use case is , we want to read/write to kinesis streams using k8s
Officially I could not find the connector or reader for kinesis from spark like 
it has for kafka.

Checking here if anyone used kinesis and spark streaming combination ?

On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh,  wrote:
Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which use 
case it concerns

https://aws.amazon.com/kinesis/

Can you use something else instead?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar  wrote:
Hi Spark Team,
We need to read/write the kinesis streams using spark streaming.
 We checked the official documentation - 
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
It does not mention kinesis connector. Alternative is - 
https://github.com/qubole/kinesis-sql which is not active now.  This is now 
handed over here - https://github.com/roncemer/spark-sql-kinesis
Also according to SPARK-18165 , Spark officially do not have any kinesis 
connector 
We have few below questions , It would be great if you can answer 
1. Does Spark provide officially any kinesis connector which has
   readstream/writestream, and endorse any connector for production use cases?
2. https://spark.apache.org/docs/latest/streaming-kinesis-integration.html This
   documentation does not mention how to write to kinesis. This method has default
   dynamodb as checkpoint, can we override it?
3. We have rocksdb as a state store but when we ran an application using the official
   https://spark.apache.org/docs/latest/streaming-kinesis-integration.html the rocksdb
   configurations were not effective. Can you please confirm if rocksdb is not
   applicable in these cases?
4. rocksdb however works with the qubole connector, do you have any plan to release
   a kinesis connector?
5. Please help/recommend us any good stable kinesis connector or some pointers
   around it


Re: spark streaming and kinesis integration

2023-04-10 Thread Mich Talebzadeh
Just to clarify, a major benefit of k8s in this case is to host your Spark
applications in the form of containers in an automated fashion so that one
can easily deploy as many instances of the application as required
(autoscaling). From below:

https://price2meet.com/gcp/docs/dataproc_docs_concepts_configuring-clusters_autoscaling.pdf

Autoscaling does not support Spark Structured Streaming (
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
(see Autoscaling and Spark Structured Streaming
(#autoscaling_and_spark_structured_streaming)) .

By the same token, k8s is (as of now) more suitable for batch jobs than
Spark Structured Streaming.
https://issues.apache.org/jira/browse/SPARK-12133

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 10 Apr 2023 at 19:06, Mich Talebzadeh 
wrote:

> What I said was this
> "In so far as I know k8s does not support spark structured streaming?"
>
> So it is an open question. I just recalled it. I have not tested myself. I
> know structured streaming works on Google Dataproc cluster but I have not
> seen any official link that says Spark Structured Streaming is supported on
> k8s.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
> wrote:
>
>> Do you have any link or ticket which justifies that k8s does not support
>> spark streaming ?
>>
>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you have a high level diagram of the proposed solution?
>>>
>>> In so far as I know k8s does not support spark structured streaming?
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>> wrote:
>>>
 Use case is , we want to read/write to kinesis streams using k8s
 Officially I could not find the connector or reader for kinesis from
 spark like it has for kafka.

 Checking here if anyone used kinesis and spark streaming combination ?

 On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
 mich.talebza...@gmail.com> wrote:

> Hi Rajesh,
>
> What is the use case for Kinesis here? I have not used it personally,
> Which use case it concerns
>
> https://aws.amazon.com/kinesis/
>
> Can you use something else instead?
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
> wrote:
>
>> Hi Spark Team,
>>
>> We need to read/write the kinesis streams using spark streaming.
>>
>>  We checked the official documentation -
>> 

Re: spark streaming and kinesis integration

2023-04-10 Thread Mich Talebzadeh
What I said was this
"In so far as I know k8s does not support spark structured streaming?"

So it is an open question; I just recalled it. I have not tested it myself. I
know structured streaming works on a Google Dataproc cluster, but I have not
seen any official link that says Spark Structured Streaming is supported on
k8s.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar  wrote:

> Do you have any link or ticket which justifies that k8s does not support
> spark streaming ?
>
> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
> wrote:
>
>> Do you have a high level diagram of the proposed solution?
>>
>> In so far as I know k8s does not support spark structured streaming?
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>> wrote:
>>
>>> Use case is , we want to read/write to kinesis streams using k8s
>>> Officially I could not find the connector or reader for kinesis from
>>> spark like it has for kafka.
>>>
>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>
>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
>>> wrote:
>>>
 Hi Rajesh,

 What is the use case for Kinesis here? I have not used it personally,
 Which use case it concerns

 https://aws.amazon.com/kinesis/

 Can you use something else instead?

 HTH

 Mich Talebzadeh,
 Lead Solutions Architect/Engineering Lead
 Palantir Technologies
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
 wrote:

> Hi Spark Team,
>
> We need to read/write the kinesis streams using spark streaming.
>
>  We checked the official documentation -
> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>
> It does not mention kinesis connector. Alternative is -
> https://github.com/qubole/kinesis-sql which is not active now.  This
> is now handed over here -
> https://github.com/roncemer/spark-sql-kinesis
>
> Also according to SPARK-18165
>  , Spark
> officially do not have any kinesis connector
>
> We have few below questions , It would be great if you can answer
>
>1. Does Spark provides officially any kinesis connector which have
>readstream/writestream and endorse any connector for production use 
> cases ?
>
>2.
>
> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
> This
>documentation does not mention how to write to kinesis. This method has
>default dynamodb as checkpoint, can we override it ?
>3. We have rocksdb as a state store but when we ran an application
>using official
>
> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
> rocksdb
>configurations were not effective. Can you please confirm if rocksdb 
> is not
>applicable in these cases?
>4. rocksdb however works with qubole connector , do you have any
>plan to release kinesis connector?
>5. Please help/recommend us for any good stable kinesis connector
>or some pointers around it
>
>


Re: spark streaming and kinesis integration

2023-04-10 Thread Rajesh Katkar
Do you have any link or ticket that justifies that k8s does not support
Spark streaming?

On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
wrote:

> Do you have a high level diagram of the proposed solution?
>
> In so far as I know k8s does not support spark structured streaming?
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
> wrote:
>
>> Use case is , we want to read/write to kinesis streams using k8s
>> Officially I could not find the connector or reader for kinesis from
>> spark like it has for kafka.
>>
>> Checking here if anyone used kinesis and spark streaming combination ?
>>
>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Hi Rajesh,
>>>
>>> What is the use case for Kinesis here? I have not used it personally,
>>> Which use case it concerns
>>>
>>> https://aws.amazon.com/kinesis/
>>>
>>> Can you use something else instead?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
>>> wrote:
>>>
 Hi Spark Team,

 We need to read/write the kinesis streams using spark streaming.

  We checked the official documentation -
 https://spark.apache.org/docs/latest/streaming-kinesis-integration.html

 It does not mention kinesis connector. Alternative is -
 https://github.com/qubole/kinesis-sql which is not active now.  This
 is now handed over here - https://github.com/roncemer/spark-sql-kinesis

 Also according to SPARK-18165
  , Spark officially
 do not have any kinesis connector

 We have few below questions , It would be great if you can answer

1. Does Spark provides officially any kinesis connector which have
readstream/writestream and endorse any connector for production use 
 cases ?

2.
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
 This
documentation does not mention how to write to kinesis. This method has
default dynamodb as checkpoint, can we override it ?
3. We have rocksdb as a state store but when we ran an application
using official
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
 rocksdb
configurations were not effective. Can you please confirm if rocksdb is 
 not
applicable in these cases?
4. rocksdb however works with qubole connector , do you have any
plan to release kinesis connector?
5. Please help/recommend us for any good stable kinesis connector
or some pointers around it




Re: spark streaming and kinesis integration

2023-04-06 Thread Rajesh Katkar
Our use case is that we want to read/write Kinesis streams using k8s.
Officially I could not find a connector or reader for Kinesis in Spark
like the one it has for Kafka.

Checking here whether anyone has used the Kinesis and Spark streaming combination?

On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
wrote:

> Hi Rajesh,
>
> What is the use case for Kinesis here? I have not used it personally,
> Which use case it concerns
>
> https://aws.amazon.com/kinesis/
>
> Can you use something else instead?
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
> wrote:
>
>> Hi Spark Team,
>>
>> We need to read/write the kinesis streams using spark streaming.
>>
>>  We checked the official documentation -
>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>
>> It does not mention kinesis connector. Alternative is -
>> https://github.com/qubole/kinesis-sql which is not active now.  This is
>> now handed over here - https://github.com/roncemer/spark-sql-kinesis
>>
>> Also according to SPARK-18165
>>  , Spark officially
>> do not have any kinesis connector
>>
>> We have few below questions , It would be great if you can answer
>>
>>1. Does Spark provides officially any kinesis connector which have
>>readstream/writestream and endorse any connector for production use cases 
>> ?
>>
>>2.
>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>> This
>>documentation does not mention how to write to kinesis. This method has
>>default dynamodb as checkpoint, can we override it ?
>>3. We have rocksdb as a state store but when we ran an application
>>using official
>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>> rocksdb
>>configurations were not effective. Can you please confirm if rocksdb is 
>> not
>>applicable in these cases?
>>4. rocksdb however works with qubole connector , do you have any plan
>>to release kinesis connector?
>>5. Please help/recommend us for any good stable kinesis connector or
>>some pointers around it
>>
>>


RE: spark streaming and kinesis integration

2023-04-06 Thread Jonske, Kurt
unsubscribe

Regards,
Kurt Jonske
Senior Director
Alvarez & Marsal
Direct:  212 328 8532
Mobile:  312 560 5040
Email:  kjon...@alvarezandmarsal.com
www.alvarezandmarsal.com

From: Mich Talebzadeh 
Sent: Thursday, April 06, 2023 11:45 AM
To: Rajesh Katkar 
Cc: u...@spark.incubator.apache.org
Subject: Re: spark streaming and kinesis integration



Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


 
   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
mailto:katkar.raj...@gmail.com>> wrote:
Use case is , we want to read/write to kinesis streams using k8s
Officially I could not find the connector or reader for kinesis from spark like 
it has for kafka.

Checking here if anyone used kinesis and spark streaming combination ?

On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
mailto:mich.talebza...@gmail.com>> wrote:
Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which use 
case it concerns

https://aws.amazon.com/kinesis/

Can you use something else instead?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


 
   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
mailto:katkar.raj...@gmail.com>> wrote:

Hi Spark Team,

We need to read/write the kinesis streams using spark streaming.

 We checked the official documentation - 
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html

It does not mention kinesis connector. Alternative is - 
https://github.com/qubole/kinesis-sql
 which is not active now.  This is now handed over here - 
https://github.com/roncemer/spark-sql-kinesis

Also according to 
SPARK-18165
 , Spark officially do not have any kinesis connector

We have few below questions , It would be great if you can answer

  1.  Does Spark provides officially any kinesis connector which have 
readstream/writestream and endorse any connector for production use cases ?
  2.  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
 This documentation does not mention how to write to kinesis. This method has 
default dynamodb as checkpoint, can we override it ?
  3.  We have rocksdb as a state store but when we ran an application using 
official  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
 rocksdb configurations were not effective. Can you please confirm if rocksdb 
is not applicable in these cases?
  4.  rocksdb however works with qubole connector , do you have any plan to 
release kinesis connector?
  5.  Please help/recommend us for any good stable kinesis connector or some 
pointers around it


Re: spark streaming and kinesis integration

2023-04-06 Thread Mich Talebzadeh
Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar  wrote:

> Use case is , we want to read/write to kinesis streams using k8s
> Officially I could not find the connector or reader for kinesis from spark
> like it has for kafka.
>
> Checking here if anyone used kinesis and spark streaming combination ?
>
> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
> wrote:
>
>> Hi Rajesh,
>>
>> What is the use case for Kinesis here? I have not used it personally,
>> Which use case it concerns
>>
>> https://aws.amazon.com/kinesis/
>>
>> Can you use something else instead?
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
>> wrote:
>>
>>> Hi Spark Team,
>>>
>>> We need to read/write the kinesis streams using spark streaming.
>>>
>>>  We checked the official documentation -
>>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>
>>> It does not mention kinesis connector. Alternative is -
>>> https://github.com/qubole/kinesis-sql which is not active now.  This is
>>> now handed over here - https://github.com/roncemer/spark-sql-kinesis
>>>
>>> Also according to SPARK-18165
>>>  , Spark officially
>>> do not have any kinesis connector
>>>
>>> We have few below questions , It would be great if you can answer
>>>
>>>1. Does Spark provides officially any kinesis connector which have
>>>readstream/writestream and endorse any connector for production use 
>>> cases ?
>>>
>>>2.
>>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>> This
>>>documentation does not mention how to write to kinesis. This method has
>>>default dynamodb as checkpoint, can we override it ?
>>>3. We have rocksdb as a state store but when we ran an application
>>>using official
>>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>> rocksdb
>>>configurations were not effective. Can you please confirm if rocksdb is 
>>> not
>>>applicable in these cases?
>>>4. rocksdb however works with qubole connector , do you have any
>>>plan to release kinesis connector?
>>>5. Please help/recommend us for any good stable kinesis connector or
>>>some pointers around it
>>>
>>>


Re: spark streaming and kinesis integration

2023-04-06 Thread Mich Talebzadeh
Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which
use case it concerns

https://aws.amazon.com/kinesis/

Can you use something else instead?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar  wrote:

> Hi Spark Team,
>
> We need to read/write the kinesis streams using spark streaming.
>
>  We checked the official documentation -
> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>
> It does not mention kinesis connector. Alternative is -
> https://github.com/qubole/kinesis-sql which is not active now.  This is
> now handed over here - https://github.com/roncemer/spark-sql-kinesis
>
> Also according to SPARK-18165
>  , Spark officially do
> not have any kinesis connector
>
> We have few below questions , It would be great if you can answer
>
>1. Does Spark provides officially any kinesis connector which have
>readstream/writestream and endorse any connector for production use cases ?
>
>2.
>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
> This
>documentation does not mention how to write to kinesis. This method has
>default dynamodb as checkpoint, can we override it ?
>3. We have rocksdb as a state store but when we ran an application
>using official
>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
> rocksdb
>configurations were not effective. Can you please confirm if rocksdb is not
>applicable in these cases?
>4. rocksdb however works with qubole connector , do you have any plan
>to release kinesis connector?
>5. Please help/recommend us for any good stable kinesis connector or
>some pointers around it
>
>


Re: Spark streaming

2022-08-20 Thread Gourav Sengupta
Hi,
Spark is just unnecessary, over-engineered overkill for that kind of job. I
know they are trying to make Spark a one-stop solution for everything, but
that is a marketing attempt to capture market share rather than the
true-blue engineering creativity that led to the creation of Spark, so
please be aware.

Are you in AWS? If so, please try DMS; that might be the best solution,
depending on what you are looking for, of course.

If you are not in AWS, please let me know your environment and I can help
you out.



Regards,
Gourav Sengupta

On Fri, Aug 19, 2022 at 1:13 PM sandra sukumaran <
sandrasukumara...@gmail.com> wrote:

> Dear Sir,
>
>
>
>  Is there any possible method to fetch MySQL database bin log, with
> the help of spark streaming.
> Kafka streaming is not applicable in this case.
>
>
>
> Thanks and regards
> Sandra
>


Re: [EXTERNAL] Re: Spark streaming

2022-08-20 Thread sandra sukumaran
Thanks, I'll try it out.

On Fri, 19 Aug 2022, 6:12 pm Saurabh Gulati, 
wrote:

> You can also try out
> https://debezium.io/documentation/reference/0.10/connectors/mysql.html
> --
> *From:* Ajit Kumar Amit 
> *Sent:* 19 August 2022 14:30
> *To:* sandra sukumaran 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Spark streaming
>
> https://github.com/allwefantasy/spark-binlog
>
> Sent from my iPhone
>
> On 19 Aug 2022, at 5:45 PM, sandra sukumaran 
> wrote:
>
> 
> Dear Sir,
>
>
>
>  Is there any possible method to fetch MySQL database bin log, with
> the help of spark streaming.
> Kafka streaming is not applicable in this case.
>
>
>
> Thanks and regards
> Sandra
>
>


Re: [EXTERNAL] Re: Spark streaming

2022-08-19 Thread Saurabh Gulati
You can also try out 
https://debezium.io/documentation/reference/0.10/connectors/mysql.html

From: Ajit Kumar Amit 
Sent: 19 August 2022 14:30
To: sandra sukumaran 
Cc: user@spark.apache.org 
Subject: [EXTERNAL] Re: Spark streaming


https://github.com/allwefantasy/spark-binlog

Sent from my iPhone

On 19 Aug 2022, at 5:45 PM, sandra sukumaran  
wrote:


Dear Sir,



 Is there any possible method to fetch MySQL database bin log, with the 
help of spark streaming.
Kafka streaming is not applicable in this case.



Thanks and regards
Sandra


Re: Spark streaming

2022-08-19 Thread Ajit Kumar Amit
https://github.com/allwefantasy/spark-binlog

Sent from my iPhone

> On 19 Aug 2022, at 5:45 PM, sandra sukumaran  
> wrote:
> 
> 
> Dear Sir,
> 
> 
> 
>  Is there any possible method to fetch MySQL database bin log, with the 
> help of spark streaming.
> Kafka streaming is not applicable in this case.
> 
> 
> 
> Thanks and regards
> Sandra


Re: Spark streaming

2022-08-18 Thread ミユナ (alice)
> Dear sir,
>
>
>I want to check the logs of MySQL database using spark streaming, can
> someone help me with those listening queries.
>
>
> Thanks and regards
> Akash P
>

You can ingest the logs with fluent-bit into Kafka, then set up Spark
Structured Streaming to read the records from Kafka, for example:
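
A minimal sketch of the Spark side, assuming the fluent-bit output goes to a Kafka topic; the broker address and topic name are placeholders, and the job needs the spark-sql-kafka-0-10 package on its classpath:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("BinlogFromKafka").getOrCreate()

# Read the topic that fluent-bit writes the MySQL log records to.
logs = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")   # placeholder broker
        .option("subscribe", "mysql-binlog")                 # placeholder topic
        .option("startingOffsets", "latest")
        .load()
        .select(col("value").cast("string").alias("raw_event")))

# Print to the console for a quick check; swap in a real sink as needed.
query = (logs.writeStream
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()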


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [EXTERNAL] Re: Spark streaming - Data Ingestion

2022-08-17 Thread Akash Vellukai
I am a beginner with Spark. May I also know how to connect a MySQL database
with Spark streaming?

Thanks and regards
Akash P

On Wed, 17 Aug, 2022, 8:28 pm Saurabh Gulati, 
wrote:

> Another take:
>
>- Debezium
><https://debezium.io/documentation/reference/stable/connectors/mysql.html>
>to read Write Ahead logs(WAL) and send to Kafka
>- Kafka connect to write to cloud storage -> Hive
>   - OR
>
>
>- Spark streaming to parse WAL -> Storage -> Hive
>
> Regards
> --
> *From:* Gibson 
> *Sent:* 17 August 2022 16:53
> *To:* Akash Vellukai 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Spark streaming - Data Ingestion
>
> If you have space for a message log like, then you should try:
>
> MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS
> -> Hive
>
> On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai 
> wrote:
>
> Dear sir
>
> I have tried a lot on this could you help me with this?
>
> Data ingestion from MySql to Hive with spark- streaming?
>
> Could you give me an overview.
>
>
> Thanks and regards
> Akash P
>
>


Re: [EXTERNAL] Re: Spark streaming - Data Ingestion

2022-08-17 Thread Gibson
The idea behind spark-streaming is to process change events as they occur,
hence the suggestions above that require capturing change events using
Debezium.

But you can use JDBC drivers to connect Spark to relational databases for plain batch reads; a rough sketch follows.
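
This is a batch snapshot rather than change-data-capture, assuming the MySQL JDBC driver jar is on the Spark classpath; the host, database, table and credentials below are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MySQLOverJDBC").getOrCreate()

# Plain JDBC read of one table; it snapshots the table at read time.
orders = (spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://mysql-host:3306/shop")  # placeholder URL
          .option("dbtable", "orders")                         # placeholder table
          .option("user", "spark_user")                        # placeholder credentials
          .option("password", "secret")
          .load())

orders.show(5)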


On Wed, Aug 17, 2022 at 6:21 PM Akash Vellukai 
wrote:

> I am beginner with spark may , also know how to connect MySQL database
> with spark streaming
>
> Thanks and regards
> Akash P
>
> On Wed, 17 Aug, 2022, 8:28 pm Saurabh Gulati, 
> wrote:
>
>> Another take:
>>
>>- Debezium
>><https://debezium.io/documentation/reference/stable/connectors/mysql.html>
>>to read Write Ahead logs(WAL) and send to Kafka
>>- Kafka connect to write to cloud storage -> Hive
>>   - OR
>>
>>
>>- Spark streaming to parse WAL -> Storage -> Hive
>>
>> Regards
>> --
>> *From:* Gibson 
>> *Sent:* 17 August 2022 16:53
>> *To:* Akash Vellukai 
>> *Cc:* user@spark.apache.org 
>> *Subject:* [EXTERNAL] Re: Spark streaming - Data Ingestion
>>
>> If you have space for a message log like, then you should try:
>>
>> MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS
>> -> Hive
>>
>> On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai <
>> akashvellukai...@gmail.com> wrote:
>>
>> Dear sir
>>
>> I have tried a lot on this could you help me with this?
>>
>> Data ingestion from MySql to Hive with spark- streaming?
>>
>> Could you give me an overview.
>>
>>
>> Thanks and regards
>> Akash P
>>
>>


Re: [EXTERNAL] Re: Spark streaming - Data Ingestion

2022-08-17 Thread Saurabh Gulati
Another take:

  *   Debezium (https://debezium.io/documentation/reference/stable/connectors/mysql.html)
      to read Write Ahead Logs (WAL) and send to Kafka
  *   Kafka Connect to write to cloud storage -> Hive
  *   OR
  *   Spark streaming to parse WAL -> Storage -> Hive

Regards

From: Gibson 
Sent: 17 August 2022 16:53
To: Akash Vellukai 
Cc: user@spark.apache.org 
Subject: [EXTERNAL] Re: Spark streaming - Data Ingestion


If you have space for a message log like, then you should try:

MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS -> Hive

On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai 
mailto:akashvellukai...@gmail.com>> wrote:
Dear sir

I have tried a lot on this could you help me with this?

Data ingestion from MySql to Hive with spark- streaming?

Could you give me an overview.


Thanks and regards
Akash P


Re: Spark streaming - Data Ingestion

2022-08-17 Thread Gibson
If you have room for a message log (like Kafka), then you should try:

MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS ->
Hive
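
A rough sketch of the Spark leg of that pipeline, assuming Debezium publishes the MySQL changes to Kafka and is configured to emit unwrapped JSON payloads; the broker, topic, schema and storage paths are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("MySQLCDCToLake").getOrCreate()

# Minimal view of a CDC event: operation, commit timestamp and the row image
# kept as raw JSON text for simplicity.
event_schema = StructType([
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),
    StructField("after", StringType()),
])

cdc = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker1:9092")    # placeholder broker
       .option("subscribe", "dbserver1.shop.orders")         # placeholder topic
       .load()
       .select(from_json(col("value").cast("string"), event_schema).alias("e"))
       .select("e.op", "e.ts_ms", "e.after"))

# Land the events as Parquet; a Hive external table can then point at this path.
query = (cdc.writeStream
         .format("parquet")
         .option("path", "s3a://lake/raw/orders")             # placeholder path
         .option("checkpointLocation", "s3a://lake/chk/orders")
         .start())
query.awaitTermination()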

On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai 
wrote:

> Dear sir
>
> I have tried a lot on this could you help me with this?
>
> Data ingestion from MySql to Hive with spark- streaming?
>
> Could you give me an overview.
>
>
> Thanks and regards
> Akash P
>


Re: Spark streaming pending microbatches queue max length

2022-07-13 Thread Anil Dasari
Retry.

From: Anil Dasari 
Date: Tuesday, July 12, 2022 at 3:42 PM
To: user@spark.apache.org 
Subject: Spark streaming pending microbatches queue max length
Hello,

Spark adds an entry to the pending microbatches queue at each batch interval.
Is there a config to set the maximum size of the pending microbatches queue?

Thanks


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Xavier Gervilla
Thank you for the flatten function; it has more functionality than I need for
my project, but the examples (which were really, really useful) helped me find
a solution.



Instead of accessing the confidence and entity attributes (metadata.confidence
and metadata.entity) I was accessing metadata.value, which returned null values
instead of an error. In addition, the confidence value (a number) has StringType,
so before calculating the average I had to cast it to DoubleType; roughly as in
the sketch below.
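
Roughly, the working selection looks like the following; the annotation column name (ner_chunk here) is a placeholder for whatever the pipeline actually outputs, and nerDF is the DataFrame produced earlier in this thread.

from pyspark.sql.functions import avg, col, count, explode

# Explode the NER annotations and pull entity text, label and confidence
# out of the annotation metadata.
entities = (nerDF
            .select(explode(col("ner_chunk")).alias("chunk"))
            .select(col("chunk.result").alias("entity"),
                    col("chunk.metadata").getItem("entity").alias("label"),
                    # confidence arrives as a string, so cast before averaging
                    col("chunk.metadata").getItem("confidence").cast("double").alias("confidence")))

summary = (entities.groupBy("entity", "label")
           .agg(avg("confidence").alias("avg_confidence"),
                count("*").alias("mentions")))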



The output generated now is the same as before, but it uses only around 8.5 GB
of RAM, so there's no longer a memory error!



Thank you again for all your help!



 On Wed, 20 Apr 2022 14:24:46 +0200, Bjørn Jørgensen 
 wrote: 



Glad to hear that it works :) 



Your dataframe is nested with both map, array and struct. 



I'm using this function to flatten a nested dataframe to rows and columns.  



from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.
        .. versionadded:: x.X.X
        
        Parameters
        --
        sep : str
            Delimiter for flatted columns. Default `_`
        
        Notes
        -
        Don`t use `.` as `sep`
        It won't work on nested data frames with more than one level.
        And you will have to use `columns.name`. 
        
        Flattening Map Types will have to find every key in the column. 
        This can be slow.
        
        Examples
        

        data_mixed = [
            {
                "state": "Florida",
                "shortname": "FL",
                "info": {"governor": "Rick Scott"},
                "counties": [
                    {"name": "Dade", "population": 12345},
                    {"name": "Broward", "population": 4},
                    {"name": "Palm Beach", "population": 6},
                ],
            },
            {
                "state": "Ohio",
                "shortname": "OH",
                "info": {"governor": "John Kasich"},
                "counties": [
                    {"name": "Summit", "population": 1234},
                    {"name": "Cuyahoga", "population": 1337},
                ],
            },
        ]

        data_mixed = spark.createDataFrame(data=data_mixed)

        data_mixed.printSchema()

        root
        |-- counties: array (nullable = true)
        |    |-- element: map (containsNull = true)
        |    |    |-- key: string
        |    |    |-- value: string (valueContainsNull = true)
        |-- info: map (nullable = true)
        |    |-- key: string
        |    |-- value: string (valueContainsNull = true)
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        
        
        data_mixed_flat = flatten_test(df, sep=":")
        data_mixed_flat.printSchema()
        root
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- counties:name: string (nullable = true)
        |-- counties:population: string (nullable = true)
        |-- info:governor: string (nullable = true)
        

        
        
        data = [
            {
                "id": 1,
                "name": "Cole Volk",
                "fitness": {"height": 130, "weight": 60},
            },
            {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
            {
                "id": 2,
                "name": "Faye Raker",
                "fitness": {"height": 130, "weight": 60},
            },
        ]


        df = spark.createDataFrame(data=data)

        df.printSchema()

        root
        |-- fitness: map (nullable = true)
        |    |-- key: string
        |    |-- value: long (valueContainsNull = true)
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        
        df_flat = flatten_test(df, sep=":")

        df_flat.printSchema()

        root
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        |-- fitness:height: long (nullable = true)
        |-- fitness:weight: long (nullable = true)
        
        data_struct = [
                (("James",None,"Smith"),"OH","M"),
                (("Anna","Rose",""),"NY","F"),
                (("Julia","","Williams"),"OH","F"),
                (("Maria","Anne","Jones"),"NY","M"),
                (("Jen","Mary","Brown"),"NY","M"),
                (("Mike","Mary","Williams"),"OH","M")
                ]

                
        schema = StructType([
            StructField('name', StructType([
                StructField('firstname', StringType(), True),
                StructField('middlename', StringType(), True),
                StructField('lastname', StringType(), True)
                ])),
            StructField('state', StringType(), True),
            StructField('gender', StringType(), True)
            ])

 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Bjørn Jørgensen
Glad to hear that it works :)

Your dataframe is nested with both map, array and struct.

I'm using this function to flatten a nested dataframe to rows and columns.

from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
"""Returns a flattened dataframe.
.. versionadded:: x.X.X

Parameters
--
sep : str
Delimiter for flatted columns. Default `_`

Notes
-
Don`t use `.` as `sep`
It won't work on nested data frames with more than one level.
And you will have to use `columns.name`.

Flattening Map Types will have to find every key in the column.
This can be slow.

Examples


data_mixed = [
{
"state": "Florida",
"shortname": "FL",
"info": {"governor": "Rick Scott"},
"counties": [
{"name": "Dade", "population": 12345},
{"name": "Broward", "population": 4},
{"name": "Palm Beach", "population": 6},
],
},
{
"state": "Ohio",
"shortname": "OH",
"info": {"governor": "John Kasich"},
"counties": [
{"name": "Summit", "population": 1234},
{"name": "Cuyahoga", "population": 1337},
],
},
]

data_mixed = spark.createDataFrame(data=data_mixed)

data_mixed.printSchema()

root
|-- counties: array (nullable = true)
||-- element: map (containsNull = true)
|||-- key: string
|||-- value: string (valueContainsNull = true)
|-- info: map (nullable = true)
||-- key: string
||-- value: string (valueContainsNull = true)
|-- shortname: string (nullable = true)
|-- state: string (nullable = true)


data_mixed_flat = flatten_test(df, sep=":")
data_mixed_flat.printSchema()
root
|-- shortname: string (nullable = true)
|-- state: string (nullable = true)
|-- counties:name: string (nullable = true)
|-- counties:population: string (nullable = true)
|-- info:governor: string (nullable = true)




data = [
{
"id": 1,
"name": "Cole Volk",
"fitness": {"height": 130, "weight": 60},
},
{"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
{
"id": 2,
"name": "Faye Raker",
"fitness": {"height": 130, "weight": 60},
},
]


df = spark.createDataFrame(data=data)

df.printSchema()

root
|-- fitness: map (nullable = true)
||-- key: string
||-- value: long (valueContainsNull = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)

df_flat = flatten_test(df, sep=":")

df_flat.printSchema()

root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- fitness:height: long (nullable = true)
|-- fitness:weight: long (nullable = true)

data_struct = [
(("James",None,"Smith"),"OH","M"),
(("Anna","Rose",""),"NY","F"),
(("Julia","","Williams"),"OH","F"),
(("Maria","Anne","Jones"),"NY","M"),
(("Jen","Mary","Brown"),"NY","M"),
(("Mike","Mary","Williams"),"OH","M")
]


schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('state', StringType(), True),
StructField('gender', StringType(), True)
])

df_struct = spark.createDataFrame(data = data_struct, schema =
schema)

df_struct.printSchema()

root
|-- name: struct (nullable = true)
||-- firstname: string (nullable = true)
||-- middlename: string (nullable = true)
||-- lastname: string (nullable = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)

df_struct_flat = flatten_test(df_struct, sep=":")

df_struct_flat.printSchema()

root
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
|-- name:firstname: string (nullable = true)
|-- name:middlename: string (nullable = true)
|-- name:lastname: string (nullable = true)
"""
# compute Complex Fields (Arrays, Structs and Map Types) in Schema
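
For illustration only, a minimal sketch of one way to implement the flattening itself, not necessarily the same as the full flatten_test implementation, could be:

from pyspark.sql.functions import col, explode_outer, map_keys
from pyspark.sql.types import ArrayType, MapType, StructType


def flatten_sketch(df, sep="_"):
    """Iteratively flatten StructType, ArrayType and MapType columns."""
    def complex_fields(frame):
        return {f.name: f.dataType for f in frame.schema.fields
                if isinstance(f.dataType, (ArrayType, MapType, StructType))}

    fields = complex_fields(df)
    while fields:
        name, dtype = next(iter(fields.items()))
        if isinstance(dtype, StructType):
            # Promote each struct field to a top-level column.
            expanded = [col(name + "." + f.name).alias(name + sep + f.name)
                        for f in dtype.fields]
            df = df.select("*", *expanded).drop(name)
        elif isinstance(dtype, ArrayType):
            # One row per array element; nested elements are handled next pass.
            df = df.withColumn(name, explode_outer(col(name)))
        else:  # MapType: one column per distinct key (can be slow on wide maps)
            keys = [r[0] for r in
                    df.select(explode_outer(map_keys(col(name)))).distinct().collect()
                    if r[0] is not None]
            exprs = [col(name).getItem(k).alias(name + sep + str(k)) for k in keys]
            df = df.select("*", *exprs).drop(name)
        fields = complex_fields(df)
    return df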

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet

change spark = sparknlp.start()
to
spark = sparknlp.start(spark32=True)


tir. 19. apr. 2022 kl. 21:10 skrev Bjørn Jørgensen :

> Yes, there are some that have that issue.
>
> Please open a new issue at
> https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
>
>
>
> tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
> xavier.gervi...@datapta.com>:
>
>> Thank you for your advice, I had small knowledge of Spark NLP and I
>> thought it was only possible to use with models that required training and
>> therefore my project wasn’t the case. I'm trying now to build the project
>> again with SparkNLP but when I try to load a pretrained model from
>> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
>> occurred while calling
>> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
>> ).
>>
>> This is the new basic code to develop the project again:
>>
>>
>> *spark = sparknlp.start()*
>>
>> *pipelineName = 'analyze_sentiment'*
>>
>>
>> *pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
>> generates the error*
>>
>> *rawTweets = spark.readStream.format('socket').option('host',
>> 'localhost').option('port',9008).load()*
>>
>> *allTweets = rawTweets.selectExpr('CAST(value AS
>> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')*
>>
>>
>> *sentPred = pipeline.transform(allTweets)*
>>
>> *query =
>> sentPred.writeStream.outputMode('complete').format('console').start()*
>> *query.awaitTermination()*
>>
>> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
>> is 8. I've tried with a different model but the error is still the same, so
>> what could be causing it?
>>
>> If this error is solved I think SparkNLP will be the solution I was
>> looking for to reduce memory consumption so thank you again for suggesting
>> it.
>>
>>
>>
>> El 18 abr 2022, a las 21:07, Bjørn Jørgensen 
>> escribió:
>>
>> When did SpaCy have support for Spark?
>>
>> Try Spark NLP  it`s made for spark. They
>> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
>> they public user guides at
>> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>>
>>
>>
>>
>> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>>
>> It looks good, are you sure it even starts? the problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure if that resolves it but would be a good
>> practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>
>> Hi Team,
>> 
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams
>> them to Spark and with different operations with DataFrames obtains the
>> Sentiment of the tweets and their entities applying a Sentiment model and a
>> NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, the RAM
>> consumption increases until the program stops with a memory error because
>> there's no memory left to execute. In addition, on SparkUI I've seen that
>> there's only one executor running, the executor driver, but using htop on
>> the terminal I see that the 8 cores of the instance are executing at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket
>> that connects with the second program that sends the tweets. The DataFrame
>> goes through some processing to obtain other properties of the tweet like
>> its sentiment (which causes no error even with less than 8GB of RAM) and
>> then the NER is applied.
>>
>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>
>> # prior processing of the tweets
>> sentDF = other_processing(tweets)
>>
>> # obtaining the column that contains the list of entities from a tweet
>> nerDF = ner_classification(sentDF)
>>
>>
>> This is the code of the functions related to obtaining the NER, the "main
>> call" and the UDF function.
>>
>> nerModel = spacy.load("en_core_web_sm")
>>
>> # main call, applies the UDF function to every tweet from the "tweet" column
>> def ner_classification(words):
>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>     return words
>>
>> # udf function
>> def obtain_ner_udf(words):
>>     # if the tweet is empty return None
>>     if words == "":
>>         return None
>>     # else: applying the NER model 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
Yes, there are some that have that issue.

Please open a new issue at https://github.com/JohnSnowLabs/spark-nlp/issues
and they will help you.




tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
xavier.gervi...@datapta.com>:

> Thank you for your advice, I had small knowledge of Spark NLP and I
> thought it was only possible to use with models that required training and
> therefore my project wasn’t the case. I'm trying now to build the project
> again with SparkNLP but when I try to load a pretrained model from
> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
> occurred while calling
> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
> ).
>
> This is the new basic code to develop the project again:
>
>
> *spark = sparknlp.start()*
>
> *pipelineName = 'analyze_sentiment'*
>
>
> *pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
> generates the error*
>
> *rawTweets = spark.readStream.format('socket').option('host',
> 'localhost').option('port',9008).load()*
>
> *allTweets = rawTweets.selectExpr('CAST(value AS
> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')*
>
>
> *sentPred = pipeline.transform(allTweets)*
>
> *query =
> sentPred.writeStream.outputMode('complete').format('console').start()*
> *query.awaitTermination()*
>
> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
> is 8. I've tried with a different model but the error is still the same, so
> what could be causing it?
>
> If this error is solved I think SparkNLP will be the solution I was
> looking for to reduce memory consumption so thank you again for suggesting
> it.
>
>
>
> El 18 abr 2022, a las 21:07, Bjørn Jørgensen 
> escribió:
>
> When did SpaCy have support for Spark?
>
> Try Spark NLP  it`s made for spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they public user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>
> It looks good, are you sure it even starts? the problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it but would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>
> Hi Team,
> 
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>
> # prior processing of the tweets
> sentDF = other_processing(tweets)
>
> # obtaining the column that contains the list of entities from a tweet
> nerDF = ner_classification(sentDF)
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> nerModel = spacy.load("en_core_web_sm")
>
> # main call, applies the UDF function to every tweet from the "tweet" column
> def ner_classification(words):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn("nerlist", ner_list("tweet"))
>     return words
>
> # udf function
> def obtain_ner_udf(words):
>     # if the tweet is empty return None
>     if words == "":
>         return None
>     # else: applying the NER model (Spacy en_core_web_sm)
>     entities = nerModel(words)
>
>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>     return [word.text + '_' + word.label_ for word in entities.ents]
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Jungtaek Lim
I have no context on ML, but your "streaming" query exposes the possibility
of memory issues.

>>> flattenedNER.registerTempTable("df")
>>>
>>>
>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>>
>>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>>
>>>
Since this is a streaming query, the grouped aggregation maintains a state store,
and since you use the complete output mode, that state store will grow over
time and will eventually dominate the memory in the executors; see the sketch
after the link below.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
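
As an illustration only, one way to keep that state bounded is to aggregate over an event-time window with a watermark; this assumes the data carries an event-time column (called event_time below), and it is a sketch rather than a drop-in replacement for the query above (flattenedNER and processBatch are the names from the original code).

from pyspark.sql.functions import avg, col, count, window

# Windowed, watermarked aggregation: state for windows older than the
# watermark can be dropped instead of growing forever.
bounded = (flattenedNER
           .withWatermark("event_time", "10 minutes")
           .groupBy(window(col("event_time"), "5 minutes"), col("col"))
           .agg(avg("sentiment").alias("sentiment"),
                count("*").alias("count")))

query = (bounded.writeStream
         .outputMode("update")               # emits only changed groups per trigger
         .foreachBatch(processBatch)
         .start())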


On Tue, Apr 19, 2022 at 4:07 AM Bjørn Jørgensen 
wrote:

> When did SpaCy have support for Spark?
>
> Try Spark NLP  it`s made for spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they public user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>
>> It looks good, are you sure it even starts? the problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure if that resolves it but would be a good
>> practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>> Hi Team,
>>> 
>>>
>>> I'm developing a project that retrieves tweets on a 'host' app, streams
>>> them to Spark and with different operations with DataFrames obtains the
>>> Sentiment of the tweets and their entities applying a Sentiment model and a
>>> NER model respectively.
>>>
>>> The problem I've come across is that when applying the NER model, the
>>> RAM consumption increases until the program stops with a memory error
>>> because there's no memory left to execute. In addition, on SparkUI I've
>>> seen that there's only one executor running, the executor driver, but using
>>> htop on the terminal I see that the 8 cores of the instance are executing
>>> at 100%.
>>>
>>> The SparkSession is only configured to receive the tweets from the
>>> socket that connects with the second program that sends the tweets. The
>>> DataFrame goes through some processing to obtain other properties of the
>>> tweet like its sentiment (which causes no error even with less than 8GB of
>>> RAM) and then the NER is applied.
>>>
>>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>>
>>> # prior processing of the tweets
>>> sentDF = other_processing(tweets)
>>>
>>> # obtaining the column that contains the list of entities from a tweet
>>> nerDF = ner_classification(sentDF)
>>>
>>>
>>> This is the code of the functions related to obtaining the NER, the
>>> "main call" and the UDF function.
>>>
>>> nerModel = spacy.load("en_core_web_sm")
>>>
>>> # main call, applies the UDF function to every tweet from the "tweet" column
>>> def ner_classification(words):
>>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>>     return words
>>>
>>> # udf function
>>> def obtain_ner_udf(words):
>>>     # if the tweet is empty return None
>>>     if words == "":
>>>         return None
>>>     # else: applying the NER model (Spacy en_core_web_sm)
>>>     entities = nerModel(words)
>>>
>>>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>>     return [word.text + '_' + word.label_ for word in entities.ents]
>>>
>>>
>>>
>>> And lastly I map each entity with the sentiment from its tweet and
>>> obtain the average sentiment of the entity and the number of appearances.
>>>
>>> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>>> flattenedNER.registerTempTable("df")
>>>
>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>>
>>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>>
>>>
>>> The resulting DF is processed with a function that separates each column
>>> in a list and prints it.
>>>
>>> def processBatch(df, epoch_id):
>>>     entities = [str(t.entity) for t in df.select("entity").collect()]
>>>     sentiments = [float(t.sentiment) for t in
>>>

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Bjørn Jørgensen
When did SpaCy have support for Spark?

Try Spark NLP; it's made for Spark. They have
a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and they
publish user guides at
https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59




man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :

> It looks good, are you sure it even starts? the problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it but would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>> Hi Team,
>> 
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams
>> them to Spark and with different operations with DataFrames obtains the
>> Sentiment of the tweets and their entities applying a Sentiment model and a
>> NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, the RAM
>> consumption increases until the program stops with a memory error because
>> there's no memory left to execute. In addition, on SparkUI I've seen that
>> there's only one executor running, the executor driver, but using htop on
>> the terminal I see that the 8 cores of the instance are executing at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket
>> that connects with the second program that sends the tweets. The DataFrame
>> goes through some processing to obtain other properties of the tweet like
>> its sentiment (which causes no error even with less than 8GB of RAM) and
>> then the NER is applied.
>>
>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>
>> # prior processing of the tweets
>> sentDF = other_processing(tweets)
>>
>> # obtaining the column that contains the list of entities from a tweet
>> nerDF = ner_classification(sentDF)
>>
>>
>> This is the code of the functions related to obtaining the NER, the "main
>> call" and the UDF function.
>>
>> nerModel = spacy.load("en_core_web_sm")
>>
>> # main call, applies the UDF function to every tweet from the "tweet" column
>> def ner_classification(words):
>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>     return words
>>
>> # udf function
>> def obtain_ner_udf(words):
>>     # if the tweet is empty return None
>>     if words == "":
>>         return None
>>     # else: applying the NER model (Spacy en_core_web_sm)
>>     entities = nerModel(words)
>>
>>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>     return [word.text + '_' + word.label_ for word in entities.ents]
>>
>>
>>
>> And lastly I map each entity with the sentiment from its tweet and obtain
>> the average sentiment of the entity and the number of appearances.
>>
>> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>> flattenedNER.registerTempTable("df")
>>
>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>> finalDF = spark.sql(querySelect)
>>
>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>
>>
>> The resulting DF is processed with a function that separates each column
>> in a list and prints it.
>>
>> def processBatch(df, epoch_id):
>>     entities = [str(t.entity) for t in df.select("entity").collect()]
>>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>>
>>     print(entities, sentiments, counts)
>>
>>
>> At first I tried with other NER models from Flair they have the same
>> effect, after printing the first batch memory use starts increasing until
>> it fails and stops the execution because of the memory error. When applying
>> a "simple" function instead of the NER model, such as *return
>> words.split()* on the UDF there's no such error so the data ingested
>> should not be what's causing the overload but the model.
>>
>> Is there a way to prevent the excessive RAM consumption? Why is there
>> only the driver executor and no other executors are generated? How could I
>> prevent it from collapsing when applying the NER model?
>>
>> Thanks in advance!

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Sean Owen
It looks good, are you sure it even starts? the problem I see is that you
send a copy of the model from the driver for every task. Try broadcasting
the model instead. I'm not sure if that resolves it but would be a good
practice.
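
A minimal sketch of that suggestion, assuming the loaded spaCy pipeline object
is picklable (if it is not, lazily loading the model once per executor inside
the function is the usual fallback):

import spacy
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName("BroadcastNerSketch").getOrCreate()

# load once on the driver, shipped once per executor instead of once per task
nlp_broadcast = spark.sparkContext.broadcast(spacy.load("en_core_web_sm"))

def obtain_ner(text):
    if not text:
        return None
    doc = nlp_broadcast.value(text)
    return [ent.text + "_" + ent.label_ for ent in doc.ents]

ner_udf = udf(obtain_ner, ArrayType(StringType()))

def ner_classification(df):
    return df.withColumn("nerlist", ner_udf(col("tweet")))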

On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla 
wrote:

> Hi Team,
> 
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>
> # prior processing of the tweets
> sentDF = other_processing(tweets)
>
> # obtaining the column that contains the list of entities from a tweet
> nerDF = ner_classification(sentDF)
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> nerModel = spacy.load("en_core_web_sm")
>
> # main call, applies the UDF function to every tweet from the "tweet" column
> def ner_classification(words):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn("nerlist", ner_list("tweet"))
>     return words
>
> # udf function
> def obtain_ner_udf(words):
>     # if the tweet is empty return None
>     if words == "":
>         return None
>     # else: applying the NER model (Spacy en_core_web_sm)
>     entities = nerModel(words)
>
>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>     return [word.text + '_' + word.label_ for word in entities.ents]
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the average sentiment of the entity and the number of appearances.
>
> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable("df")
>
> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
> finalDF = spark.sql(querySelect)
>
> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>
>
> The resulting DF is processed with a function that separates each column
> in a list and prints it.
>
> def processBatch(df, epoch_id):
>     entities = [str(t.entity) for t in df.select("entity").collect()]
>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>
>     print(entities, sentiments, counts)
>
>
> At first I tried with other NER models from Flair they have the same
> effect, after printing the first batch memory use starts increasing until
> it fails and stops the execution because of the memory error. When applying
> a "simple" function instead of the NER model, such as *return
> words.split()* on the UDF there's no such error so the data ingested
> should not be what's causing the overload but the model.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only
> the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!
>
>


Re: Spark Streaming | Dynamic Action Support

2022-03-03 Thread Mich Talebzadeh
In short, I don't think there is such a possibility. However, there is the
option of shutting down spark gracefully with checkpoint directory enabled.
In such a way you can  re-submit the modified code which will pick up
BatchID from where it was left off, assuming the topic is the same. See the
thread
"How to gracefully shutdown Spark Structured Streaming" in
https://lists.apache.org/list.html?user@spark.apache.org
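
As a rough sketch of that pattern (brokers, topic and paths below are
assumptions): run the query with a fixed checkpoint location, stop it
gracefully, then re-submit the modified code against the same checkpoint so it
resumes from the last committed batch. Note that adding or removing a sink
generally means a separate query with its own checkpoint.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RestartableStreamingJob").getOrCreate()

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   # assumed broker
      .option("subscribe", "events")                        # assumed topic
      .load())

query = (df.selectExpr("CAST(value AS STRING) AS value")
         .writeStream
         .format("parquet")
         .option("path", "/data/events_out")                # assumed sink path
         .option("checkpointLocation", "/chk/events_job")   # resume point lives here
         .start())

query.awaitTermination()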

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 3 Mar 2022 at 15:49, Mich Talebzadeh 
wrote:

> What is the definition of action here?
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 3 Mar 2022 at 10:56, Pappu Yadav  wrote:
>
>> Hi,
>>
>> Is there any way I can add/delete actions/jobs dynamically in a running
>> spark streaming job.
>> I will call an API and execute only the configured actions in the system.
>>
>> Eg . In the first batch suppose there are 5 actions in the spark
>> application.
>> Now suppose some configuration is changed and one action is added and one
>> is deleted.
>> How can i handle this in the spark streaming job without restarting the
>> application
>>
>


Re: Spark Streaming with Files

2021-04-30 Thread muru
Yes, set trigger(once=True) on all the streaming queries and they will behave
like batch runs. Then you can use any scheduler (e.g. Airflow) to run the job
on whatever time window you want. With checkpointing, the next run will start
processing files from the last checkpoint.
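
A minimal sketch of that pattern; the schema, paths and CSV layout are
assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RunOnceFileStream").getOrCreate()

files = (spark.readStream
         .format("csv")
         .option("header", "true")
         .schema("id STRING, ts TIMESTAMP, payload STRING")    # assumed schema
         .load("s3a://landing/cdc/"))                           # assumed CDC drop folder

query = (files.writeStream
         .format("parquet")
         .option("path", "s3a://curated/cdc/")
         .option("checkpointLocation", "s3a://checkpoints/cdc_job/")
         .trigger(once=True)                # process whatever is new, then stop
         .start())

query.awaitTermination()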

On Fri, Apr 23, 2021 at 8:13 AM Mich Talebzadeh 
wrote:

> Interesting.
>
> If we go back to classic Lambda architecture on premise, you could Flume
> API to Kafka to add files to HDFS in time series bases.
>
> Most higher CDC vendors do exactly that. Oracle GoldenGate (OGG) classic
> gets data from Oracle redo logs and sends them to subscribers. One can
> deploy OGC for Big Data to enable these files to be read and processed for
> Kafka, Hive, HDFS etc.
>
> So let us assume that we create these files and stream them on object
> storage in Cloud. Then we can use Spark Structure Streaming (SSS) to act as
> ETL tool. Assuming that streaming interval to be 10 minutes, we can still
> read them but ensure that we only trigger SSS reads every 4 hours.
>
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(sendToSink). \
>  trigger(processingTime='14400 seconds'). \
>  queryName('readFiles'). \
>  start()
>
> This will ensure that spark only processes them every 4 hours.
>
>
> HTH
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 23 Apr 2021 at 15:40, ayan guha  wrote:
>
>> Hi
>>
>> In one of the spark summit demo, it is been alluded that we should think
>> batch jobs in streaming pattern, using "run once" in a schedule.
>> I find this idea very interesting and I understand how this can be
>> achieved for sources like kafka, kinesis or similar. in fact we have
>> implemented this model for cosmos changefeed.
>>
>> My question is: can this model extend to file based sources? I understand
>> it can be for append only file  streams. The use case I have is: A CDC tool
>> like aws dms or shareplex or similar writing changes to a stream of files,
>> in date based folders. So it just goes on like T1, T2 etc folders. Also,
>> lets assume files are written every 10 mins, but I want to process them
>> every 4 hours.
>> Can I use streaming method so that it can manage checkpoints on its own?
>>
>> Best - Ayan
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Spark Streaming non functional requirements

2021-04-27 Thread Mich Talebzadeh
Forgot to add under non-functional requirements under heading



   - *Supportability and Maintainability*

Someone queried the other day on how to shutdown a streaming job
gracefully, meaning wait until such time as the "current queue" including
backlog is drained and all processing is completed.

I have come back with a suggested solution to implement this feature and
raised it as a topic in spark-developers list

http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-gracefully-shutdown-Spark-Structured-Streaming-tp31171.html

Regardless, this feature needs to be a consideration as well.

HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 27 Apr 2021 at 15:16, ashok34...@yahoo.com.INVALID
 wrote:

> Hello Mich
>
> Thank you for your great explanation.
>
> Best
>
> A.
>
> On Tuesday, 27 April 2021, 11:25:19 BST, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
> Hi,
>
> Any design (in whatever framework) needs to consider both Functional and
> non-functional requirements. Functional requirements are those which are
> related to the technical functionality of the system that we cover daily in
> this forum. The non-functional requirement is a requirement that
> specifies criteria that can be used to judge the operation of a system
> conditions, rather than specific behaviours.  From my experience the
> non-functional requirements are equally important and in some cases they
> are underestimated when systems go to production. Probably, most
> importantly they need to cover the following:
>
>
>- *Processing time meeting a service-level agreement (SLA). *
>
>   You can get some of this from Spark GUI. Are you comfortably satisfying
> the requirements? How about total delay, Back pressure etc. Are you within
> your SLA. In streaming applications, there is no such thing as an answer
> which is supposed to be late and correct. The timeliness is part of the
> application. If we get the right answer too slowly it becomes useless or
> wrong. We also need to be aware of latency trades off with throughput.
>
>- *Capacity, current and forecast. *
>
>   What is the current capacity? Have you accounted for extra demands,
> sudden surge and loads such as year-end. Can your pipeline handle 1.6-2
> times the current load
>
>- *Scalability*
>
>   How does your application scale if you have to handle multiple topics or
> new topics added at later stages? Scalability also
> includes additional nodes, on-prem or having the ability to add more
> resources such as Google Dataproc compute engines etc
>
>- *Supportability and Maintainability*
>
>   Have you updated docs and procedures in Confluence or equivalent or
> they are typically a year old :).  Is there any single point of failure due
> to skill set? Can ops support the design and maintain BAU themselves. How
> about training and hand-over
>
>- *Disaster recovery and Fault tolerance*
>
>   What provisions are made for disaster recovery. Is there any single
> point of failure in the design (end to end pipeline). Are you using
> standalone dockers or Google Kubernetes Engine (GKE or equivalent)
>
> HTH
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 26 Apr 2021 at 17:16, ashok34...@yahoo.com.INVALID
>  wrote:
>
> Hello,
>
> When we design a typical spark streaming process, the focus is to get
> functional requirements.
>
> However, I have been asked to provide non-functional requirements as well.
> Likely things I can consider are Fault tolerance and Reliability (component
> failures).  Are there a standard list of non-functional requirements for
> Spark streaming that one needs to consider and verify all?
>
> Thanking you
>
>


Re: Spark Streaming non functional requirements

2021-04-27 Thread ashok34...@yahoo.com.INVALID
 Hello Mich
Thank you for your great explanation.
Best
A.
On Tuesday, 27 April 2021, 11:25:19 BST, Mich Talebzadeh 
 wrote:  
 
 
Hi,
Any design (in whatever framework) needs to consider both Functional and 
non-functional requirements. Functional requirements are those which are 
related to the technical functionality of the system that we cover daily in 
this forum. The non-functional requirement is a requirement that specifies 
criteria that can be used to judge the operation of a system conditions, rather 
than specific behaviours.  From my experience the non-functional requirements 
are equally important and in some cases they are underestimated when systems go 
to production. Probably, most importantly they need to cover the following:
   
   - Processing time meeting a service-level agreement (SLA). 
  You can get some of this from Spark GUI. Are you comfortably satisfying the 
requirements? How about total delay, Back pressure etc. Are you within your 
SLA. In streaming applications, there is no such thing as an answer which is 
supposed to be late and correct. The timeliness is part of the application. If 
we get the right answer too slowly it becomes useless or wrong. We also need to 
be aware of latency trades off with throughput.
   
   - Capacity, current and forecast. 
  What is the current capacity? Have you accounted for extra demands, sudden 
surge and loads such as year-end. Can your pipeline handle 1.6-2 times the 
current load 
   
   - Scalability
  How does your application scale if you have to handle multiple topics or new 
topics added at later stages? Scalability also includes additional nodes, 
on-prem or having the ability to add more resources such as Google Dataproc 
compute engines etc   
   - Supportability and Maintainability
  Have you updated docs and procedures in Confluence or equivalent or they are 
typically a year old :).  Is there any single point of failure due to skill 
set? Can ops support the design and maintain BAU themselves. How about training 
and hand-over
   
   - Disaster recovery and Fault tolerance
  What provisions are made for disaster recovery. Is there any single point of 
failure in the design (end to end pipeline). Are you using standalone dockers 
or Google Kubernetes Engine (GKE or equivalent)
HTH

   view my Linkedin profile

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from relying
on this email's technical content is explicitly disclaimed. The author will in
no case be liable for any monetary damages arising from such loss, damage or
destruction.

 


On Mon, 26 Apr 2021 at 17:16, ashok34...@yahoo.com.INVALID 
 wrote:

Hello,
When we design a typical spark streaming process, the focus is to get 
functional requirements.
However, I have been asked to provide non-functional requirements as well. 
Likely things I can consider are Fault tolerance and Reliability (component 
failures).  Are there a standard list of non-functional requirements for Spark 
streaming that one needs to consider and verify all?
Thanking you
  

Re: Spark Streaming non functional requirements

2021-04-27 Thread Mich Talebzadeh
Hi,

Any design (in whatever framework) needs to consider both Functional and
non-functional requirements. Functional requirements are those which are
related to the technical functionality of the system that we cover daily in
this forum. A non-functional requirement specifies criteria that can be used to
judge how a system operates, rather than specific behaviours. From my
experience the non-functional
requirements are equally important and in some cases they are
underestimated when systems go to production. Probably, most importantly
they need to cover the following:


   - *Processing time meeting a service-level agreement (SLA). *

  You can get some of this from Spark GUI. Are you comfortably satisfying
the requirements? How about total delay, Back pressure etc. Are you within
your SLA. In streaming applications, there is no such thing as an answer
which is supposed to be late and correct. The timeliness is part of the
application. If we get the right answer too slowly it becomes useless or
wrong. We also need to be aware of latency trades off with throughput.

   - *Capacity, current and forecast. *

  What is the current capacity? Have you accounted for extra demands,
sudden surge and loads such as year-end. Can your pipeline handle 1.6-2
times the current load

   - *Scalability*

  How does your application scale if you have to handle multiple topics or
new topics added at later stages? Scalability also
includes additional nodes, on-prem or having the ability to add more
resources such as Google Dataproc compute engines etc

   - *Supportability and Maintainability*

  Have you updated docs and procedures in Confluence or equivalent or
they are typically a year old :).  Is there any single point of failure due
to skill set? Can ops support the design and maintain BAU themselves. How
about training and hand-over

   - *Disaster recovery and Fault tolerance*

  What provisions are made for disaster recovery. Is there any single point
of failure in the design (end to end pipeline). Are you using standalone
dockers or Google Kubernetes Engine (GKE or equivalent)

HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Apr 2021 at 17:16, ashok34...@yahoo.com.INVALID
 wrote:

> Hello,
>
> When we design a typical spark streaming process, the focus is to get
> functional requirements.
>
> However, I have been asked to provide non-functional requirements as well.
> Likely things I can consider are Fault tolerance and Reliability (component
> failures).  Are there a standard list of non-functional requirements for
> Spark streaming that one needs to consider and verify all?
>
> Thanking you
>


Re: [Spark-Streaming] moving average on categorical data with time windowing

2021-04-26 Thread Sean Owen
You might be able to do this with multiple aggregations on avg(col("col1")
== "cat1") etc, but how about pivoting the DataFrame first so that you get
columns like "cat1" being 1 or 0? you would end up with columns x
categories new columns if you want to count all categories in all cols. But
then it's just a simple aggregation on numeric values.
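
For example, a rough sketch of the first idea (a 0/1 flag column plus a single
windowed aggregation); the input path and trimmed schema are assumptions, not
the exact setup above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, avg, count, when

spark = SparkSession.builder.appName("CategoricalWindowAvg").getOrCreate()

events = (spark.readStream
          .schema("sqltimestamp TIMESTAMP, col1 STRING, col2 STRING")   # trimmed schema
          .option("header", "true")
          .csv("/data/synthetic_csv/"))                                  # assumed input dir

# turn the category test into a numeric column up front
flags = events.withColumn("col1_is_cat1", when(col("col1") == "cat1", 1).otherwise(0))

result = (flags
          .withWatermark("sqltimestamp", "5 seconds")
          .groupBy(window(col("sqltimestamp"), "1 minute"))
          .agg(avg("col1_is_cat1").alias("cat1_ratio"),        # share of cat1 per window
               count(col("col1")).alias("rows_in_window")))

query = (result.writeStream
         .outputMode("append")
         .format("console")
         .option("checkpointLocation", "/tmp/catavg_chk")
         .start())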

On Mon, Apr 26, 2021 at 9:29 AM halil  wrote:

> Hello everyone,
>
> I am trying to apply moving average on categorical data like below, which
> is a synthetic data generated by myself.
>
> sqltimestamp,col1,col2,col3,col4,col5
>
> 1618574879,cat1,cat4,cat2,cat5,cat3
>
> 1618574880,cat1,cat3,cat4,cat2,cat5
>
> 1618574881,cat5,cat3,cat4,cat2,cat1
>
> 1618574882,cat2,cat3,cat5,cat1,cat4
>
> 1618574883,cat2,cat4,cat1,cat3,cat5
>
> 1618574884,cat1,cat2,cat5,cat4,cat3
>
> 1618574885,cat5,cat3,cat2,cat1,cat4
>
> 1618574886,cat3,cat5,cat4,cat2,cat1
>
> 1618574887,cat3,cat2,cat5,cat4,cat1
>
> 1618574888,cat1,cat5,cat3,cat2,cat4
>
>
>
>
> I like to take the average of the number of "cat1" in the column "col1"
> for each 5 minutes window according to the column "sqltimestamp". I solved
> when column is numeric but I couldn't solve it when the column is
> categorical as above.
>
>
> The code below produces rows of tuples (timestamp, count) and I cannot
> apply avg aggregate function on the result because spark does not support
> multiple aggregation functions on one streaming.
>
> val movavgDF = spark
>
>   .readStream
>
>   .schema(schema)
>
>   .option("failOnDataLoss", true)
>   .option("delimiter", ",")
>   .csv(inputParameters.csvSinkDir)
>
> .withWatermark("sqltimestamp", "5 seconds")
> .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
> .agg(
> count( when( col("col1") === "cat1", 1)).as("count")
> )
> .withColumn("window_start", col("time_frame")("start").cast(TimestampType
> ))
> .drop("time_frame")
> .orderBy("window_start")
>
>
> After my searches on the net, I have come to the conclusion that we can do it 
> if it is not structural streaming, but I need it while streaming.
>
> I would be very happy if you can provide me a solution for this problem.
>
> Thank you very much in advance.
>
> Best,
>
> -halil.
>
>
>
>
>
>


Re: Spark Streaming with Files

2021-04-23 Thread Mich Talebzadeh
Interesting.

If we go back to classic Lambda architecture on premise, you could Flume
API to Kafka to add files to HDFS in time series bases.

Most higher CDC vendors do exactly that. Oracle GoldenGate (OGG) classic
gets data from Oracle redo logs and sends them to subscribers. One can
deploy OGC for Big Data to enable these files to be read and processed for
Kafka, Hive, HDFS etc.

So let us assume that we create these files and stream them on object
storage in Cloud. Then we can use Spark Structure Streaming (SSS) to act as
ETL tool. Assuming that streaming interval to be 10 minutes, we can still
read them but ensure that we only trigger SSS reads every 4 hours.

 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
 foreachBatch(sendToSink). \
 trigger(processingTime='14400 seconds'). \
 queryName('readFiles'). \
 start()

This will ensure that spark only processes them every 4 hours.


HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 23 Apr 2021 at 15:40, ayan guha  wrote:

> Hi
>
> In one of the spark summit demo, it is been alluded that we should think
> batch jobs in streaming pattern, using "run once" in a schedule.
> I find this idea very interesting and I understand how this can be
> achieved for sources like kafka, kinesis or similar. in fact we have
> implemented this model for cosmos changefeed.
>
> My question is: can this model extend to file based sources? I understand
> it can be for append only file  streams. The use case I have is: A CDC tool
> like aws dms or shareplex or similar writing changes to a stream of files,
> in date based folders. So it just goes on like T1, T2 etc folders. Also,
> lets assume files are written every 10 mins, but I want to process them
> every 4 hours.
> Can I use streaming method so that it can manage checkpoints on its own?
>
> Best - Ayan
> --
> Best Regards,
> Ayan Guha
>


Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Attila Zsolt Piros
Hi!

I am just guessing here (as Gabor said before we need more information /
logs):
But is it possible Renu that you just upgraded one single jar?

Best Regards,
Attila

On Tue, Mar 16, 2021 at 11:31 AM Gabor Somogyi 
wrote:

> Well, this is not much. Please provide driver and executor logs...
>
> G
>
>
> On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:
>
>> Hi Team,
>>
>>
>> I have upgraded my spark streaming from 2.2 to 2.4 but getting below
>> error:
>>
>>
>> spark-streaming-kafka_0-10.2.11_2.4.0
>>
>>
>> scala 2.11
>>
>>
>> Any Idea?
>>
>>
>>
>> main" java.lang.AbstractMethodError
>>
>> at
>> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>>
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>>
>>
>> Thanks & Regards,
>>
>> Renu Yadav
>>
>>


Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Gabor Somogyi
Well, this is not much. Please provide driver and executor logs...

G


On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:

> Hi Team,
>
>
> I have upgraded my spark streaming from 2.2 to 2.4 but getting below error:
>
>
> spark-streaming-kafka_0-10.2.11_2.4.0
>
>
> scala 2.11
>
>
> Any Idea?
>
>
>
> main" java.lang.AbstractMethodError
>
> at
> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>
>
> Thanks & Regards,
>
> Renu Yadav
>
>


Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread forece85
Not sure if Kinesis has such flexibility. What other possibilities are there
at the transformation level?






Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread forece85
Any example for this please






Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread Sean Owen
You can also group by the key in the transformation on each batch. But yes
that's faster/easier if it's already partitioned that way.
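
A rough DStream sketch of that per-batch grouping; the JSON layout and the
eventId field are assumptions:

import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="PerBatchKeyGrouping")
ssc = StreamingContext(sc, 300)                      # 5-minute batches, as in the question

records = ssc.socketTextStream("localhost", 9999)    # stand-in for the Kinesis DStream

def handle_group(event_id, events):
    pass                                             # all events for this eventId, this batch only

keyed = records.map(json.loads).map(lambda e: (e["eventId"], e))

def process_batch(rdd):
    # shuffle same-key events onto the same task, then process group by group
    rdd.groupByKey().foreach(lambda kv: handle_group(kv[0], list(kv[1])))

keyed.foreachRDD(process_batch)
ssc.start()
ssc.awaitTermination()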

On Tue, Mar 9, 2021 at 7:30 AM Ali Gouta  wrote:

> Do not know Kenesis, but it looks like it works like kafka. Your producer
> should implement a paritionner that makes it possible to send your data
> with the same key to the same partition. Though, each task in your spark
> streaming app will load data from the same partition in the same executor.
> I think this is the simplest way to achieve what you want to do.
>
> Best regards,
> Ali Gouta.
>
> On Tue, Mar 9, 2021 at 11:30 AM forece85  wrote:
>
>> We are doing batch processing using Spark Streaming with Kinesis with a
>> batch
>> size of 5 mins. We want to send all events with same eventId to same
>> executor for a batch so that we can do multiple events based grouping
>> operations based on eventId. No previous batch or future batch data is
>> concerned. Only Current batch keyed operation needed.
>>
>> Please help me how to achieve this.
>>
>> Thanks.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread Ali Gouta
I don't know Kinesis, but it looks like it works like Kafka. Your producer
should implement a partitioner that makes it possible to send your data
with the same key to the same partition. Then each task in your Spark
Streaming app will load data from the same partition in the same executor.
I think this is the simplest way to achieve what you want to do.
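
A rough illustration of the producer side with Kafka (for Kinesis the analogue
is the PartitionKey on PutRecord); the broker address and topic are
placeholders:

import json
from kafka import KafkaProducer      # kafka-python

producer = KafkaProducer(
    bootstrap_servers="broker1:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {"eventId": "evt-42", "payload": "..."}
# the default partitioner hashes the key, so every "evt-42" lands on one partition
producer.send("events", key=event["eventId"], value=event)
producer.flush()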

Best regards,
Ali Gouta.

On Tue, Mar 9, 2021 at 11:30 AM forece85  wrote:

> We are doing batch processing using Spark Streaming with Kinesis with a
> batch
> size of 5 mins. We want to send all events with same eventId to same
> executor for a batch so that we can do multiple events based grouping
> operations based on eventId. No previous batch or future batch data is
> concerned. Only Current batch keyed operation needed.
>
> Please help me how to achieve this.
>
> Thanks.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark streaming with Kafka

2020-11-03 Thread Kevin Pis
Hi,

this is my  Word Count demo.  https://github.com/kevincmchen/wordcount

MohitAbbi  于2020年11月4日周三 上午3:32写道:

> Hi,
>
> Can you please share the correct versions of JAR files which you used to
> resolve the issue. I'm also facing the same issue.
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: Spark streaming with Kafka

2020-11-03 Thread MohitAbbi
Hi,

Can you please share the correct versions of JAR files which you used to
resolve the issue. I'm also facing the same issue.

Thanks







Re: Spark Streaming Job is stucked

2020-10-18 Thread Artemis User
If it was running fine before and has stopped working properly now, one thing I
can think of is that your disk may be full. Checking your disk space and
cleaning up your old log files might help...


On 10/18/20 12:06 PM, rajat kumar wrote:

Hello Everyone,

My spark streaming job is running too slow, it is having batch time of 
15 seconds and the batch gets completed in 20-22 secs. It was fine 
till 1st week October, but it is behaving this way suddenly. I know 
changing the batch time can help , but other than that any idea what 
can be done?


Please note I am using Direct Stream Approach

Thanks
Rajat Sharma





Re: Spark Streaming ElasticSearch

2020-10-06 Thread jainshasha
Hi Siva

In that case you can use the Structured Streaming foreach / foreachBatch
functions, which let you process each record (or each micro-batch) and write it
into some sink.
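
A minimal foreachBatch sketch along those lines; the source, connector options,
index name and the per-row change are assumptions, and it needs the
elasticsearch-hadoop jar on the classpath:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

spark = SparkSession.builder.appName("EsForeachBatchSketch").getOrCreate()

stream = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")
          .option("subscribe", "events")
          .load()
          .selectExpr("CAST(value AS STRING) AS value"))

def write_to_es(batch_df, batch_id):
    enriched = batch_df.withColumn("value_upper", upper(col("value")))   # per-row changes go here
    (enriched.write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "es-host:9200")
        .mode("append")
        .save("my-index"))            # older connectors expect "index/type"

query = (stream.writeStream
         .foreachBatch(write_to_es)
         .option("checkpointLocation", "/tmp/es_chk")
         .start())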







Re: Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Jainshasha,

I need to read each row from Dataframe and made some changes to it before
inserting it into ES.

Thanks
Siva

On Mon, Oct 5, 2020 at 8:06 PM jainshasha  wrote:

> Hi Siva
>
> To emit data into ES using spark structured streaming job you need to used
> ElasticSearch jar which has support for sink for spark structured streaming
> job. For this you can use this one my branch where we have integrated ES
> with spark 3.0 and scala 2.12 compatible
> https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> Also in this you need to build three jars
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> which help in writing data into ES through spark structured streaming.
>
> And in your application job u can use this way to sink the data, remember
> with ES there is only support of append mode of structured streaming.
> val esDf = aggregatedDF
> .writeStream
> .outputMode("append")
> .format("org.elasticsearch.spark.sql")
> .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
> .start("aggregation-job-index-latest-1")
>
>
> Let me know if you face any issues, will be happy to help you :)
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming ElasticSearch

2020-10-05 Thread jainshasha
Hi Siva

To emit data into ES from a Spark Structured Streaming job you need to use an
ElasticSearch jar that has sink support for Structured Streaming. For this you
can use my branch, where we have integrated ES with Spark 3.0 and Scala 2.12:
https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0

Also, you need to build three jars
elasticsearch-hadoop-sql
elasticsearch-hadoop-core
elasticsearch-hadoop-mr
which help with writing data into ES through Spark Structured Streaming.

And in your application you can sink the data this way; remember that with ES
only the append output mode of Structured Streaming is supported.
val esDf = aggregatedDF
  .writeStream
  .outputMode("append")
  .format("org.elasticsearch.spark.sql")
  .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
  .start("aggregation-job-index-latest-1")


Let me know if you face any issues, will be happy to help you :)







Re: Spark Streaming Checkpointing

2020-09-04 Thread András Kolbert
Hi Gábor,


Thanks for your reply on this!

Kafka 0.8 is what's used internally at the company I work at - it hasn't been
changed, mainly for compatibility with the currently deployed Java applications.

Hence I am attempting to make the most of this version :)

András



On Fri, 4 Sep 2020, 14:09 Gabor Somogyi,  wrote:

> Hi Andras,
>
> A general suggestion is to use Structured Streaming instead of DStreams
> because it provides several things out of the box (stateful streaming,
> etc...).
> Kafka 0.8 is super old and deprecated (no security...). Do you have a
> specific reason to use that?
>
> BR,
> G
>
>
> On Thu, Sep 3, 2020 at 11:41 AM András Kolbert 
> wrote:
>
>> Hi All,
>>
>> I have a Spark streaming application (2.4.4, Kafka 0.8 >> so Spark Direct
>> Streaming) running just fine.
>>
>> I create a context in the following way:
>>
>> ssc = StreamingContext(sc, 60)
>> opts = {"metadata.broker.list": kafka_hosts, "auto.offset.reset": "largest", "group.id": run_type}
>> kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
>> kvs.checkpoint(120)
>>
>> lines = kvs.map(lambda row: row[1])
>> lines.foreachRDD(streaming_app)
>> ssc.checkpoint(checkpoint)
>>
>> The streaming app at a high level does this:
>>
>>- processes incoming batch
>>- unions to the dataframe from the previous batch and aggregates them
>>
>> Currently, I use checkpointing explicitly (df = df.checkpoint()) to
>> optimise the lineage. Although this is quite an expensive exercise and was
>> wondering if there is a better way to do this.
>>
>> I tried to disable this explicit checkpointing, as I have a periodical
>> checkpointing (kvs.checkpoint(120) ) so I thought that the lineage will
>> be kept to that checkpointed RDD. Although in reality that is not the case
>> and processing keeps increasing over time.
>>
>> Am I doing something inherently wrong? Is there a better way of doing
>> this?
>>
>> Thanks
>> Andras
>>
>


Re: Spark Streaming Checkpointing

2020-09-04 Thread Gabor Somogyi
Hi Andras,

A general suggestion is to use Structured Streaming instead of DStreams
because it provides several things out of the box (stateful streaming,
etc...).
Kafka 0.8 is super old and deprecated (no security...). Do you have a
specific reason to use that?
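
For reference, a minimal Structured Streaming sketch of the same kind of
running aggregation: the engine's state store and the query checkpoint replace
the manual union of batches and the explicit df.checkpoint() lineage
truncation. The topic, schema and paths below are assumptions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, count
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("StatefulAggSketch").getOrCreate()

schema = StructType([StructField("user", StringType()), StructField("action", StringType())])

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("subscribe", "topic_listen")
          .load()
          .select(from_json(col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

# running aggregation; Spark keeps the state, no manual union of previous batches
running = events.groupBy("user").agg(count("action").alias("actions"))

query = (running.writeStream
         .outputMode("update")
         .format("console")
         .option("checkpointLocation", "/chk/agg_job")     # offsets + state live here
         .trigger(processingTime="60 seconds")
         .start())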

BR,
G


On Thu, Sep 3, 2020 at 11:41 AM András Kolbert 
wrote:

> Hi All,
>
> I have a Spark streaming application (2.4.4, Kafka 0.8 >> so Spark Direct
> Streaming) running just fine.
>
> I create a context in the following way:
>
> ssc = StreamingContext(sc, 60)
> opts = {"metadata.broker.list": kafka_hosts, "auto.offset.reset": "largest", "group.id": run_type}
> kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
> kvs.checkpoint(120)
>
> lines = kvs.map(lambda row: row[1])
> lines.foreachRDD(streaming_app)
> ssc.checkpoint(checkpoint)
>
> The streaming app at a high level does this:
>
>- processes incoming batch
>- unions to the dataframe from the previous batch and aggregates them
>
> Currently, I use checkpointing explicitly (df = df.checkpoint()) to
> optimise the lineage. Although this is quite an expensive exercise and was
> wondering if there is a better way to do this.
>
> I tried to disable this explicit checkpointing, as I have a periodical
> checkpointing (kvs.checkpoint(120) ) so I thought that the lineage will
> be kept to that checkpointed RDD. Although in reality that is not the case
> and processing keeps increasing over time.
>
> Am I doing something inherently wrong? Is there a better way of doing this?
>
> Thanks
> Andras
>


Re: Spark Streaming with Kafka and Python

2020-08-12 Thread Sean Owen
What supports Python in (Kafka?) 0.8? I don't think Spark ever had a
specific Python-Kafka integration. But you have always been able to
use it to read DataFrames as in Structured Streaming.
Kafka 0.8 support is deprecated (gone in 3.0) but 0.10 means 0.10+ -
works with the latest 2.x.
What is the issue?
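
For what it's worth, a minimal PySpark sketch of that route. It needs the
matching Kafka SQL package on the classpath (for Spark 2.4.5 with Scala 2.11
that is --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5); the
broker list and topic below are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PyKafkaStructured").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "b-1.msk.example:9092")   # assumed MSK brokers
      .option("subscribe", "my_topic")
      .option("startingOffsets", "latest")
      .load())

values = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

query = (values.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/py_kafka_chk")
         .start())

query.awaitTermination()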

On Wed, Aug 12, 2020 at 7:53 AM German Schiavon
 wrote:
>
> Hey,
>
> Maybe I'm missing some restriction with EMR, but have you tried to use 
> Structured Streaming instead of Spark Streaming?
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
>
> Regards
>
> On Wed, 12 Aug 2020 at 14:12, Hamish Whittal  
> wrote:
>>
>> Hi folks,
>>
>> Thought I would ask here because it's somewhat confusing. I'm using Spark 
>> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>>
>> The version of Scala used is 2.11.12. I'm using this version of the 
>> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>>
>> Now I'm wanting to read from Kafka topics using Python (I need to stick to 
>> Python specifically).
>>
>> What seems confusing is that 0.8 has Python support, but 0.10 does not. Then 
>> 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using 2.4.5 
>> then clearly I'm going to hit a roadblock here.
>>
>> Can someone clarify these things for me? Have I got this right?
>>
>> Thanks in advance,
>> Hamish

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread German Schiavon
Hey,

Maybe I'm missing some restriction with EMR, but have you tried to use
Structured Streaming instead of Spark Streaming?

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Regards

On Wed, 12 Aug 2020 at 14:12, Hamish Whittal 
wrote:

> Hi folks,
>
> Thought I would ask here because it's somewhat confusing. I'm using Spark
> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>
> The version of Scala used is 2.11.12. I'm using this version of the
> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>
> Now I'm wanting to read from Kafka topics using Python (I need to stick to
> Python specifically).
>
> What seems confusing is that 0.8 has Python support, but 0.10 does not.
> Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
> 2.4.5 then clearly I'm going to hit a roadblock here.
>
> Can someone clarify these things for me? Have I got this right?
>
> Thanks in advance,
> Hamish
>


Re: Spark streaming receivers

2020-08-10 Thread Russell Spitzer
The direct approach, which is also available through dstreams, and
structured streaming use a different model. Instead of being a push based
streaming solution they instead are pull based. (In general)

On every batch the driver uses the configuration to create a number of
partitions, each is responsible for independently pulling a number of
records. The exact number of records and guarantees around the pull are
source and configuration dependent. Since the system is pull based, there
is no need for a receiver or block management system taking up resources.
Every task/partition contains all the information required to get the data
that it describes.

An example in Kafka, the driver might decide that batch 1 contains all the
records between offset 1 and 100. It checks and sees that there are 10
Kafka partitions. So it ends up making a spark job which contains 10 tasks
each task dedicated to a single Kafka partition. Each task will then
independently ask for 100 records from it's Kafka partition. There will be
no Spark resources used outside of those required for those 10 tasks.

On Sun, Aug 9, 2020, 10:44 PM Dark Crusader 
wrote:

> Hi Russell,
> This is super helpful. Thank you so much.
>
> Can you elaborate on the differences between structured streaming vs
> dstreams? How would the number of receivers required etc change?
>
> On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
> wrote:
>
>> Note, none of this applies to Direct streaming approaches, only receiver
>> based Dstreams.
>>
>> You can think of a receiver as a long running task that never finishes.
>> Each receiver is submitted to an executor slot somewhere, it then runs
>> indefinitely and internally has a method which passes records over to a
>> block management system. There is a timing that you set which decides when
>> each block is "done" and records after that time has passed go into the
>> next block (See parameter
>> 
>>  spark.streaming.blockInterval)  Once a block is done it can be
>> processed in the next Spark batch.. The gap between a block starting and a
>> block being finished is why you can lose data in Receiver streaming without
>> WriteAheadLoging. Usually your block interval is divisible into your batch
>> interval so you'll get X blocks per batch. Each block becomes one partition
>> of the job being done in a Streaming batch. Multiple receivers can be
>> unified into a single dstream, which just means the blocks produced by all
>> of those receivers are handled in the same Streaming batch.
>>
>> So if you have 5 different receivers, you need at minimum 6 executor
>> cores. 1 core for each receiver, and 1 core to actually do your processing
>> work. In a real world case you probably want significantly more  cores on
>> the processing side than just 1. Without repartitioning you will never have
>> more that
>>
>> A quick example
>>
>> I run 5 receivers with block interval of 100ms and spark batch interval
>> of 1 second. I use union to group them all together, I will most likely end
>> up with one Spark Job for each batch every second running with 50
>> partitions (1000ms / 100(ms / partition / receiver) * 5 receivers). If I
>> have a total of 10 cores in the system. 5 of them are running receivers,
>> The remaining 5 must process the 50 partitions of data generated by the
>> last second of work.
>>
>> And again, just to reiterate, if you are doing a direct streaming
>> approach or structured streaming, none of this applies.
>>
>> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out how receivers tie into spark
>>> driver-executor structure.
>>> Do all executors have a receiver that is blocked as soon as it
>>> receives some stream data?
>>> Or can multiple streams of data be taken as input into a single executor?
>>>
>>> I have stream data coming in at every second coming from 5 different
>>> sources. I want to aggregate data from each of them. Does this mean I need
>>> 5 executors or does it have to do with threads on the executor?
>>>
>>> I might be mixing in a few concepts here. Any help would be appreciated.
>>> Thank you.
>>>
>>


Re: Spark streaming receivers

2020-08-09 Thread Dark Crusader
Hi Russell,
This is super helpful. Thank you so much.

Can you elaborate on the differences between structured streaming vs
dstreams? How would the number of receivers required etc change?

On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
wrote:

> Note, none of this applies to Direct streaming approaches, only receiver
> based Dstreams.
>
> You can think of a receiver as a long running task that never finishes.
> Each receiver is submitted to an executor slot somewhere, it then runs
> indefinitely and internally has a method which passes records over to a
> block management system. There is a timing that you set which decides when
> each block is "done" and records after that time has passed go into the
> next block (See parameter
> 
> spark.streaming.blockInterval)  Once a block is done it can be processed
> in the next Spark batch.. The gap between a block starting and a block
> being finished is why you can lose data in Receiver streaming without
> WriteAheadLoging. Usually your block interval is divisible into your batch
> interval so you'll get X blocks per batch. Each block becomes one partition
> of the job being done in a Streaming batch. Multiple receivers can be
> unified into a single dstream, which just means the blocks produced by all
> of those receivers are handled in the same Streaming batch.
>
> So if you have 5 different receivers, you need at minimum 6 executor
> cores. 1 core for each receiver, and 1 core to actually do your processing
> work. In a real world case you probably want significantly more  cores on
> the processing side than just 1. Without repartitioning you will never have
> more that
>
> A quick example
>
> I run 5 receivers with block interval of 100ms and spark batch interval of
> 1 second. I use union to group them all together, I will most likely end up
> with one Spark Job for each batch every second running with 50 partitions
> (1000ms / 100(ms / partition / receiver) * 5 receivers). If I have a total
> of 10 cores in the system. 5 of them are running receivers, The remaining 5
> must process the 50 partitions of data generated by the last second of work.
>
> And again, just to reiterate, if you are doing a direct streaming approach
> or structured streaming, none of this applies.
>
> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm having some trouble figuring out how receivers tie into spark
>> driver-executor structure.
>> Do all executors have a receiver that is blocked as soon as it
>> receives some stream data?
>> Or can multiple streams of data be taken as input into a single executor?
>>
>> I have stream data coming in at every second coming from 5 different
>> sources. I want to aggregate data from each of them. Does this mean I need
>> 5 executors or does it have to do with threads on the executor?
>>
>> I might be mixing in a few concepts here. Any help would be appreciated.
>> Thank you.
>>
>


Re: Spark streaming receivers

2020-08-08 Thread Russell Spitzer
Note, none of this applies to Direct streaming approaches, only receiver
based Dstreams.

You can think of a receiver as a long running task that never finishes.
Each receiver is submitted to an executor slot somewhere, it then runs
indefinitely and internally has a method which passes records over to a
block management system. There is a timing that you set which decides when
each block is "done" and records after that time has passed go into the
next block (See parameter

spark.streaming.blockInterval)  Once a block is done it can be processed in
the next Spark batch.. The gap between a block starting and a block being
finished is why you can lose data in Receiver streaming without
WriteAheadLoging. Usually your block interval is divisible into your batch
interval so you'll get X blocks per batch. Each block becomes one partition
of the job being done in a Streaming batch. Multiple receivers can be
unified into a single dstream, which just means the blocks produced by all
of those receivers are handled in the same Streaming batch.

So if you have 5 different receivers, you need at minimum 6 executor cores.
1 core for each receiver, and 1 core to actually do your processing work.
In a real world case you probably want significantly more  cores on the
processing side than just 1. Without repartitioning you will never have
more that

A quick example

I run 5 receivers with block interval of 100ms and spark batch interval of
1 second. I use union to group them all together, I will most likely end up
with one Spark Job for each batch every second running with 50 partitions
(1000ms / 100(ms / partition / receiver) * 5 receivers). If I have a total
of 10 cores in the system. 5 of them are running receivers, The remaining 5
must process the 50 partitions of data generated by the last second of work.
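
A tiny DStream sketch of that layout (hosts and ports are placeholders): five
receivers unioned into one stream, with the block interval set so each
1-second batch yields roughly 50 partitions.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf().setAppName("FiveReceivers")
        .set("spark.streaming.blockInterval", "100ms"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)                      # 1-second batches

# each receiver pins one core; with 10 cores, 5 remain for processing
streams = [ssc.socketTextStream("source-%d.example" % i, 9999) for i in range(5)]
unified = ssc.union(*streams)                      # blocks from all receivers, one batch

unified.count().pprint()                           # ~50 partitions per batch before any repartition

ssc.start()
ssc.awaitTermination()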

And again, just to reiterate, if you are doing a direct streaming approach
or structured streaming, none of this applies.

On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader 
wrote:

> Hi,
>
> I'm having some trouble figuring out how receivers tie into spark
> driver-executor structure.
> Do all executors have a receiver that is blocked as soon as it
> receives some stream data?
> Or can multiple streams of data be taken as input into a single executor?
>
> I have stream data coming in at every second coming from 5 different
> sources. I want to aggregate data from each of them. Does this mean I need
> 5 executors or does it have to do with threads on the executor?
>
> I might be mixing in a few concepts here. Any help would be appreciated.
> Thank you.
>


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without seeing the rest (and you can confirm this by looking at the DAG
visualization in the Spark UI) I would say your first stage with 6
partitions is:

Stage 1: Read data from Kinesis (or read blocks from the receiver, not sure which
method you are using) and write shuffle files for the repartition
Stage 2: Read shuffle files and do everything else

In general I think the biggest issue here is probably not the distribution
of tasks which based on your UI reading were quite small, but instead the
parallelization of the write operation since it is done synchronously. I
would suggest instead of trying to increase your parallelism by
partitioning, you attempt to have "doJob" run asynchronously and allow for
more parallelism within an executor rather than using multiple executor
threads/jvms.

That said you probably would run faster if you just skipped the repartition
based on the speed of second stage.
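
A Scala sketch of the async idea (dStream, ConnectionUtil and doJob stand in for
the corresponding pieces of the code quoted below; the 10-record batch size and
the timeout are illustrative):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

dStream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val hbaseConnection = ConnectionUtil.getHbaseConnection()
    // kick off each 10-record batch as a Future instead of a blocking doJob call
    val pending = partitionOfRecords.grouped(10).map { batch =>
      Future { doJob(batch.toList, hbaseConnection) }
    }.toList
    // the task finishes only once every asynchronous write has completed
    Await.result(Future.sequence(pending), 10.minutes)
  }
}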

On Mon, Jul 20, 2020 at 8:23 AM forece85  wrote:

> Thanks for the reply. Please find pseudo code below. It's DStreams reading
> every
> 10 secs from a Kinesis stream and, after transformations, pushing into HBase.
> Once we get the DStream, we use the below code to repartition and do processing:
>
> dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
> dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords
> ->
> {
>Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
>List listOfRecords = new ArrayList<>();
>while (partitionOfRecords.hasNext()) {
>  listOfRecords.add(partitionOfRecords.next());
>
>  if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
> continue;
>
>  List finalListOfRecords = listOfRecords;
>  doJob(finalListOfRecords, hbaseConnection);
>  listOfRecords = new ArrayList<>();
>}
> }));
>
>
> We are batching every 10 records and passing them to the doJob method, where we
> batch process and bulk insert into HBase.
>
> With the above code, will you be able to tell what is happening at job 1 -> 6
> tasks, and how to replace the repartition method efficiently?
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
Thanks for the reply. Please find pseudo code below. It's DStreams reading every
10 secs from a Kinesis stream and, after transformations, pushing into HBase.
Once we get the DStream, we use the below code to repartition and do processing:

dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords ->
{
   Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
   List listOfRecords = new ArrayList<>();
   while (partitionOfRecords.hasNext()) {
 listOfRecords.add(partitionOfRecords.next());

 if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
continue;
 
 List finalListOfRecords = listOfRecords;
 doJob(finalListOfRecords, hbaseConnection);
 listOfRecords = new ArrayList<>();
   }
}));


We are batching every 10 records and passing them to the doJob method, where we
batch process and bulk insert into HBase.

With the above code, will you be able to tell what is happening at job 1 -> 6
tasks, and how to replace the repartition method efficiently?

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
Thanks for the reply. Please find pseudo code below. We are fetching DStreams from
a Kinesis stream every 10 secs, performing transformations, and finally
persisting to HBase tables using batch insertions.

dStream = dStream.repartition(jssc.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords ->
{
Connection hbaseConnection =
ConnectionUtil.getHbaseConnection();
List listOfRecords = new ArrayList<>();
while (partitionOfRecords.hasNext()) {
try {
listOfRecords.add(partitionOfRecords.next());

if (listOfRecords.size() < 10 &&
partitionOfRecords.hasNext())
continue;

List finalListOfRecords = listOfRecords;
doJob(finalListOfRecords, primaryConnection,
lookupsConnection);
listOfRecords = new ArrayList<>();
} catch (Exception e) {
e.printStackTrace();
}
}
})); 

We are batching every 10 records and sending them to the doJob method, where the
actual transformations happen and every batch gets bulk inserted into the HBase
table.

With the above code, can we guess what's happening at Job 1 => 6 tasks and how to
reduce that time?
Mainly, how to effectively set parallelism while avoiding the repartition() method.

Thanks in Advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without your code this is hard to determine but a few notes.

The number of partitions is usually dictated by the input source, see if it
has any configuration which allows you to increase input splits.

I'm not sure why you think some of the code is running on the driver. All
methods on DataFrames and RDDs will be executed on executors; foreachPartition
does not run locally on the driver.

The difference in partitions is probably the shuffle you added with
repartition. I would actually be not surprised if your code ran faster
without the repartitioning. But again with the real code it would be very
hard to say.

On Mon, Jul 20, 2020, 6:33 AM forece85  wrote:

> I am new to spark streaming and trying to understand spark ui and to do
> optimizations.
>
> 1. Processing at executors took less time than at driver. How to optimize
> to
> make driver tasks fast ?
> 2. We are using dstream.repartition(defaultParallelism*3) to increase
> parallelism which is causing high shuffles. Is there any option to avoid
> repartition manually to reduce data shuffles.
> 3. Also trying to understand how 6 tasks in stage1 and 199 tasks in stage2
> got created?
>
> *hardware configuration:* executor-cores: 3; driver-cores: 3;
> dynamicAllocation is true;
> initial,min,maxExecutors: 25
>
> StackOverFlow link for screenshots:
>
> https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark streaming with Confluent kafka

2020-07-03 Thread Gabor Somogyi
The error is clear:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config
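
One likely cause, judging only from the options in the quoted message: the Java
Kafka client expects kafka.sasl.mechanism (singular). With kafka.sasl.mechanisms
the setting is ignored, the client falls back to GSSAPI, and GSSAPI then demands
a serviceName. A hedged sketch of corrected options, passing the JAAS entry
inline instead of through kafka_jaas.conf (broker, topic and credentials are the
placeholders already used below):

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092")
  .option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")   // singular: "kafka.sasl.mechanisms" is silently ignored
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXX\" password=\"XXX\";")
  .option("startingOffsets", "earliest")
  .load
  .select($"value".cast("string").alias("value"))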

On Fri, 3 Jul 2020, 15:40 dwgw,  wrote:

> Hi
>
> I am trying to stream a Confluent Kafka topic in the spark shell. For that I
> have invoked spark-shell using the following command.
>
> # spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
> --conf
>
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
> --driver-java-options
> "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
> /home/spark/kafka_jaas.conf
>
> kafka_jaas.conf
> -
>
> KafkaClient {
>
>  org.apache.kafka.common.security.plain.PlainLoginModule required
>username="XXX"
>password="XXX";
> };
>
> Readstream
> -
>
> scala> val df = spark.
> | readStream.
> | format("kafka").
> | option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
> | option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
> | option("kafka.sasl.mechanisms", "PLAIN").
> | option("kafka.security.protocol", "SASL_SSL").
> | option("startingOffsets", "earliest").
> | load.
> | select($"value".cast("string").alias("value"))
> df: org.apache.spark.sql.DataFrame = [value: string]
>
> Writestream
> --
>
> scala> df.writeStream.
> | format("console").
> | outputMode("append").
> | trigger(Trigger.ProcessingTime("20 seconds")).
> | start
> 2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
> checkpoint location created which is deleted normally when the query didn't
> fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
> to delete it under any circumstances, please set
> spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
> know deleting temp checkpoint folder is best effort.
> res0: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741
>
> scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
> attempt 1 getting Kafka offsets:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:820)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:612)
> at
>
> org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
>
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
> at scala.Option.getOrElse(Option.scala:189)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
> at scala.Option.getOrElse(Option.scala:189)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
> at
>
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
> at
>
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
> at
>
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
> at 

Re: Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi

I was able to correct the issue. The issue was due to a wrong version of the JAR
files I had used. I removed these JAR files and copied the correct
version, and the error has gone away.

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Kafka

2020-07-02 Thread Jungtaek Lim
I can't reproduce. Could you please make sure you're running spark-shell
with official spark 3.0.0 distribution? Please try out changing the
directory and using relative path like "./spark-shell".

On Thu, Jul 2, 2020 at 9:59 PM dwgw  wrote:

> Hi
> I am trying to stream a Kafka topic from spark-shell but I am getting the
> following error.
> I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
> *Java 1.8.0_212*)
>
> *[spark@hdp-dev ~]$ spark-shell --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
> Ivy Default Cache set to: /home/spark/.ivy2/cache
> The jars for the packages stored in: /home/spark/.ivy2/jars
> :: loading settings :: url =
>
> jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
> :: resolving dependencies ::
>
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
> confs: [default]
> found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
> found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0
> in
> central
> found org.apache.kafka#kafka-clients;2.4.1 in central
> found com.github.luben#zstd-jni;1.4.4-3 in central
> found org.lz4#lz4-java;1.7.1 in central
> found org.xerial.snappy#snappy-java;1.1.7.5 in central
> found org.slf4j#slf4j-api;1.7.30 in central
> found org.spark-project.spark#unused;1.0.0 in central
> found org.apache.commons#commons-pool2;2.6.2 in central
> :: resolution report :: resolve 502ms :: artifacts dl 10ms
> :: modules in use:
> com.github.luben#zstd-jni;1.4.4-3 from central in [default]
> org.apache.commons#commons-pool2;2.6.2 from central in [default]
> org.apache.kafka#kafka-clients;2.4.1 from central in [default]
> org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
> [default]
> org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
> central in [default]
> org.lz4#lz4-java;1.7.1 from central in [default]
> org.slf4j#slf4j-api;1.7.30 from central in [default]
> org.spark-project.spark#unused;1.0.0 from central in [default]
> org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
>
> -
> |  |modules||   artifacts
> |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
>
> -
> |  default |   9   |   0   |   0   |   0   ||   9   |   0
> |
>
> -
> :: retrieving ::
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
> confs: [default]
> 0 artifacts copied, 9 already retrieved (0kB/13ms)
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> Spark context Web UI available at http://hdp-dev.infodetics.com:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1593620640299_0015).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 3.0.0
>   /_/
>
> Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_212)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
>
> scala> val df = spark.
>  | readStream.
>  | format("kafka").
>  | option("kafka.bootstrap.servers", "XXX").
>  | option("subscribe", "XXX").
>  | option("kafka.sasl.mechanisms", "XXX").
>  | option("kafka.security.protocol", "XXX").
>  | option("kafka.sasl.username","XXX").
>  | option("kafka.sasl.password", "XXX").
>  | option("startingOffsets", "earliest").
>  | load
> java.lang.AbstractMethodError: Method
>
> org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
> is abstract
>   at
>
> org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
>   at
>
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
>   at
>
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
>   ... 57 elided
>
> Looking forward for a response.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming Memory

2020-05-17 Thread Ali Gouta
The spark UI is misleading in spark 2.4.4. I moved to spark 2.4.5 and it
fixed it. Now, your problem should be somewhere else. Probably related to
memory consumption but not the one you see in the UI.

Best regards,
Ali Gouta.

On Sun, May 17, 2020 at 7:36 PM András Kolbert 
wrote:

> Hi,
>
> I have a streaming job (Spark 2.4.4) in which the memory usage keeps
> increasing over time.
>
> Periodically (every 20-25 mins) the executors fall over
> (org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 6987) due to out of memory. In the UI, I can see that
> the memory keeps increasing batch by batch, although I do not keep more
> data in memory (I keep unpersisting, checkpointing and caching new data
> frames though); the storage tab shows only the expected 4 objects over time.
>
> I wish I just missed a parameter in the spark configuration (like garbage
> collection, reference tracking, etc) that would solve my issue. I have seen
> a few JIRA tickets around memory leak (SPARK-19644
> , SPARK-29055
> , SPARK-29321
> ) it might be the same
> issue?
>
>  ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
>  ('spark.cleaner.periodicGC.interval', '1min'),
>  ('spark.cleaner.referenceTracking','true'),
>  ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
>  ('spark.sql.streaming.minBatchesToRetain', '2'),
>  ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
>  ('spark.ui.retainedJobs','50' ),
>  ('spark.ui.retainedStages','50'),
>  ('spark.ui.retainedTasks','500'),
>  ('spark.worker.ui.retainedExecutors','50'),
>  ('spark.worker.ui.retainedDrivers','50'),
>  ('spark.sql.ui.retainedExecutions','50'),
>  ('spark.streaming.ui.retainedBatches','1440'),
>  ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
>
> I've tried lowering the spark.streaming.ui.retainedBatches to 8, did not
> help.
>
> The application works fine apart from the fact that the processing of
> some batches takes longer (when the executors fall over).
>
>
>
> Any ideas?
>
> I've attached my code.
>
>
> Thanks,
> Andras
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson  wrote:

> Thank you, so that would mean spark gets the current latest offset(s) when
> the trigger fires and then process all available messages in the topic upto
> and including that offset as long as maxOffsetsPerTrigger is the default of
> None (or large enought to handle all available messages).
>

Yes, it starts from the offset of the latest batch. `maxOffsetsPerTrigger` will
be ignored starting from Spark 3.0.0, which means that for Spark 2.x it still
takes effect even when Trigger.Once is used, I guess.


>
> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though, this makes Trigger.Once a fixed interval
> trigger that's only fired once and not repeatedly.
>

"micro" is relative - though Spark by default processes all available
inputs per batch, in most cases you'll want to make the batch size
(interval) as small as possible, as it defines the latency of the output.
Trigger.Once is an unusual case in streaming workload - that's more alike
continuous execution of "batch". I refer "continuous" as picking up latest
context which is the characteristic of streaming query, hence hybrid one.


>
>
> On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
> wrote:
>
>> If I understand correctly, Trigger.once executes only one micro-batch and
>> terminates, that's all. Your understanding of structured streaming applies
>> there as well.
>>
>> It's like a hybrid approach as bringing incremental processing from
>> micro-batch but having processing interval as batch. That said, while it
>> enables to get both sides of benefits, it's basically structured streaming,
>> inheriting all the limitations on the structured streaming, compared to the
>> batch query.
>>
>> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
>> Trigger.once will "ignore" the read limit per micro-batch on data source
>> (like maxOffsetsPerTrigger) and process all available input as possible.
>> (Data sources should migrate to the new API to take effect, but works for
>> built-in data sources like file and Kafka.)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30669
>>
>> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:
>>
>>> I've always had a question about Trigger.Once that I never got around to
>>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>>
>>> Will Trigger.Once get the last offset(s) when it starts and then quit
>>> once it hits this offset(s) or will the job run until no new messages is
>>> added to the topic for a particular amount of time?
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>>
 Hi Rishi,

 That is exactly why Trigger.Once was created for Structured Streaming.
 The way we look at streaming is that it doesn't have to be always real
 time, or 24-7 always on. We see streaming as a workflow that you have to
 repeat indefinitely. See this blog post for more details!

 https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

 Best,
 Burak

 On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
 wrote:

> Hi All,
>
> I recently started playing with spark streaming, and checkpoint
> location feature looks very promising. I wonder if anyone has an opinion
> about using spark streaming with checkpoint location option as a slow 
> batch
> processing solution. What would be the pros and cons of utilizing 
> streaming
> with checkpoint location feature to achieve fault tolerance in batch
> processing application?
>
> --
> Regards,
>
> Rishi Shah
>



Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Magnus Nilsson
Thank you, so that would mean spark gets the current latest offset(s) when
the trigger fires and then process all available messages in the topic upto
and including that offset as long as maxOffsetsPerTrigger is the default of
None (or large enought to handle all available messages).

I think the word micro-batch confused me (more like mega-batch in some
cases). It makes sense though, this makes Trigger.Once a fixed interval
trigger that's only fired once and not repeatedly.


On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
wrote:

> If I understand correctly, Trigger.once executes only one micro-batch and
> terminates, that's all. Your understanding of structured streaming applies
> there as well.
>
> It's like a hybrid approach as bringing incremental processing from
> micro-batch but having processing interval as batch. That said, while it
> enables to get both sides of benefits, it's basically structured streaming,
> inheriting all the limitations on the structured streaming, compared to the
> batch query.
>
> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
> Trigger.once will "ignore" the read limit per micro-batch on data source
> (like maxOffsetsPerTrigger) and process all available input as possible.
> (Data sources should migrate to the new API to take effect, but works for
> built-in data sources like file and Kafka.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30669
>
> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:
>
>> I've always had a question about Trigger.Once that I never got around to
>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>
>> Will Trigger.Once get the last offset(s) when it starts and then quit
>> once it hits this offset(s) or will the job run until no new messages is
>> added to the topic for a particular amount of time?
>>
>> br,
>>
>> Magnus
>>
>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>
>>> Hi Rishi,
>>>
>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>> The way we look at streaming is that it doesn't have to be always real
>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>> repeat indefinitely. See this blog post for more details!
>>>
>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>
>>> Best,
>>> Burak
>>>
>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>> wrote:
>>>
 Hi All,

 I recently started playing with spark streaming, and checkpoint
 location feature looks very promising. I wonder if anyone has an opinion
 about using spark streaming with checkpoint location option as a slow batch
 processing solution. What would be the pros and cons of utilizing streaming
 with checkpoint location feature to achieve fault tolerance in batch
 processing application?

 --
 Regards,

 Rishi Shah

>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.once executes only one micro-batch and
terminates, that's all. Your understanding of structured streaming applies
there as well.

It's like a hybrid approach as bringing incremental processing from
micro-batch but having processing interval as batch. That said, while it
enables to get both sides of benefits, it's basically structured streaming,
inheriting all the limitations on the structured streaming, compared to the
batch query.

Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
Trigger.once will "ignore" the read limit per micro-batch on data source
(like maxOffsetsPerTrigger) and process all available input as possible.
(Data sources should migrate to the new API to take effect, but works for
built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669

On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:

> I've always had a question about Trigger.Once that I never got around to
> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>
> Will Trigger.Once get the last offset(s) when it starts and then quit once
> it hits this offset(s) or will the job run until no new messages is added
> to the topic for a particular amount of time?
>
> br,
>
> Magnus
>
> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>
>> Hi Rishi,
>>
>> That is exactly why Trigger.Once was created for Structured Streaming.
>> The way we look at streaming is that it doesn't have to be always real
>> time, or 24-7 always on. We see streaming as a workflow that you have to
>> repeat indefinitely. See this blog post for more details!
>>
>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>
>> Best,
>> Burak
>>
>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> I recently started playing with spark streaming, and checkpoint location
>>> feature looks very promising. I wonder if anyone has an opinion about using
>>> spark streaming with checkpoint location option as a slow batch processing
>>> solution. What would be the pros and cons of utilizing streaming with
>>> checkpoint location feature to achieve fault tolerance in batch processing
>>> application?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Magnus Nilsson
I've always had a question about Trigger.Once that I never got around to
ask or test for myself. If you have a 24/7 stream to a Kafka topic.

Will Trigger.Once get the last offset(s) when it starts and then quit once
it hits this offset(s) or will the job run until no new messages is added
to the topic for a particular amount of time?

br,

Magnus

On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Thanks Burak! Appreciate it. This makes sense.

How do you suggest we make sure the resulting data doesn't produce tiny files
if we are not on Databricks yet and cannot leverage Delta Lake features?
Also, for the checkpointing feature, do you have an active blog/article I can
take a look at to try out an example?

On Fri, May 1, 2020 at 7:22 PM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Burak Yavuz
Hi Rishi,

That is exactly why Trigger.Once was created for Structured Streaming. The
way we look at streaming is that it doesn't have to be always real time, or
24-7 always on. We see streaming as a workflow that you have to repeat
indefinitely. See this blog post for more details!
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
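
A minimal sketch of the pattern (source, topic and paths are placeholders); the
checkpoint directory is what carries the offsets from one scheduled run to the
next:

import org.apache.spark.sql.streaming.Trigger

val input = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load
  .selectExpr("CAST(value AS STRING) AS value")

val query = input
  .writeStream
  .format("parquet")
  .option("path", "/data/events")                       // output location
  .option("checkpointLocation", "/checkpoints/events")  // offsets survive between scheduled runs
  .trigger(Trigger.Once())                              // drain what is available, then stop
  .start()

query.awaitTermination()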

Best,
Burak

On Fri, May 1, 2020 at 2:55 PM Rishi Shah  wrote:

> Hi All,
>
> I recently started playing with spark streaming, and checkpoint location
> feature looks very promising. I wonder if anyone has an opinion about using
> spark streaming with checkpoint location option as a slow batch processing
> solution. What would be the pros and cons of utilizing streaming with
> checkpoint location feature to achieve fault tolerance in batch processing
> application?
>
> --
> Regards,
>
> Rishi Shah
>


Re: Spark Streaming not working

2020-04-14 Thread Gerard Maas
Hi,

Could you share the code that you're using to configure the connection to
the Kafka broker?

This is a bread-and-butter feature. My first thought is that there's
something in your particular setup that prevents this from working.

kind regards, Gerard.

On Fri, Apr 10, 2020 at 7:34 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: Spark Streaming not working

2020-04-14 Thread Gabor Somogyi
Sorry, hit the send accidentally...

The symptom is simple, the broker is not responding in 120 seconds.
That's the reason why Debabrata asked the broker config.

What I can suggest is to check the previous printout which logs the Kafka
consumer settings.
With the mentioned settings you can start a console consumer on the exact
same host where the executor ran...
If that works you can open a Spark jira with driver and executor logs,
otherwise fix the connection issue.

BR,
G


On Tue, Apr 14, 2020 at 1:32 PM Gabor Somogyi 
wrote:

> The symptom is simple, the broker is not responding in 120 seconds.
> That's the reason why Debabrata asked the broker config.
>
> What I can suggest is to check the previous printout which logs the Kafka
> consumer settings.
> With
>
>
> On Tue, Apr 14, 2020 at 11:44 AM ZHANG Wei  wrote:
>
>> Here is the assertion error message format:
>>
>>s"Failed to get records for $groupId $topic $partition $offset after
>> polling for $timeout")
>>
>> You might have to check the kafka service with the error log:
>>
>> > 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0
>> (TID 24)
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
>> for 12
>>
>> Cheers,
>> -z
>>
>> ____
>> From: Debabrata Ghosh 
>> Sent: Saturday, April 11, 2020 2:25
>> To: user
>> Subject: Re: Spark Streaming not working
>>
>> Any solution please ?
>>
>> On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh wrote:
>> Hi,
>> I have a spark streaming application where Kafka is producing
>> records but unfortunately spark streaming isn't able to consume those.
>>
>> I am hitting the following error:
>>
>> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID
>> 24)
>> java.lang.AssertionError: assertion failed: Failed to get records for
>> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
>> for 12
>> at scala.Predef$.assert(Predef.scala:170)
>> at
>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> at
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>> at
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>
>> Would you please be able to help with a resolution.
>>
>> Thanks,
>> Debu
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark Streaming not working

2020-04-14 Thread Gabor Somogyi
The symptom is simple, the broker is not responding in 120 seconds.
That's the reason why Debabrata asked the broker config.

What I can suggest is to check the previous printout which logs the Kafka
consumer settings.
With


On Tue, Apr 14, 2020 at 11:44 AM ZHANG Wei  wrote:

> Here is the assertion error message format:
>
>s"Failed to get records for $groupId $topic $partition $offset after
> polling for $timeout")
>
> You might have to check the kafka service with the error log:
>
> > 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0
> (TID 24)
> > java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
> for 12
>
> Cheers,
> -z
>
> 
> From: Debabrata Ghosh 
> Sent: Saturday, April 11, 2020 2:25
> To: user
> Subject: Re: Spark Streaming not working
>
> Any solution please ?
>
> On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh wrote:
> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID
> 24)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
> for 12
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming not working

2020-04-14 Thread ZHANG Wei
Here is the assertion error message format:

   s"Failed to get records for $groupId $topic $partition $offset after polling 
for $timeout")

You might have to check the kafka service with the error log:

> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12

Cheers,
-z


From: Debabrata Ghosh 
Sent: Saturday, April 11, 2020 2:25
To: user
Subject: Re: Spark Streaming not working

Any solution please ?

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh wrote:
Hi,
I have a spark streaming application where Kafka is producing records 
but unfortunately spark streaming isn't able to consume those.

I am hitting the following error:

20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
12
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)

Would you please be able to help with a resolution.

Thanks,
Debu

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming not working

2020-04-10 Thread Debabrata Ghosh
Any solution please ?

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: Spark Streaming not working

2020-04-10 Thread Chenguang He
unsubscribe


Re: Spark Streaming not working

2020-04-10 Thread Debabrata Ghosh
Yes, the Kafka producer is producing records from the same host - I rechecked the
Kafka connection and the connection is there. I came across this URL but am
unable to understand it:

https://stackoverflow.com/questions/42264669/spark-streaming-assertion-failed-failed-to-get-records-for-spark-executor-a-gro

On Fri, Apr 10, 2020 at 11:14 PM Srinivas V  wrote:

> Check if your broker details are correct, verify if you have network
> connectivity to your client box and Kafka broker server host.
>
> On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
> wrote:
>
>> Hi,
>> I have a spark streaming application where Kafka is producing
>> records but unfortunately spark streaming isn't able to consume those.
>>
>> I am hitting the following error:
>>
>> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
>> java.lang.AssertionError: assertion failed: Failed to get records for 
>> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
>> 12
>>  at scala.Predef$.assert(Predef.scala:170)
>>  at 
>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>>  at 
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>>  at 
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>
>>
>> Would you please be able to help with a resolution.
>>
>> Thanks,
>> Debu
>>
>


Re: Spark Streaming not working

2020-04-10 Thread Srinivas V
Check if your broker details are correct, verify if you have network
connectivity to your client box and Kafka broker server host.

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: Spark Streaming on Compact Kafka topic - consumers 1 message per partition per batch

2020-04-08 Thread Hrishikesh Mishra
It seems I found the issue. The actual problem is something related to
back pressure. When I add either of these configs,
*spark.streaming.kafka.maxRatePerPartition* or
*spark.streaming.backpressure.initialRate* (the value of these configs is 100),
it starts consuming one message per partition per batch. Not sure why
it's happening.
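
For reference, and going only by the documented meaning of
spark.streaming.kafka.maxRatePerPartition (an upper bound on the per-partition
read rate in records per second), the configured ceiling for a 5-second batch
would be roughly:

    100 records/sec/partition x 5 sec/batch = 500 records per partition per batch

so one record per partition per batch is far below that ceiling, which suggests
the backpressure rate estimate itself, rather than the configured limit, is what
collapses once these settings are enabled.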


On Thu, Apr 2, 2020 at 8:48 AM Waleed Fateem 
wrote:

> Well this is interesting. Not sure if this is the expected behavior. The
> log messages you have referenced are actually printed out by the Kafka
> Consumer itself (org.apache.kafka.clients.consumer.internals.Fetcher).
>
> That log message belongs to a new feature added starting with Kafka 1.1:
> https://issues.apache.org/jira/browse/KAFKA-6397
>
> I'm assuming then that you're using Spark 2.4?
>
> From Kafka's perspective, when you do a describe on your
> demandIngestion.SLTarget topic, does that look okay? All partitions are
> available with a valid leader.
>
> The other thing I'm curious about, after you
> enabled spark.streaming.kafka.allowNonConsecutiveOffsets, did you try going
> back to the older group.id and do you see the same behavior? Was there a
> reason you chose to start reading again from the beginning by using a new
> consumer group rather then sticking to the same consumer group?
>
> In your application, are you manually committing offsets to Kafka?
>
> Regards,
>
> Waleed
>
> On Wed, Apr 1, 2020 at 1:31 AM Hrishikesh Mishra 
> wrote:
>
>> Hi
>>
>> Our Spark streaming job was working fine as expected (the number of
>> events to process in a batch). But due to some reasons, we added compaction
>> on Kafka topic and restarted the job. But after restart it was failing for
>> below reason:
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 16 in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in
>> stage 2.0 (TID 231, 10.34.29.38, executor 4):
>> java.lang.IllegalArgumentException: requirement failed: Got wrong record
>> for spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39
>> even after seeking to offset 106847 got offset 199066 instead. If this is a
>> compacted topic, consider enabling
>> spark.streaming.kafka.allowNonConsecutiveOffsets
>>   at scala.Predef$.require(Predef.scala:224)
>>   at
>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
>>
>>
>>
>> So, I added spark.streaming.kafka.allowNonConsecutiveOffsets: true in the
>> spark config and I changed the group name to consume from the beginning. Now
>> the problem is, it is reading only one message per partition. So if a
>> topic has 50 partitions then it's reading 50 messages per batch (batch
>> duration is 5 sec).
>>
>> The topic has 1M records and the consumer has a huge lag.
>>
>>
>> Driver log which fetches 1 message per partition.
>>
>> 20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
>> 20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
>> 20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
>> 20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
>> 20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
>> 20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
>> 20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
>> 20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
>> 20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
>> 20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
>> 20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
>> 20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
>> 20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
>> 20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
>> 20/03/31 

Re: Spark Streaming on Compact Kafka topic - consumers 1 message per partition per batch

2020-04-01 Thread Waleed Fateem
Well this is interesting. Not sure if this is the expected behavior. The
log messages you have referenced are actually printed out by the Kafka
Consumer itself (org.apache.kafka.clients.consumer.internals.Fetcher).

That log message belongs to a new feature added starting with Kafka 1.1:
https://issues.apache.org/jira/browse/KAFKA-6397

I'm assuming then that you're using Spark 2.4?

From Kafka's perspective, when you do a describe on your
demandIngestion.SLTarget topic, does that look okay? All partitions are
available with a valid leader.

The other thing I'm curious about, after you
enabled spark.streaming.kafka.allowNonConsecutiveOffsets, did you try going
back to the older group.id and do you see the same behavior? Was there a
reason you chose to start reading again from the beginning by using a new
consumer group rather than sticking to the same consumer group?

In your application, are you manually committing offsets to Kafka?
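
(For reference, the manual-commit pattern from the spark-streaming-kafka-0-10
integration guide looks roughly like the sketch below, where stream stands for
the DStream returned by KafkaUtils.createDirectStream:)

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the RDD ...
  // commit only after this batch's processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}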

Regards,

Waleed

On Wed, Apr 1, 2020 at 1:31 AM Hrishikesh Mishra 
wrote:

> Hi
>
> Our Spark streaming job was working fine as expected (the number of events
> to process in a batch). But due to some reasons, we added compaction on
> Kafka topic and restarted the job. But after restart it was failing for
> below reason:
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 16
> in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in stage
> 2.0 (TID 231, 10.34.29.38, executor 4): java.lang.IllegalArgumentException:
> requirement failed: Got wrong record for
> spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39 even
> after seeking to offset 106847 got offset 199066 instead. If this is a
> compacted topic, consider enabling
> spark.streaming.kafka.allowNonConsecutiveOffsets
>   at scala.Predef$.require(Predef.scala:224)
>   at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
>
>
>
> So, I added spark.streaming.kafka.allowNonConsecutiveOffsets: true in the
> spark config and I changed the group name to consume from the beginning. Now
> the problem is, it is reading only one message per partition. So if a
> topic has 50 partitions then it's reading 50 messages per batch (batch
> duration is 5 sec).
>
> The topic has 1M records and the consumer has a huge lag.
>
>
> Driver log which fetches 1 message per partition.
>
> 20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
> 20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
> 20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
> 20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
> 20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
> 20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
> 20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
> 20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
> 20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
> 20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
> 20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
> 20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
> 20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
> 20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
> 20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
> 20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
> 20/03/31 18:27:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211967.
> 20/03/31 18:27:20 INFO Fetcher: 

Re: Spark Streaming Code

2020-03-28 Thread Jungtaek Lim
To get any meaningful answers you may want to provide as much
information/context as possible, e.g. Spark version, which
behavior/output was expected (and why you think so), and how it actually
behaves.

On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj  wrote:

> Hi Team,
>
> Need help on windowing & watermark concept.  This code is not working as
> expected.
>
> package com.jiomoney.streaming
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.ProcessingTime
>
> object SlingStreaming {
>   def main(args: Array[String]): Unit = {
> val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .appName("Coupons_ViewingNow")
>   .getOrCreate()
>
> import spark.implicits._
>
> val checkpoint_path = "/opt/checkpoints/"
>
> val ks = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "16777216")
>   .load()
>
> val dfDeviceid = ks
>   .withColumn("val", ($"value").cast("string"))
>   .withColumn("count1", get_json_object(($"val"), "$.a"))
>   .withColumn("deviceId", get_json_object(($"val"), "$.b"))
>   .withColumn("timestamp", current_timestamp())
>
>
> val final_ids = dfDeviceid
>   .withColumn("processing_time", current_timestamp())
>   .withWatermark("processing_time","1 minutes")
>   .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
>   .agg(sum($"count1") as "total")
>
> val t = final_ids
>   .select(to_json(struct($"*")) as "value")
>   .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "sub_topic")
>   .option("checkpointLocation", checkpoint_path)
>   .outputMode("append")
>   .trigger(ProcessingTime("1 seconds"))
>   .start()
>
> t.awaitTermination()
>
>   }
>
> }
>
>
> Thanks
>
>


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I changed it to Tuple2 and that problem is solved.

Any thoughts on this message?

*Unapplied methods are only converted to functions when a function type is
expected.*

*You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
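
For the record, a sketch of the composite-key approach that should compile,
assuming import spark.implicits._ is in scope and withEventTime is the streaming
DataFrame from the original snippet; MyReport and MyAggState are hypothetical
stand-ins, and a watermark is added because EventTimeTimeout requires one:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class MyReport(key1: String, key2: String, eventTime: java.sql.Timestamp, value: Long)
case class MyAggState(key1: String, key2: String, total: Long)

// method form; the trailing underscore below converts it to a function value
def updateAcrossEvents(
    key: (String, String),
    events: Iterator[MyReport],
    state: GroupState[MyAggState]): MyAggState = {
  val previous = state.getOption.getOrElse(MyAggState(key._1, key._2, 0L))
  val updated  = previous.copy(total = previous.total + events.map(_.value).sum)
  state.update(updated)
  updated
}

val aggregated = withEventTime
  .as[MyReport]
  .withWatermark("eventTime", "30 minutes")
  .groupByKey(r => (r.key1, r.key2))   // composite key as a tuple
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)

aggregated.writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()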

On Mon, Mar 2, 2020 at 5:12 PM lec ssmi  wrote:

> maybe you can combine the fields you want to use into one field
>
> Something Something  于2020年3月3日周二 上午6:37写道:
>
>> I am writing a Stateful Streaming application in which I am using
>> mapGroupsWithState to create aggregates for Groups but I need to create 
>> *Groups
>> based on more than one column in the Input Row*. All the examples in the
>> 'Spark: The Definitive Guide' use only one column such as 'User' or
>> 'Device'. I am using code similar to what's given below. *How do I
>> specify more than one field in the 'groupByKey'?*
>>
>> There are other challenges as well. The book says we can use
>> 'updateAcrossEvents' the way given below but I get compile time error
>> saying:
>>
>>
>> *Error:(43, 65) missing argument list for method updateAcrossEvents in
>> object MainUnapplied methods are only converted to functions when a
>> function type is expected.You can make this conversion explicit by writing
>> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
>> `updateAcrossEvents`.
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>>
>> Another challenge: The compiler also complains about my *MyReport*: 
>> *Error:(41,
>> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
>> (Int, String, etc) and Product types (case classes) are supported by
>> importing spark.implicits._  Support for serializing other types will be
>> added in future releases.*
>>
>> Help in resolving these errors would be greatly appreciated. Thanks in
>> advance.
>>
>>
>> withEventTime
>> .as[MyReport]
>>   .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
>>   
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>>   .writeStream
>>   .queryName("test_query")
>>   .format("memory")
>>   .outputMode("update")
>>   .start()
>>
>>
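
For reference, below is a minimal, self-contained sketch of one way to address
the three issues raised above: grouping on more than one column, the
"unapplied methods" eta-expansion error, and the missing encoder. The case
classes MyKeys, MyReport, MyState and MyOutput, the update logic, and the rate
source are placeholders standing in for the poster's actual types and data,
not the book's code.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Placeholder types; encoders for these case classes come from spark.implicits._
case class MyKeys(key1: String, key2: String)
case class MyReport(keys: MyKeys, eventTime: Timestamp, value: Double)
case class MyState(count: Long, total: Double)
case class MyOutput(key1: String, key2: String, count: Long, total: Double)

object MultiKeyStateExample {

  // Shape expected by mapGroupsWithState: (key, events in this trigger, state) => output
  def updateAcrossEvents(
      key: (String, String),
      events: Iterator[MyReport],
      state: GroupState[MyState]): MyOutput = {
    val previous = state.getOption.getOrElse(MyState(0L, 0.0))
    val batch = events.toSeq
    val updated = MyState(previous.count + batch.size,
                          previous.total + batch.map(_.value).sum)
    state.update(updated)
    MyOutput(key._1, key._2, updated.count, updated.total)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MultiKeyStateExample").getOrCreate()
    // Brings in encoders for the case classes and for tuple keys
    import spark.implicits._

    // Stand-in source; the poster's withEventTime Dataset would be used instead
    val withEventTime = spark.readStream
      .format("rate")
      .load()
      .map(r => MyReport(MyKeys("k" + r.getLong(1) % 2, "k" + r.getLong(1) % 3),
                         r.getTimestamp(0), 1.0))

    withEventTime
      .withWatermark("eventTime", "10 minutes")
      // Group on a composite key by returning a tuple from groupByKey
      .groupByKey(r => (r.keys.key1, r.keys.key2))
      // Writing `updateAcrossEvents _` makes the eta-expansion explicit,
      // which is exactly what the compiler message suggests
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)
      .writeStream
      .queryName("test_query")
      .format("memory")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}

Returning a tuple (or another case class) from groupByKey is the usual way to
key on multiple columns in the typed API.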


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread lec ssmi
maybe you can combine the fields you want to use into one field

Something Something  于2020年3月3日周二 上午6:37写道:

> I am writing a Stateful Streaming application in which I am using
> mapGroupsWithState to create aggregates for Groups but I need to create 
> *Groups
> based on more than one column in the Input Row*. All the examples in the
> 'Spark: The Definitive Guide' use only one column such as 'User' or
> 'Device'. I am using code similar to what's given below. *How do I
> specify more than one field in the 'groupByKey'?*
>
> There are other challenges as well. The book says we can use
> 'updateAcrossEvents' the way given below, but I get a compile-time error
> saying:
>
>
> *Error:(43, 65) missing argument list for method updateAcrossEvents in
> object MainUnapplied methods are only converted to functions when a
> function type is expected.You can make this conversion explicit by writing
> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
> `updateAcrossEvents`.
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>
> Another challenge: The compiler also complains about my *MyReport*: 
> *Error:(41,
> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will be
> added in future releases.*
>
> Help in resolving these errors would be greatly appreciated. Thanks in
> advance.
>
>
> withEventTime
> .as[MyReport]
>   .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
>   
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>   .writeStream
>   .queryName("test_query")
>   .format("memory")
>   .outputMode("update")
>   .start()
>
>


Re: Spark Streaming: Aggregating values across batches

2020-02-27 Thread Tathagata Das
Use Structured Streaming. Its aggregation, by definition, is across batches.

On Thu, Feb 27, 2020 at 3:17 PM Something Something <
mailinglist...@gmail.com> wrote:

> We've a Spark Streaming job that calculates some values in each batch.
> What we need to do now is aggregate values across ALL batches. What is the
> best strategy to do this in Spark Streaming? Should we use 'Spark
> Accumulators' for this?
>
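
To illustrate that suggestion, here is a minimal Structured Streaming sketch in
which Spark itself keeps the aggregation state across triggers; the Kafka
topic, bootstrap servers, and checkpoint path are placeholders:

import org.apache.spark.sql.SparkSession

object RunningTotals {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RunningTotals").getOrCreate()
    import spark.implicits._

    // Placeholder Kafka source; any streaming source behaves the same way
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS device")

    // The aggregation state is maintained by Spark (backed by the checkpoint),
    // so these counts accumulate over the life of the query rather than being
    // reset on every micro-batch.
    val totals = events.groupBy($"device").count()

    totals.writeStream
      .outputMode("update") // emit only the keys that changed in each trigger
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/running-totals")
      .start()
      .awaitTermination()
  }
}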


Re: spark streaming exception

2019-11-10 Thread Akshay Bhardwaj
Hi,

Could you provide a code snippet showing how you are connecting to and
reading data from Kafka?

Akshay Bhardwaj
+91-97111-33849


On Thu, Oct 17, 2019 at 8:39 PM Amit Sharma  wrote:

> Please update me if anyone knows about it.
>
>
> Thanks
> Amit
>
> On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma  wrote:
>
>> Hi, we have a Spark Streaming job to which we send requests through our
>> UI using Kafka. It processes them and returns a response. We are getting the
>> error below and the streaming job is not processing any requests.
>>
>> Listener StreamingJobProgressListener threw an exception
>> java.util.NoSuchElementException: key not found: 1570689515000 ms
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>> at scala.collection.AbstractMap.default(Map.scala:59)
>> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>> at
>> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).
>>
>> Please help me find out the root cause of this issue.
>>
>


Re: spark streaming exception

2019-10-17 Thread Amit Sharma
Please update me if anyone knows about it.


Thanks
Amit

On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma  wrote:

> Hi, we have a Spark Streaming job to which we send requests through our UI
> using Kafka. It processes them and returns a response. We are getting the
> error below and the streaming job is not processing any requests.
>
> Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 1570689515000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).
>
> Please help me find out the root cause of this issue.
>


Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-06 Thread Sethupathi T
Gabor,

Thanks for the clarification.

Thanks

On Fri, Sep 6, 2019 at 12:38 AM Gabor Somogyi 
wrote:

> Sethupathi,
>
> Let me extract the important part of what I've shared:
>
> 1. "This ensures that each Kafka source has its own consumer group that
> does not face interference from any other consumer"
> 2. Consumers may consume each other's data, and the offset calculation may
> give back wrong results (that's why "extreme caution" is recommended in the
> Structured Streaming doc, which still applies here)
> 3. yes
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T 
> wrote:
>
>> Gabor,
>>
>> Thanks for the quick response and for sharing about Spark 3.0. We need to
>> use Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
>> Streaming, following this document:
>> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
>> Reiterating the issue for better understanding: the
>> spark-streaming-kafka-0-10 kafka connector prefixes "spark-executor-" to the
>> group.id for executors, while the driver uses the original group id.
>>
>> *Here is the code where executor construct executor specific group id *
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>  line # 212,
>>
>>
>> *It would be great if you could provide the explanation to the following
>> questions.*
>>
>> #1 What was the specific reason for prefixing the group id in the executor?
>>
>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>>   library by removing the group id prefix at line # 212 in
>> KafkaUtils.scala?
>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
>> advisable to use in production?
>>
>> *Here is my spark streaming code snippet*
>>
>> val kafkaParams = Map[String, Object](
>>
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
>> classOf[StringDeserializer],
>>
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
>> classOf[MessageDeserializer],
>>
>>   "security.protocol" -> "SSL",
>>
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>>
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>
>>   ssc,
>>
>>   PreferConsistent,
>>
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>>
>> )
>>
>> ---
>> Thanks in Advance,
>> Sethupathi.T
>>
>>
>> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
>> wrote:
>>
>>> Hi,
>>>
>>> Let me share Spark 3.0 documentation part (Structured Streaming and not
>>> DStreams what you've mentioned but still relevant):
>>>
>>> kafka.group.id string none streaming and batch The Kafka group id to
>>> use in Kafka consumer while reading from Kafka. Use this with caution. By
>>> default, each query generates a unique group id for reading data. This
>>> ensures that each Kafka source has its own consumer group that does not
>>> face interference from any other consumer, and therefore can read all of
>>> the partitions of its subscribed topics. In some scenarios (for example,
>>> Kafka group-based authorization), you may want to use a specific authorized
>>> group id to read data. You can optionally set the group id. However, do
>>> this with extreme caution as it can cause unexpected behavior. Concurrently
>>> running queries (both, batch and streaming) or sources with the same group
>>> id are likely interfere with each other causing each query to read only
>>> part of the data. This may also occur when queries are started/restarted in
>>> quick succession. To minimize such issues, set the Kafka consumer session
>>> timeout (by setting option "kafka.session.timeout.ms") to be very
>>> small. When this is set, option "groupIdPrefix" will be ignored.
>>> I think it answers your questions.
>>>
>>> As a general suggestion maybe it worth to revisit Spark 3.0 because
>>> Structured Streaming has another interesting feature:
>>> groupIdPrefix string spark-kafka-source streaming and batch Prefix of
>>> consumer group identifiers (`group.id`) that are generated by
>>> structured streaming queries. If "kafka.group.id" is set, this option
>>> will be ignored.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>>>  wrote:
>>>
 Hi Team,

 We have secured Kafka cluster (which only allows to consume from the

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-06 Thread Gabor Somogyi
Sethupathi,

Let me extract the important part of what I've shared:

1. "This ensures that each Kafka source has its own consumer group that
does not face interference from any other consumer"
2. Consumers may consume each other's data, and the offset calculation may
give back wrong results (that's why "extreme caution" is recommended in the
Structured Streaming doc, which still applies here)
3. yes

BR,
G


On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T 
wrote:

> Gabor,
>
> Thanks for the quick response and for sharing about Spark 3.0. We need to use
> Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
> Streaming, following this document:
> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
> Reiterating the issue for better understanding: the
> spark-streaming-kafka-0-10 kafka connector prefixes "spark-executor-" to the
> group.id for executors, while the driver uses the original group id.
>
> *Here is the code where executor construct executor specific group id *
>
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>  line # 212,
>
>
> *It would be great if you could provide the explanation to the following
> questions.*
>
> #1 What was the specific reason for prefixing the group id in the executor?
>
> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>   library by removing the group id prefix at line # 212 in
> KafkaUtils.scala?
> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
> advisable to use in production?
>
> *Here is my spark streaming code snippet*
>
> val kafkaParams = Map[String, Object](
>
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
> classOf[MessageDeserializer],
>
>   "security.protocol" -> "SSL",
>
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>
>   ssc,
>
>   PreferConsistent,
>
>   Subscribe[String, Message](topicsSet, kafkaParams)
>
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>
>
> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
> wrote:
>
>> Hi,
>>
>> Let me share Spark 3.0 documentation part (Structured Streaming and not
>> DStreams what you've mentioned but still relevant):
>>
>> kafka.group.id string none streaming and batch The Kafka group id to use
>> in Kafka consumer while reading from Kafka. Use this with caution. By
>> default, each query generates a unique group id for reading data. This
>> ensures that each Kafka source has its own consumer group that does not
>> face interference from any other consumer, and therefore can read all of
>> the partitions of its subscribed topics. In some scenarios (for example,
>> Kafka group-based authorization), you may want to use a specific authorized
>> group id to read data. You can optionally set the group id. However, do
>> this with extreme caution as it can cause unexpected behavior. Concurrently
>> running queries (both, batch and streaming) or sources with the same group
>> id are likely interfere with each other causing each query to read only
>> part of the data. This may also occur when queries are started/restarted in
>> quick succession. To minimize such issues, set the Kafka consumer session
>> timeout (by setting option "kafka.session.timeout.ms") to be very small.
>> When this is set, option "groupIdPrefix" will be ignored.
>> I think it answers your questions.
>>
>> As a general suggestion maybe it worth to revisit Spark 3.0 because
>> Structured Streaming has another interesting feature:
>> groupIdPrefix string spark-kafka-source streaming and batch Prefix of
>> consumer group identifiers (`group.id`) that are generated by structured
>> streaming queries. If "kafka.group.id" is set, this option will be
>> ignored.
>>
>> BR,
>> G
>>
>>
>> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>>  wrote:
>>
>>> Hi Team,
>>>
>>> We have a secured Kafka cluster (which only allows consuming from a
>>> pre-configured, authorized consumer group), and there is a scenario where
>>> we want to use Spark Streaming to consume from this secured Kafka. So we
>>> have decided to use spark-streaming-kafka-0-10
>>> 

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Gabor,

Thanks for the quick response and for sharing about Spark 3.0. We need to use
Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
Streaming, following this document:
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
Reiterating the issue for better understanding: the spark-streaming-kafka-0-10
kafka connector prefixes "spark-executor-" to the group.id for executors, while
the driver uses the original group id.

*Here is the code where executor construct executor specific group id *

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
 line # 212,


*It would be great if you could provide the explanation to the following
questions.*

#1 What was the specific reason for prefixing the group id in the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
library by removing the group id prefix at line # 212 in KafkaUtils.scala?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use in production?

*Here is my spark streaming code snippet*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,

  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,

  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",

  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),

  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],

  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],

  "security.protocol" -> "SSL",

  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,

  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,

  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,

  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,

  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD

)

val stream = KafkaUtils.createDirectStream[String, Message](

  ssc,

  PreferConsistent,

  Subscribe[String, Message](topicsSet, kafkaParams)

)

---
Thanks in Advance,
Sethupathi.T


On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
wrote:

> Hi,
>
> Let me share Spark 3.0 documentation part (Structured Streaming and not
> DStreams what you've mentioned but still relevant):
>
> kafka.group.id string none streaming and batch The Kafka group id to use
> in Kafka consumer while reading from Kafka. Use this with caution. By
> default, each query generates a unique group id for reading data. This
> ensures that each Kafka source has its own consumer group that does not
> face interference from any other consumer, and therefore can read all of
> the partitions of its subscribed topics. In some scenarios (for example,
> Kafka group-based authorization), you may want to use a specific authorized
> group id to read data. You can optionally set the group id. However, do
> this with extreme caution as it can cause unexpected behavior. Concurrently
> running queries (both, batch and streaming) or sources with the same group
> id are likely interfere with each other causing each query to read only
> part of the data. This may also occur when queries are started/restarted in
> quick succession. To minimize such issues, set the Kafka consumer session
> timeout (by setting option "kafka.session.timeout.ms") to be very small.
> When this is set, option "groupIdPrefix" will be ignored.
> I think it answers your questions.
>
> As a general suggestion maybe it worth to revisit Spark 3.0 because
> Structured Streaming has another interesting feature:
> groupIdPrefix string spark-kafka-source streaming and batch Prefix of
> consumer group identifiers (`group.id`) that are generated by structured
> streaming queries. If "kafka.group.id" is set, this option will be
> ignored.
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>  wrote:
>
>> Hi Team,
>>
>> We have a secured Kafka cluster (which only allows consuming from a
>> pre-configured, authorized consumer group), and there is a scenario where
>> we want to use Spark Streaming to consume from this secured Kafka. So we
>> have decided to use the spark-streaming-kafka-0-10 API (it supports
>> SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When I deploy
>> the application in cluster mode, I realized that the actual group id has
>> been prefixed with "spark-executor" in the executor configuration (the
>> executor is trying to connect to Kafka with "spark-executor" + the actual
>> group id, which does not really exist, and getting an exception).
>>
>> *Here is the code where executor construct executor specific group id *
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>  line
>> # 212,
>>
>> *Here 

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Gabor,

Thanks for the quick response and for sharing about Spark 3.0. We need to use
Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
Streaming, following this document:
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
Reiterating the issue for better understanding: the spark-streaming-kafka-0-10
kafka connector prefixes "spark-executor-" to the group.id for executors, while
the driver uses the original group id.

*Here is the code where executor construct executor specific group id *

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
line
# 212,

*It would be great if you could provide the explanation to the following
questions.*

#1 What was the specific reason for prefixing the group id in the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
library by removing the group id prefix at line # 212 in KafkaUtils.scala?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use in production?

*Here is my spark streaming code snippet*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in Advance,
Sethupathi.T

On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
wrote:

> Hi,
>
> Let me share Spark 3.0 documentation part (Structured Streaming and not
> DStreams what you've mentioned but still relevant):
>
> kafka.group.id string none streaming and batch The Kafka group id to use
> in Kafka consumer while reading from Kafka. Use this with caution. By
> default, each query generates a unique group id for reading data. This
> ensures that each Kafka source has its own consumer group that does not
> face interference from any other consumer, and therefore can read all of
> the partitions of its subscribed topics. In some scenarios (for example,
> Kafka group-based authorization), you may want to use a specific authorized
> group id to read data. You can optionally set the group id. However, do
> this with extreme caution as it can cause unexpected behavior. Concurrently
> running queries (both, batch and streaming) or sources with the same group
> id are likely interfere with each other causing each query to read only
> part of the data. This may also occur when queries are started/restarted in
> quick succession. To minimize such issues, set the Kafka consumer session
> timeout (by setting option "kafka.session.timeout.ms") to be very small.
> When this is set, option "groupIdPrefix" will be ignored.
> I think it answers your questions.
>
> As a general suggestion maybe it worth to revisit Spark 3.0 because
> Structured Streaming has another interesting feature:
> groupIdPrefix string spark-kafka-source streaming and batch Prefix of
> consumer group identifiers (`group.id`) that are generated by structured
> streaming queries. If "kafka.group.id" is set, this option will be
> ignored.
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>  wrote:
>
>> Hi Team,
>>
>> We have a secured Kafka cluster (which only allows consuming from a
>> pre-configured, authorized consumer group), and there is a scenario where
>> we want to use Spark Streaming to consume from this secured Kafka. So we
>> have decided to use the spark-streaming-kafka-0-10 API (it supports
>> SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When I deploy
>> the application in cluster mode, I realized that the actual group id has
>> been prefixed with "spark-executor" in the executor configuration (the
>> executor is trying to connect to Kafka with "spark-executor" + the actual
>> group id, which does not really exist, and getting an exception).
>>
>> *Here is the code where executor construct executor specific group id *
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>  line
>> # 212,
>>
>> *Here are my Questions*
>>

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Gabor Somogyi
Hi,

Let me share part of the Spark 3.0 documentation (it covers Structured
Streaming rather than the DStreams you mentioned, but it is still relevant):

kafka.group.id (string; default: none; applies to streaming and batch queries):
The Kafka group id to use in Kafka consumer while reading from Kafka. Use this
with caution. By default, each query generates a unique group id for reading
data. This ensures that each Kafka source has its own consumer group that does
not face interference from any other consumer, and therefore can read all of
the partitions of its subscribed topics. In some scenarios (for example, Kafka
group-based authorization), you may want to use a specific authorized group id
to read data. You can optionally set the group id. However, do this with
extreme caution as it can cause unexpected behavior. Concurrently running
queries (both batch and streaming) or sources with the same group id are likely
to interfere with each other, causing each query to read only part of the data.
This may also occur when queries are started/restarted in quick succession. To
minimize such issues, set the Kafka consumer session timeout (by setting option
"kafka.session.timeout.ms") to be very small. When this is set, option
"groupIdPrefix" will be ignored.
I think it answers your questions.

As a general suggestion, it may be worth revisiting Spark 3.0, because
Structured Streaming has another interesting feature:
groupIdPrefix (string; default: spark-kafka-source; applies to streaming and
batch queries): Prefix of consumer group identifiers (`group.id`) that are
generated by structured streaming queries. If "kafka.group.id" is set, this
option will be ignored.

BR,
G


On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
 wrote:

> Hi Team,
>
> We have a secured Kafka cluster (which only allows consuming from a
> pre-configured, authorized consumer group), and there is a scenario where we
> want to use Spark Streaming to consume from this secured Kafka. So we have
> decided to use the spark-streaming-kafka-0-10 API (it supports SSL/TLS,
> Direct DStream, the new Kafka consumer API, etc.). When I deploy the
> application in cluster mode, I realized that the actual group id has been
> prefixed with "spark-executor" in the executor configuration (the executor
> is trying to connect to Kafka with "spark-executor" + the actual group id,
> which does not really exist, and getting an exception).
>
> *Here is the code where executor construct executor specific group id *
>
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>  line
> # 212,
>
> *Here are my Questions*
>
> #1 What was the specific reason for prefixing the group id in the executor?
>
> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
> library by removing the group id prefix?
>
> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
> advisable to use in production?
>
> *Here is my spark streaming code snippet*
>
> val kafkaParams = Map[String, Object](
>
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
> classOf[MessageDeserializer],
>   "security.protocol" -> "SSL",
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, Message](topicsSet, kafkaParams)
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>
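
As an illustration of the Structured Streaming route described above, here is a
minimal sketch that reads a secured topic either with an explicit,
pre-authorized consumer group ("kafka.group.id", Spark 3.0+) or with a
controlled prefix for generated group ids ("groupIdPrefix"). The topic, broker
addresses, group id, SSL paths, and passwords are placeholders:

import org.apache.spark.sql.SparkSession

object SecuredKafkaStructured {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SecuredKafkaStructured").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9093")
      .option("subscribe", "secured-topic")
      // Use the pre-authorized consumer group directly (Spark 3.0+)...
      .option("kafka.group.id", "authorized-group")
      // ...or keep auto-generated group ids but control their prefix;
      // this option is ignored when kafka.group.id is set
      .option("groupIdPrefix", "authorized-prefix")
      // Kafka client configs are passed through with the "kafka." prefix
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
      .option("kafka.ssl.truststore.password", "truststore-password")
      .option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
      .option("kafka.ssl.keystore.password", "keystore-password")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/secured-kafka")
      .start()
      .awaitTermination()
  }
}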

