ATTENZIONE! segue post lungo e pipposo
Salve lista,
questa settimana mi sono imbattuto in una parte di codice "legacy" che si
occupa di analizzare
un flusso di dati ed estrarre degli eventi che hanno un inizio e una fine
(tra le altre cose);
a questo codice andava aggiunto un controllo per verificare che l'evento
fosse stato
registrato in maniera corretta a causa di un bug della sorgente dati. Per
incasinare
ancora di più la situazione, la verifica dell'evento richiede l'analisi di
dati registrati
dopo la fine dell'evento. Provo a spiegare meglio il flusso con del codice
di esempio:
# pseudo codice "originale" senza analisi dell'evento
def extract_events(data_store, start_date, end_date):
datagrams = data_store.get_datagrams(start_date, end_date)
for event in extract_events_from_datagrams(datagrams):
yield clean_event(event)
def extract_events_from_datagrams(datagrams):
for d in datagrams:
# logica di estrazione dell'evento molto complessa riassunta con
if not interesting_data(d):
continue
if event_start_detected:
event = Event(d)
if questo and quel_altro:
accumulate_event_data(event, d)
elif altre_condizioni: # ripetere per molte altre condizioni
cancel_event(event)
event = None
if event_end_detected:
yield event
# pseudo codice con l'analisi a posteriori per la verifica dell'evento
def extract_events(data_store, start_date, end_date,
post_process_check=False):
# in questo caso si legge tutto il dataset in memoria
datagrams = list(data_store.get_datagrams(start_date, end_date))
for event in extract_events_from_datagrams(datagrams):
if post_process_check:
event = post_process(event, datagram)
if event is None:
continue
event = clean_event(event)
if event:
yield event
def extract_events_from_datagrams(datagrams):
# si tiene traccia dell'indice di ogni dato per usarlo poi come
# punto di partenza nel post processing dell'evento
for idx, d in enumerate(datagrams):
# logica di estrazione dell'evento molto complessa riassunta con
if not interesting_data(d):
continue
if event_start_detected:
# si crea l'evento tenendo traccia dell'indice del primo dato
# associabile all'evento
event = Event(d, first_datagram=idx)
if questo and quel_altro:
accumulate_event_data(event, d)
if event_end_detected:
yield event
# verifica la presenza di alcuni tipo di datagram che si possono essere
# presentati poco dopo l'inizio dell'evento e poco dopo la fine dell'evento
# restituisce None se l'evento va scartato altrimenti restituisce l'evento
stesso
def post_process(event, datagrams):
inital_check_slot_start = even.start_timestamp + timedelta(seconds=50)
inital_check_slot_end = even.start_timestamp + timedelta(seconds=100)
final_check_slot_start = even.end_timestamp - timedelta(seconds=50)
final_check_slot_end = even.end_timestamp + timedelta(seconds=50)
for d in datagrams[event.first_datagram_idx: ]:
if initial_check_slot_start <= d.recorded_at <=
initial_check_slot_end:
# si verifica la presenza di varie condizioni legate al dato
if should_discard_event_intial_slot(d):
return None
if final_check_slot_start <= d.recorded_at <= final_check_slot_end:
# si verifica la presenza di varie condizioni legate al dato
if should_discard_event_final_slot(d):
return None
if d.recorded_at > final_check_slot_end:
break
return event
Questo approccio funziona solo perché il dataset di partenza ha una
dimensione
ragionevole, nell'ordine di qualche decina di megabyte, e perché questo
post
processing viene richiamato manualmente solo in certe occasioni quindi il
rischio di
esaurire la memoria del processo generale, a causa di molteplici chiamate
in parallelo,
è molto basso.
Con questo finiamo il mega pippone introduttivo e passiamo al algoritmo in
oggetto per il
quale sono alla ricerca di un design pattern che lo descriva (sono sicuro
che esiste ma
vai e trovalo...)
Ora assumiamo che il dataset iniziale sia molto grande (nell'ordine di
decine di gigabyte)
o che i dati siano presi da uno stream e che quindi non sia possibile
mettere tutto in
ram, una soluzione veloce, assumendo di poter recuperare i dati storici da
un db,
potrebbe essere quella di recuperare i dati dell'evento e oltre con una
query del tipo:
SELECT * FROM data_table WHERE recorded_at >= initial_check_slot_start AND
recorded_at <= final_check_slot_end
in questo modo la pressione sulla ram diminuisce ma la si sposta sul
database o, nel caso
che questi dati siano presi da una risorsa in rete ad es. API REST, sul
network.
Finalmente passo a descrivere il funzionamento di sto benedetto algoritmo,
applicandolo
al problema descritto in precedenza
il corpo di extract_events_from_datagrams diventa più o meno
caching_iterator = CachingIterator(datagrams)
for d in caching_iterator:
if is_event_start(d):
event = create_event(d)
# da questo momento ogni dato letto da datasource viene messo in
una cache
# che può essere consumata iterando su out_of_stream_iterator
out_of_stream_iterator = caching_iterator.start_caching_from(d)
# ...altra roba che arricchisce l'evento e poi
if is_event_end(d):
yield event, out_of_stream_iterator
mentre il corpo di post_process rimane essenzialmente invariato iterando su
out_if_stream_iterator il quale, una volta esaurita la cache, mette i nuovi
dati in
un nuovo buffer che verrà usato da caching_iterator; una volta che il
controllo
ritorna al loop di caching_iterator questo leggerà prima dal buffer interno
e,
esaurito questo, caching_iterator ritornerà a chiedere dati al data source
Provo a riscrivere il comportamento del caching_iterator:
- leggi dal buffer privato, se vuoto leggi da datasource
- se è stato richiesto di tenere traccia dei dati alimenta la cache
- restituisci dato
mentre out_of_stream_iterator:
- leggi dalla cache
- se cache esaurita leggi da data_source e alimenta una cache locale
- restituisci dato
- quando si è finito passa la cache locale a caching_iterator nel su buffer
privato)
in modo che questo possa riprendere da dove si è fermato
un implementazione in Python potrebbe essere quella in allegato
Se siete arrivati sino a qui complimenti per la pazienza! :)
Ora torniamo alla domanda iniziale, esiste un design pattern che descrive
un approccio di questo tipo?
--
"Unix IS user friendly. It's just selective about who its friend are"
"Nevertheless I still think it’s a bad idea to make things harder for
ourselves if we can avoid it."
"C is quirky, flawed, and an enormous success."
-- Dennis Ritchie
"Shipping is a feature. A really important feature. Your product must have
it."
"There is no such a thing as a temporary change or workaround: In most
cases, workarounds are tech debt."
"La gatta frettolosa ha fatto i gattini ciechi"
"""
Implements an iterator tha can replay a part of an iterable
useful when accessing each item can be expensive (like db access, APIs etc)
and you don't want to cache all of the data in memory
example:
lets say queryset can return a billion results
you have to iterate queryset and find an event that has a START and an END
this event may also be rejected if you see some special data after START and,
worse, after END; also after END there may be some junk data but also some
useful data...so you don't want to lose data after END
with this iterator you can:
- when an event START is detected you ask to start caching data and you get an
handle to the start of the caching data
- when an event END is detected you have to see if you can keep or dischard the
event so you can pass the handle to the "evaluation" function
- the evaluation function replay the data using the provided handle and it
goes after the END to look for some interesting data; the data after the end
gets stored in handle and the previous cache is purged
- when control is returned to the main loop the handle gives back the extra
data that it fetched, the main loop consumes it and when it is empty it can
restart to fetch data from the queryset
"""
from collections import deque
class OutOfStreamIterator(object):
def __init__(self, proxy):
self.proxy = proxy
self.cache_consumed = False
self.local_cache = deque()
def __iter__(self):
self.proxy.stop_caching()
return self
def next(self):
item = self.proxy.pop_left()
if item is None:
item = next(self.proxy)
self.local_cache.append(item)
return item
def finish(self):
self.proxy.reset_private_cache(self.local_cache)
class CachingIterator(object):
def __init__(self, iterable, enabled=True):
self.iterator = iter(iterable)
self.caching = False
self.cache = deque()
self.private_cache = deque()
self.enabled = enabled
def __iter__(self):
return self
def next(self):
item = self._pop_left_private()
if item is None:
item = next(self.iterator)
if self.caching:
item = next(self.iterator)
self.cache.append(item)
return item
def pop_left(self):
try:
return self.cache.popleft()
except IndexError:
return None
def _pop_left_private(self):
try:
return self.private_cache.popleft()
except IndexError:
return None
def start_caching(self):
self.caching = self.enabled
self.cache.clear()
return OutOfStreamIterator(self)
def stop_caching(self):
self.caching = False
def reset_private_cache(self, elements):
self.private_cache.extend(elements)
_______________________________________________
Python mailing list
[email protected]
https://lists.python.it/mailman/listinfo/python