Is this a duplicate of
https://lists.apache.org/thread.html/r2272040a06457cfdb867832a61f2933d1a3ba832057cffda89ee248a%40%3Cuser.beam.apache.org%3E
?


On Tue, Apr 28, 2020 at 9:26 AM Rion Williams <[email protected]> wrote:

> Hi all,
>
> I'm trying to implement a process and I'm not quite sure what the best
> approach to efficiently implement it might be while taking advantage of
> Beam's parallelism and recommended patterns. Basically the problem itself
> can be summarized as follows:
>
> I have a series of incoming events which are read from Kafka into my Beam
> pipeline. These events are Avro-formatted messages which contains nearly a
> hundred different fields with other nested records and values about the
> event (e.g. users, ip addresses, etc.). The goal of the pipeline is two
> fold:
>
> - Extract any instances of various entities (e.g. users, ip addresses,
> etc.) from the original object, key them (using a deterministic UUID seeded
> by all of the known "key" values), and send them off to their own dedicated
> Kafka topic (e.g. all extracted users -> users_topic).
> - Enrich the original event using the identifiers for all of the extracted
> entities (e.g. if the event came in with an array containing two users, the
> expectation is that the generated keys would be present on each of those
> user instances after leaving the pipeline)
>
> My current approach has been to simply build a single transform to avoid
> mutating / enriching the event throughout the pipeline for each series of
> entities as such:
>
> // Extract all entities (into a PCollectionTuple)
>         val taggedEvents = Pipeline
>             .create(options)
>             .apply("Read Events from Kafka",
> KafkaIO.read<Event>(options.incomingEventsTopic, options))
>             .apply("Identify All Entity Instances", Entities.identify())
>
>         // Users
>         taggedEvents
>             .get(Entities.Tags.users)
>             .apply("Flatten Multiple Identified Users",
> Flatten.iterables())
>             .apply("Write Users to Kafka",
> KafkaIO.write<User>(options.usersTopic, options))
>
>         // IP Addresses
>         taggedEvents
>             .get(Entities.Tags.ipAddresses)
>             .apply("Flatten Multiple Identified IP Addresses",
> Flatten.iterables())
>             .apply("Write IP Addresses to Kafka",
> KafkaIO.write<User>(options.ipAddressesTopic, options))
>
>         // Events (enriched)
>         taggedEvents
>             .get(Entities.Tags.events)
>             .apply("Write Enriched Events to Kafka",
> KafkaIO.write<Event>(options.identifiedEventsTopic, options))
>
> As mentioned, each of these individual extractions for various entities
> need to add the appropriate identifiers onto the original event such that
> when the last call above is made (sending events to its destination topic).
>
> Currently my Entities.identify() transform basically does the following
> behind the scenes:
>
> class Identify() : PTransform<PCollection<KV<String, Event>>,
> PCollectionTuple>() {
>         override fun expand(input: PCollection<KV<String, Event>>):
> PCollectionTuple {
>             // Take an event in
>             return input
>                 .apply("Extract all available entities",
>                         ParDo
>                             .of(ExtractAllEntities())
>                             .withOutputTags(Tags.events,
> TupleTagList.of(listOf(
>                                     Entities.Tags.users,
>                                     Entities.Tags.computerEndpoints
>                             )))
>                 )
>         }
>     }
>
>     class ExtractAllEntities() : TraceableDoFn<KV<String, Event>,
> KV<String, Event>>() {
>
>         @ProcessElement
>         fun processElement(context: ProcessContext) {
>             // Get the event (mutable)
>             val event = context.element().value.toMutable<Event>()
>
>             // Process the users
>             context.output(Entities.Tags.users, Users.extract(event,
> tracer))
>
>             // Process the computer endpoints
>             context.output(Entities.Tags.computerEndpoints,
> ComputerEndpoints.extract(event, tracer))
>
>             // Tag output
>             context.output(Entities.Tags.events,
> KV.of(context.element().key, event))
>         }
>     }
> }
>
> Where all of these extract() function calls are simply private methods,
> which seems wrong. By creating a single instance of a mutable event, it
> allows me to perform all of the mutations and extractions in a single pass
> (as opposed to doing one, pulling out the event, passing it into another
> transform, repeated ad nauseum).
>
> So I have a few questions:
>
> - Is there a preferred pattern for handling something like this?
> - Should those private methods (e.g. ComputerEndpoints.extract(),
> Users.extract(), etc.) simply be private functions that take in an event
> and return an array of the requested entities? Or would this better be
> served as DoFn<T> or some other types of transformations?
> - In my mind, this seems like it could also be written in such a way so
> that all of the entity-specific transformations could yield multiple
> PCollectionTuples (each with the enriched event and a collection of entity
> instances) which could be merged at some point. Would there be value to
> doing that?
>
> Sorry for the long winded question. I'm still quite new to the Beam
> ecosystem and I'm just trying to follow the best practices to take
> advantage of things like parallelism and some of the other benefits that
> Beam provides out of the box.
>
> Thanks all and I'll be happy to elaborate on any of the steps or provide
> any additional details as needed.
>
> Rion
>
>
>

Reply via email to