ATTENZIONE! segue post lungo e pipposo Salve lista,
questa settimana mi sono imbattuto in una parte di codice "legacy" che si occupa di analizzare un flusso di dati ed estrarre degli eventi che hanno un inizio e una fine (tra le altre cose); a questo codice andava aggiunto un controllo per verificare che l'evento fosse stato registrato in maniera corretta a causa di un bug della sorgente dati. Per incasinare ancora di più la situazione, la verifica dell'evento richiede l'analisi di dati registrati dopo la fine dell'evento. Provo a spiegare meglio il flusso con del codice di esempio: # pseudo codice "originale" senza analisi dell'evento def extract_events(data_store, start_date, end_date): datagrams = data_store.get_datagrams(start_date, end_date) for event in extract_events_from_datagrams(datagrams): yield clean_event(event) def extract_events_from_datagrams(datagrams): for d in datagrams: # logica di estrazione dell'evento molto complessa riassunta con if not interesting_data(d): continue if event_start_detected: event = Event(d) if questo and quel_altro: accumulate_event_data(event, d) elif altre_condizioni: # ripetere per molte altre condizioni cancel_event(event) event = None if event_end_detected: yield event # pseudo codice con l'analisi a posteriori per la verifica dell'evento def extract_events(data_store, start_date, end_date, post_process_check=False): # in questo caso si legge tutto il dataset in memoria datagrams = list(data_store.get_datagrams(start_date, end_date)) for event in extract_events_from_datagrams(datagrams): if post_process_check: event = post_process(event, datagram) if event is None: continue event = clean_event(event) if event: yield event def extract_events_from_datagrams(datagrams): # si tiene traccia dell'indice di ogni dato per usarlo poi come # punto di partenza nel post processing dell'evento for idx, d in enumerate(datagrams): # logica di estrazione dell'evento molto complessa riassunta con if not interesting_data(d): continue if event_start_detected: # si crea l'evento tenendo traccia dell'indice del primo dato # associabile all'evento event = Event(d, first_datagram=idx) if questo and quel_altro: accumulate_event_data(event, d) if event_end_detected: yield event # verifica la presenza di alcuni tipo di datagram che si possono essere # presentati poco dopo l'inizio dell'evento e poco dopo la fine dell'evento # restituisce None se l'evento va scartato altrimenti restituisce l'evento stesso def post_process(event, datagrams): inital_check_slot_start = even.start_timestamp + timedelta(seconds=50) inital_check_slot_end = even.start_timestamp + timedelta(seconds=100) final_check_slot_start = even.end_timestamp - timedelta(seconds=50) final_check_slot_end = even.end_timestamp + timedelta(seconds=50) for d in datagrams[event.first_datagram_idx: ]: if initial_check_slot_start <= d.recorded_at <= initial_check_slot_end: # si verifica la presenza di varie condizioni legate al dato if should_discard_event_intial_slot(d): return None if final_check_slot_start <= d.recorded_at <= final_check_slot_end: # si verifica la presenza di varie condizioni legate al dato if should_discard_event_final_slot(d): return None if d.recorded_at > final_check_slot_end: break return event Questo approccio funziona solo perché il dataset di partenza ha una dimensione ragionevole, nell'ordine di qualche decina di megabyte, e perché questo post processing viene richiamato manualmente solo in certe occasioni quindi il rischio di esaurire la memoria del processo generale, a causa di molteplici chiamate in parallelo, è molto basso. Con questo finiamo il mega pippone introduttivo e passiamo al algoritmo in oggetto per il quale sono alla ricerca di un design pattern che lo descriva (sono sicuro che esiste ma vai e trovalo...) Ora assumiamo che il dataset iniziale sia molto grande (nell'ordine di decine di gigabyte) o che i dati siano presi da uno stream e che quindi non sia possibile mettere tutto in ram, una soluzione veloce, assumendo di poter recuperare i dati storici da un db, potrebbe essere quella di recuperare i dati dell'evento e oltre con una query del tipo: SELECT * FROM data_table WHERE recorded_at >= initial_check_slot_start AND recorded_at <= final_check_slot_end in questo modo la pressione sulla ram diminuisce ma la si sposta sul database o, nel caso che questi dati siano presi da una risorsa in rete ad es. API REST, sul network. Finalmente passo a descrivere il funzionamento di sto benedetto algoritmo, applicandolo al problema descritto in precedenza il corpo di extract_events_from_datagrams diventa più o meno caching_iterator = CachingIterator(datagrams) for d in caching_iterator: if is_event_start(d): event = create_event(d) # da questo momento ogni dato letto da datasource viene messo in una cache # che può essere consumata iterando su out_of_stream_iterator out_of_stream_iterator = caching_iterator.start_caching_from(d) # ...altra roba che arricchisce l'evento e poi if is_event_end(d): yield event, out_of_stream_iterator mentre il corpo di post_process rimane essenzialmente invariato iterando su out_if_stream_iterator il quale, una volta esaurita la cache, mette i nuovi dati in un nuovo buffer che verrà usato da caching_iterator; una volta che il controllo ritorna al loop di caching_iterator questo leggerà prima dal buffer interno e, esaurito questo, caching_iterator ritornerà a chiedere dati al data source Provo a riscrivere il comportamento del caching_iterator: - leggi dal buffer privato, se vuoto leggi da datasource - se è stato richiesto di tenere traccia dei dati alimenta la cache - restituisci dato mentre out_of_stream_iterator: - leggi dalla cache - se cache esaurita leggi da data_source e alimenta una cache locale - restituisci dato - quando si è finito passa la cache locale a caching_iterator nel su buffer privato) in modo che questo possa riprendere da dove si è fermato un implementazione in Python potrebbe essere quella in allegato Se siete arrivati sino a qui complimenti per la pazienza! :) Ora torniamo alla domanda iniziale, esiste un design pattern che descrive un approccio di questo tipo? -- "Unix IS user friendly. It's just selective about who its friend are" "Nevertheless I still think it’s a bad idea to make things harder for ourselves if we can avoid it." "C is quirky, flawed, and an enormous success." -- Dennis Ritchie "Shipping is a feature. A really important feature. Your product must have it." "There is no such a thing as a temporary change or workaround: In most cases, workarounds are tech debt." "La gatta frettolosa ha fatto i gattini ciechi"
""" Implements an iterator tha can replay a part of an iterable useful when accessing each item can be expensive (like db access, APIs etc) and you don't want to cache all of the data in memory example: lets say queryset can return a billion results you have to iterate queryset and find an event that has a START and an END this event may also be rejected if you see some special data after START and, worse, after END; also after END there may be some junk data but also some useful data...so you don't want to lose data after END with this iterator you can: - when an event START is detected you ask to start caching data and you get an handle to the start of the caching data - when an event END is detected you have to see if you can keep or dischard the event so you can pass the handle to the "evaluation" function - the evaluation function replay the data using the provided handle and it goes after the END to look for some interesting data; the data after the end gets stored in handle and the previous cache is purged - when control is returned to the main loop the handle gives back the extra data that it fetched, the main loop consumes it and when it is empty it can restart to fetch data from the queryset """ from collections import deque class OutOfStreamIterator(object): def __init__(self, proxy): self.proxy = proxy self.cache_consumed = False self.local_cache = deque() def __iter__(self): self.proxy.stop_caching() return self def next(self): item = self.proxy.pop_left() if item is None: item = next(self.proxy) self.local_cache.append(item) return item def finish(self): self.proxy.reset_private_cache(self.local_cache) class CachingIterator(object): def __init__(self, iterable, enabled=True): self.iterator = iter(iterable) self.caching = False self.cache = deque() self.private_cache = deque() self.enabled = enabled def __iter__(self): return self def next(self): item = self._pop_left_private() if item is None: item = next(self.iterator) if self.caching: item = next(self.iterator) self.cache.append(item) return item def pop_left(self): try: return self.cache.popleft() except IndexError: return None def _pop_left_private(self): try: return self.private_cache.popleft() except IndexError: return None def start_caching(self): self.caching = self.enabled self.cache.clear() return OutOfStreamIterator(self) def stop_caching(self): self.caching = False def reset_private_cache(self, elements): self.private_cache.extend(elements)
_______________________________________________ Python mailing list Python@lists.python.it https://lists.python.it/mailman/listinfo/python