Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread ayan guha
Hi

Option 1: You can write the message back to the "processed" queue and add some
additional info, such as the time of the last attempt and a retry counter.
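
A minimal sketch of Option 1, assuming each message is a JSON object. The field names `retry_count` and `last_tried`, the limit `MAX_RETRIES` and the helper `with_retry_metadata` are hypothetical and do not come from this thread:

```python
import json
from datetime import datetime, timezone
from typing import Optional

MAX_RETRIES = 5  # hypothetical limit, not from the thread


def with_retry_metadata(raw: str, now: Optional[datetime] = None) -> Optional[str]:
    """Return the message to re-publish, or None once it was retried too often."""
    now = now or datetime.now(timezone.utc)
    msg = json.loads(raw)
    attempts = msg.get("retry_count", 0) + 1
    if attempts > MAX_RETRIES:
        return None  # give up; the caller could park the message for inspection
    msg["retry_count"] = attempts          # the "counter"
    msg["last_tried"] = now.isoformat()    # the "last time tried"
    return json.dumps(msg)
```

The counter is what keeps a message that never finds its pair from circulating forever.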

Option 2: Load unpaired transactions into a staging table in Postgres. Modify
the streaming job of the "created" flow to try to pair any unpaired
transactions and clear them by moving them to the main table. You may need a
batch job to delete records from the staging table once they pass a certain
age.
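
A rough sketch of Option 2, using an in-memory SQLite database purely as a stand-in for Postgres. The table names, the columns, the integer timestamps and the helper `pair_and_purge` are all assumptions made for illustration, and "moving to the main table" is modelled here as a status update:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for Postgres (assumption)
conn.executescript("""
    CREATE TABLE transactions (transaction_id INTEGER PRIMARY KEY, status TEXT);
    CREATE TABLE staging (transaction_id INTEGER PRIMARY KEY, received_at INTEGER);
""")


def pair_and_purge(conn, now, max_age_seconds):
    """Pair staged rows whose CREATED row has arrived, then drop rows that are too old."""
    with conn:  # one transaction for all three statements
        conn.execute("""
            UPDATE transactions SET status = 'PROCESSED'
            WHERE status = 'CREATED'
              AND transaction_id IN (SELECT transaction_id FROM staging)
        """)
        # Paired rows no longer need to wait in staging.
        conn.execute("""
            DELETE FROM staging
            WHERE transaction_id IN
                  (SELECT transaction_id FROM transactions WHERE status = 'PROCESSED')
        """)
        # Age-based cleanup of rows that never found their pair.
        conn.execute("DELETE FROM staging WHERE received_at < ?",
                     (now - max_age_seconds,))


conn.execute("INSERT INTO transactions VALUES (1, 'CREATED')")
conn.executemany("INSERT INTO staging VALUES (?, ?)", [(1, 100), (3, 100), (4, 10)])
pair_and_purge(conn, now=120, max_age_seconds=60)
```

In this run, transaction 1 is paired, transaction 3 keeps waiting in staging, and transaction 4 is purged because it is older than the allowed age.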

Ayan


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
One alternative I can think of is to publish your orphaned transactions to
another topic from the main Spark job.

You create a new DF based on the orphaned transactions:

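result = (
    orphanedDF
    # .. (intermediate steps elided)
    .writeStream
    # 'append' assumes orphanedDF has no streaming aggregation;
    # 'complete' is only valid when it has one
    .outputMode("append")
    .format("kafka")  # the Kafka sink expects a string or binary "value" column
    .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'])
    .option("topic", "orphaned")
    .option("checkpointLocation", checkpoint_path)
    .queryName("orphanedTransactions")
    .start()
)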


And consume it somewhere else


HTH


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
I mean... I guess?

But I don't really have Airflow here, and I didn't really want to fall
back to a "batch"-kind of approach with Airflow.

I'd rather use a Dead Letter Queue approach instead (as I mentioned:
another topic for the failed ones, which is later consumed and pumps
the messages back to the original topic),
or something with Spark+Delta Lake instead...

I was just hoping I could retry/replay these "orphaned" transactions
somewhat more easily...

*Question)* Those features of "Stateful Streaming" or "Continuous
Processing" mode wouldn't help solve my case, would they?


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
Well, this is a matter of using journal entries.

Those "orphaned" transactions that you cannot pair through transaction_id
can be written to a journal table in your Postgres DB. Then you can pair
them with the entries in the relevant Postgres table. If this is not time
critical, it can be done by a job scheduled every x minutes, through
Airflow or something similar, on the database alone.

HTH









Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
That is exactly the case, Sebastian!

- In practice, "created" means "*authorized*", but I still cannot
deduct anything from the customer balance
- "processed" means I can safely deduct the transaction_amount from
the customer balance
- and "refunded" means I must give the transaction amount back to the
customer balance

So, technically, we cannot process something that is not "AUTHORIZED"
(created) yet, nor can we process a refund for a transaction that has NOT
been PROCESSED yet.


> *You have an authorisation, then the actual transaction and maybe a refund
> some time in the future. You want to proceed with a transaction only if
> you've seen the auth but in an eventually consistent system this might not
> always happen.*

That's absolutely the case! So, yes, that's correct.

> *You are asking in the case of receiving the transaction before the auth
> how to retry later?*

Yeah! I've been struggling for days with how to solve this with Spark
Structured Streaming...

> *Right now you are discarding those transactions that didn't match so you
> instead would need to persist them somewhere and either reinject them into
> the job that does lookup (say after x minutes)*

*Right now, the best I could think of is:*

   - Say I'm reading the messages with transaction_id [1, 2, 3] from Kafka
   (topic "transactions-processed")
   - Then I query the database for these IDs with the status
   "CREATED" (or "AUTHORIZED", to be more accurate), and it returns the
   transactions for IDs [1, 2]
   - So, while it'll work for IDs [1, 2], I would have to put
   transaction_id 3 in another topic, say "*transaction-processed-retry*"
   - And write yet another consumer to fetch the messages from
   "*transaction-processed-retry*" and put them back into the original
   topic (transactions-processed)
   - And do something similar for transactions-refunded
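
The split described in the steps above can be sketched in plain Python. The helper name `split_batch` and the in-memory `authorized_ids` set are hypothetical; in a real job the set would come from the database query, and the split would presumably run once per micro-batch (for example inside a `foreachBatch` function):

```python
from typing import Dict, Iterable, List, Set, Tuple


def split_batch(
    messages: Iterable[Dict], authorized_ids: Set[int]
) -> Tuple[List[Dict], List[Dict]]:
    """Separate messages that can be persisted now from those that must be retried."""
    matched: List[Dict] = []
    retry: List[Dict] = []
    for msg in messages:
        if msg["transaction_id"] in authorized_ids:
            matched.append(msg)   # CREATED row exists: safe to persist
        else:
            retry.append(msg)     # no CREATED row yet: goes to the retry topic
    return matched, retry


batch = [{"transaction_id": i} for i in (1, 2, 3)]
matched, retry = split_batch(batch, authorized_ids={1, 2})
```

With the IDs from the example, `matched` holds transactions 1 and 2, and `retry` holds transaction 3.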

*Q1)* I think this approach may work, but I can't stop thinking I'm
overengineering this, and I was wondering whether there is a better approach?

> *Is this what you are looking for?*

Yes, that's exactly it.


*Q2)* I know that, under the hood, Structured Streaming actually uses
the micro-batch engine. If I switched to *Continuous Processing*, would it
make any difference? Would it give me any "retry" mechanism out of the box?

*Q3)* I stumbled upon *Stateful Streaming* (
https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming),
but I have never used it before. Would it actually do something for my
case (retrying/replaying a given message)?


Thank you very, VERY much in advance!
Best regards



Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Sebastian Piu
So in payment systems you have something similar, I think.

You have an authorisation, then the actual transaction, and maybe a refund
some time in the future. You want to proceed with a transaction only if
you've seen the auth, but in an eventually consistent system this might not
always happen.

Are you asking how to retry later when the transaction is received before
the auth?

Right now you are discarding those transactions that didn't match, so
instead you would need to persist them somewhere and reinject them into
the job that does the lookup (say, after x minutes).
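
A minimal sketch of the "reinject after x minutes" idea. The class `DelayedRetryBuffer` and its methods are hypothetical, and the in-memory heap only stands in for whatever durable store (a topic, a table) would actually hold the parked transactions:

```python
import heapq
from typing import Any, List, Tuple


class DelayedRetryBuffer:
    """Holds unmatched messages and releases them once their delay has elapsed."""

    def __init__(self, delay_seconds: float) -> None:
        self.delay_seconds = delay_seconds
        self._heap: List[Tuple[float, int, Any]] = []
        self._seq = 0  # tie-breaker so payloads are never compared with each other

    def park(self, payload: Any, now: float) -> None:
        """Store a message that could not be matched at time `now`."""
        heapq.heappush(self._heap, (now + self.delay_seconds, self._seq, payload))
        self._seq += 1

    def due(self, now: float) -> List[Any]:
        """Pop every message whose retry time has been reached."""
        ready = []
        while self._heap and self._heap[0][0] <= now:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


buffer = DelayedRetryBuffer(delay_seconds=300)  # "x minutes" = 5 here
buffer.park({"transaction_id": 3}, now=0)
early = buffer.due(now=60)    # nothing is due yet
late = buffer.due(now=300)    # the parked message is released
```

Passing `now` explicitly keeps the sketch deterministic; a real job would use the wall clock or the event time of the batch.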

Is this what you are looking for?


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
I'm terribly sorry, Mich. That was my mistake.
The timestamps are not the same (I copied them without realizing it, I'm
really sorry for the confusion).

Please assume NONE of the following transactions are in the database yet.

*transactions-created:*
{ "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 08:02:00" }

*transactions-processed:*
{ "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // processed 2 minutes after it was created
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // processed 4 hours after it was created
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } // cannot be persisted into the DB yet, because "transaction_id 3" with the status "CREATED" does NOT exist in the DB


> *(...) Transactions-created are created at the same time (the same
> timestamp) but you have NOT received them and they don't yet exist in your
> DB (...)*

- Not at the same timestamp, that was my mistake.
- Imagine two transactions with the same ID (neither of them is in any
Kafka topic yet):

   - One with the status CREATED, and another with the status PROCESSED
   - The one with the status PROCESSED will ALWAYS have a later
   timestamp than the one with the status CREATED
   - Now, for whatever reason, this happens:
      - Step a) some producer *fails* to push the *created* one to the
      topic *transactions-created*; it will RETRY and will eventually
      succeed, but that can take minutes or hours
      - Step b) however, the producer *succeeds* in pushing the
      *processed* one to the topic *transactions-processed*


> *(...) because presumably your relational database is too slow to ingest
> them? (...)*

- It's not that the DB was slow; the message for transaction_id 3 simply
hadn't arrived at the *transactions-created* topic yet, due to some
error/failure in Step a, for example

> *you do a query in Postgres for say transaction_id 3 but they don't exist
> yet? When are they expected to arrive?*

- That's correct. It could take minutes, maybe hours. But it is guaranteed
that at some point in the future they will arrive. I just have to keep
trying until transaction_id 3 with the status CREATED arrives at the
database


Huge apologies for the confusion... Is it a bit more clear now?

*PS:* This is a simplified scenario; in practice, there is yet another
topic for "transactions-refunded". Those messages cannot be sunk to the DB
unless the same transaction_id with the status "PROCESSED" is there (and
again, there can only be a transaction_id with PROCESSED if the same
transaction_id with CREATED exists in the DB).
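
The dependency chain described in the PS can be sketched as a small lookup.
This is only an illustration in plain Python, not code from either app: the
names PREREQUISITE and can_sink are hypothetical, and the statuses a
transaction already has in the DB are passed in as a set instead of being
queried from Postgres.

```python
# Hypothetical sketch: which status must already be in the DB before a
# message with a given status may be sunk. Names are illustrative only.
PREREQUISITE = {
    "CREATED": None,          # no prerequisite, always sinkable
    "PROCESSED": "CREATED",   # needs the CREATED row first
    "REFUNDED": "PROCESSED",  # needs the PROCESSED row first
}


def can_sink(status, statuses_in_db):
    """Return True if a message with `status` can be persisted right now."""
    required = PREREQUISITE[status]
    return required is None or required in statuses_in_db


# transaction_id 3 has no CREATED row yet, so its PROCESSED message must wait.
print(can_sink("PROCESSED", set()))                    # False
print(can_sink("PROCESSED", {"CREATED"}))              # True
print(can_sink("REFUNDED", {"CREATED"}))               # False
print(can_sink("REFUNDED", {"CREATED", "PROCESSED"}))  # True
```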


On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh 
wrote:

> One second
>
> The topic called transactions_processed is streaming through Spark.
> Transactions-created are created at the same time (the same timestamp) but
> you have NOT received them and they don't yet exist in your DB,
> because presumably your relational database is too slow to ingest them? you
> do a query in Postgres for say transaction_id 3 but they don't exist yet?
> When are they expected to arrive?
>
>

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
One second

The topic called transactions_processed is streaming through Spark.
Transactions-created are created at the same time (the same timestamp) but
you have NOT received them and they don't yet exist in your DB,
because presumably your relational database is too slow to ingest them? You
do a query in Postgres for say transaction_id 3 but they don't exist yet?
When are they expected to arrive?





On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira  wrote:

> Thanks for the quick reply!
>
> I'm not sure I got the idea correctly... but from what I'm understanding,
> wouldn't that actually end the same way?
> Because, this is the current scenario:
>
> *transactions-processed: *
> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>
> *transactions-created:*
> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04 11:01:00" }
> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04 12:02:00" }
>
> - So, when I fetch ALL messages from both topics, there are still 2
> transactions (id: "*3*" and "*4*") which do *not* exist in the topic
> "transactions-created" yet (and they aren't in Postgres either)
> - But since they were pulled by "Structured Streaming" already, they'll be
> kinda marked as "processed" by the Spark Structured Streaming checkpoint
> anyway.
>
> And therefore, I can't replay/reprocess them again...
>
> Is my understanding correct? Am I missing something here?
>

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
Thanks for the quick reply!

I'm not sure I got the idea correctly... but from what I'm understanding,
wouldn't that actually end the same way?
Because, this is the current scenario:

*transactions-processed: *
{ "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
{ "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }

*transactions-created:*
{ "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04 12:02:00" }

- So, when I fetch ALL messages from both topics, there are still 2
transactions (id: "*3*" and "*4*") which do *not* exist in the topic
"transactions-created" yet (and they aren't in Postgres either)
- But since they were pulled by "Structured Streaming" already, they'll be
kinda marked as "processed" by the Spark Structured Streaming checkpoint
anyway.

And therefore, I can't replay/reprocess them again...

Is my understanding correct? Am I missing something here?
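
To make the scenario concrete, here is a plain-Python sketch (not Spark
code) of the intersection step applied to the sample messages above.
split_by_created is a hypothetical helper, and the set of created ids stands
in for the Postgres lookup. The point it illustrates: the unmatched messages
have to be kept somewhere explicitly, because once the micro-batch is
checkpointed the source will not hand them back.

```python
def split_by_created(processed_msgs, created_ids):
    """Split 'processed' messages into (matched, orphaned) by transaction_id."""
    matched = [m for m in processed_msgs if m["transaction_id"] in created_ids]
    orphaned = [m for m in processed_msgs if m["transaction_id"] not in created_ids]
    return matched, orphaned


# Sample data copied from the two topics shown above.
processed = [
    {"transaction_id": 1, "timestamp": "2020-04-04 11:01:00"},
    {"transaction_id": 2, "timestamp": "2020-04-04 12:02:00"},
    {"transaction_id": 3, "timestamp": "2020-04-04 13:03:00"},
    {"transaction_id": 4, "timestamp": "2020-04-04 14:04:00"},
]
created_ids = {1, 2}  # ids currently present with status CREATED

matched, orphaned = split_by_created(processed, created_ids)
print([m["transaction_id"] for m in matched])   # [1, 2]
print([m["transaction_id"] for m in orphaned])  # [3, 4]
```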

On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh 
wrote:

> Thanks for the details.
>
> Can you read these in the same app? For example (this is PySpark, but it
> serves the purpose):
>
> Read topic "newtopic" in one micro-batch and the other topic "md" in
> another micro-batch
>

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
Thanks for the details.

Can you read these in the same app? For example (this is PySpark, but it
serves the purpose):

Read topic "newtopic" in one micro-batch and the other topic "md" in
another micro-batch

# imports needed by this snippet
import sys
from pyspark.sql.functions import col, from_json

try:
    # process topic --> newtopic (note the "subscribe" option)
    streamingNewtopic = self.spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
        .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
        .option("group.id", config['common']['newtopic']) \
        .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
        .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
        .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
        .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
        .option("subscribe", config['MDVariables']['newtopic']) \
        .option("failOnDataLoss", "false") \
        .option("includeHeaders", "true") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))

    # construct a streaming dataframe streamingDataFrame that subscribes to
    # topic config['MDVariables']['topic'] -> md (market data)
    streamingDataFrame = self.spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
        .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
        .option("group.id", config['common']['appName']) \
        .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
        .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
        .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
        .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
        .option("subscribe", config['MDVariables']['topic']) \
        .option("failOnDataLoss", "false") \
        .option("includeHeaders", "true") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    streamingNewtopic.printSchema()

    # Now do a writeStream and call the relevant functions to process
    # dataframes: sendToControl handles each micro-batch of "newtopic"
    newtopicResult = streamingNewtopic.select( \
             col("newtopic_value.uuid").alias("uuid") \
           , col("newtopic_value.timeissued").alias("timeissued") \
           , col("newtopic_value.queue").alias("queue") \
           , col("newtopic_value.status").alias("status")). \
             writeStream. \
             outputMode('append'). \
             option("truncate", "false"). \
             foreachBatch(sendToControl). \
             trigger(processingTime='2 seconds'). \
             queryName(config['MDVariables']['newtopic']). \
             start()

    # sendToSink handles each micro-batch of the "md" topic
    result = streamingDataFrame.select( \
             col("parsed_value.rowkey").alias("rowkey") \
           , col("parsed_value.ticker").alias("ticker") \
           , col("parsed_value.timeissued").alias("timeissued") \
           , col("parsed_value.price").alias("price")). \
             writeStream. \
             outputMode('append'). \
             option("truncate", "false"). \
             foreachBatch(sendToSink). \
             trigger(processingTime='30 seconds'). \
             option('checkpointLocation', checkpoint_path). \
             queryName(config['MDVariables']['topic']). \
             start()
    print(result)

except Exception as e:
    print(f"""{e}, quitting""")
    sys.exit(1)

Inside that function, say *sendToSink*, you can get the df and batchId

def sendToSink(df, batchId):
    # only act on non-empty micro-batches
    if len(df.take(1)) > 0:
        print(f"""md batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        print(f"""wrote to DB""")
    else:
        print("DataFrame md is empty")

And you have created DF from the other topic newtopic

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
 

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
Hello! Sure thing!

I'm reading them *separately*, both are apps written with Scala + Spark
Structured Streaming.

I feel like I missed some details in my original message (sorry, it was past
4 AM and it was getting frustrating).
Please let me try to clarify some points:

*Transactions Created Consumer*
Kafka trx-created-topic <--- (Scala + Spark Structured Streaming)
ConsumerApp ---> Sinks to ---> Postgres DB Table (Transactions)

*Transactions Processed Consumer*
1) (Scala + Spark Structured Streaming) AnotherConsumerApp fetches a
Dataset from the Kafka trx-processed-topic (let's call it "a")
2) Selects the Ids
3) Fetches the rows w/ the matching ids that have status 'created' from
the Postgres Trx table (let's call it "b")
4) Performs an intersection between "a" and "b" resulting in a
"b_that_needs_sinking" (but now there's some "b_leftovers" that were out
of the intersection)
5) Sinks "b_that_needs_sinking" to DB, but that leaves the "b_leftovers"
as unprocessed (not persisted)
6) However, those "b_leftovers" would, ultimately, be processed at some
point (even if it takes like 1-3 days), when their corresponding
transaction_id are pushed to the "trx-created-topic" Kafka topic, and are
then processed by that first consumer

So, what I'm trying to accomplish is to find a way to reprocess those
"b_leftovers" *without* having to restart the app.
Does that make sense?
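
One possible shape for this, sketched in plain Python rather than Spark:
keep the leftovers in a buffer and re-check that buffer on every
micro-batch, so nothing depends on re-reading Kafka offsets. In a real job a
staging table or a retry topic would play the role of the buffer.
LeftoverBuffer and max_attempts are hypothetical names used only for
illustration, nothing here is a Spark API, and the sketch assumes a single
"processed" message per transaction_id.

```python
class LeftoverBuffer:
    """Holds 'processed' messages whose CREATED row is not in the DB yet."""

    def __init__(self, max_attempts=3):
        self.max_attempts = max_attempts
        self.pending = {}  # transaction_id -> (message, attempts so far)

    def process_batch(self, new_msgs, created_ids):
        """Return (ready, expired) for this micro-batch.

        ready:   messages (new or previously buffered) that can be sunk now
        expired: messages given up on after max_attempts checks
        """
        ready, expired = [], []
        # Merge this batch with whatever is still waiting from earlier batches.
        candidates = [(m, 0) for m in new_msgs]
        candidates += list(self.pending.values())
        self.pending = {}
        for msg, attempts in candidates:
            tid = msg["transaction_id"]
            if tid in created_ids:
                ready.append(msg)
            elif attempts + 1 >= self.max_attempts:
                expired.append(msg)
            else:
                self.pending[tid] = (msg, attempts + 1)
        return ready, expired


buf = LeftoverBuffer(max_attempts=3)
# Micro-batch 1: ids 3 and 4 arrive on "processed", only 1 and 2 are created.
print(buf.process_batch([{"transaction_id": 3}, {"transaction_id": 4}], {1, 2}))
# Micro-batch 2: no new messages, but id 3 has been created in the meantime.
print(buf.process_batch([], {1, 2, 3}))
```

The expired list is where an age or attempt limit would kick in, for example
to route a message to manual inspection instead of retrying forever.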

PS: It doesn't necessarily have to be real streaming, if micro-batching
(legacy Spark Streaming) would allow such a thing, it would technically
work (although I keep hearing it's not advisable)

Thank you so much!

Kind regards

On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh 
wrote:

> Can you please clarify if you are reading these two topics separately or
> within the same scala or python script in Spark Structured Streaming?
>
> HTH
>
>


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
Can you please clarify if you are reading these two topics separately or
within the same scala or python script in Spark Structured Streaming?

HTH



On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira  wrote:

> Hello guys,
>
> I've been struggling with this for some days now, without success, so I
> would highly appreciate any enlightenment. The simplified scenario is the
> following:
>
>- I've got 2 topics in Kafka (it's already like that in production,
>can't change it)
>   - transactions-created,
>   - transaction-processed
>- Even though the schema is not exactly the same, they all share a
>correlation_id, which is their "transaction_id"
>
> So, long story short, I've got 2 consumers, one for each topic, and all I
> wanna do is sink them in a chain order. I'm writing them w/ Spark
> Structured Streaming, btw
>
> So far so good, the caveat here is:
>
> - I cannot write a given "*processed" *transaction unless there is an
> entry of that same transaction with the status "*created*".
>
> - There is *no* guarantee that any transactions in the topic "transaction-
> *processed*" have a match (same transaction_id) in the "transaction-
> *created*" at the moment the messages are fetched.
>
> So the workflow so far is:
> - Msgs from the "transaction-created" just get synced to postgres, no
> questions asked
>
> - As for the "transaction-processed", it goes as follows:
>
>- a) Messages are fetched from the Kafka topic
>- b) Select the transaction_id of those...
>- c) Fetch all the rows w/ the corresponding id from a Postgres table
>AND that have the status "CREATED"
>- d) Then, I pretty much do an intersection between the two datasets,
>and sink only the "processed" ones that have a match from step c
>- e) Persist the resulting dataset
>
> But the rows (from the 'processed') that were not part of the intersection
> get lost afterwards...
>
> So my question is:
> - Is there ANY way to reprocess/replay them at all WITHOUT restarting the
> app?
> - For this scenario, should I fall back to Spark Streaming, instead of
> Structured Streaming?
>
> PS: I was playing around with Spark Streaming (legacy) and managed to
> commit only the ones in the microbatches that were fully successful (still
> failed to find a way to "poll" for the uncommitted ones without restarting,
> though).
>
> Thank you very much in advance!
>
>