Hi all,

I'm trying to implement a process and I'm not quite sure what the best approach 
to efficiently implement it might be while taking advantage of Beam's 
parallelism and recommended patterns. Basically the problem itself can be 
summarized as follows:

I have a series of incoming events which are read from Kafka into my Beam 
pipeline. These events are Avro-formatted messages which contain nearly a 
hundred different fields, along with nested records and values about the event 
(e.g. users, ip addresses, etc.). The goal of the pipeline is twofold:

- Extract any instances of various entities (e.g. users, ip addresses, etc.) 
from the original object, key them (using a deterministic UUID seeded by all of 
the known "key" values), and send them off to their own dedicated Kafka topic 
(e.g. all extracted users -> users_topic).
- Enrich the original event using the identifiers for all of the extracted 
entities (e.g. if the event came in with an array containing two users, the 
expectation is that the generated keys would be present on each of those user 
instances after leaving the pipeline)
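
To make the "deterministic UUID" part concrete, here is a minimal sketch of 
one way to derive such a key using only the JDK. The helper name 
deterministicId and the sample key values are hypothetical, and the actual 
seeding scheme in the pipeline may differ:

```kotlin
import java.nio.charset.StandardCharsets
import java.util.UUID

// Hypothetical helper: derives a name-based (version 3) UUID from an
// entity's "key" values, so identical inputs always map to the same id.
fun deterministicId(vararg keyValues: String): UUID {
    // Join with a NUL separator so ("ab", "c") and ("a", "bc") produce
    // different seeds (assumes NUL never appears in a key value).
    val seed = keyValues.joinToString(separator = "\u0000")
    return UUID.nameUUIDFromBytes(seed.toByteArray(StandardCharsets.UTF_8))
}

fun main() {
    // Sample key values are made up for illustration.
    val first = deterministicId("users", "example.com", "alice")
    val second = deterministicId("users", "example.com", "alice")
    println(first == second) // prints "true"
}
```

Because the id depends only on the key values, the same user seen in two 
different events ends up with the same key on the users topic.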

My current approach has been to build a single transform, so that I avoid 
mutating / enriching the event at several points throughout the pipeline (once 
per series of entities), as follows:

        // Extract all entities (into a PCollectionTuple)
        val taggedEvents = Pipeline
            .create(options)
            .apply("Read Events from Kafka", KafkaIO.read<Event>(options.incomingEventsTopic, options))
            .apply("Identify All Entity Instances", Entities.identify())

        // Users
        taggedEvents
            .get(Entities.Tags.users)
            .apply("Flatten Multiple Identified Users", Flatten.iterables())
            .apply("Write Users to Kafka", KafkaIO.write<User>(options.usersTopic, options))

        // IP Addresses
        taggedEvents
            .get(Entities.Tags.ipAddresses)
            .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
            .apply("Write IP Addresses to Kafka", KafkaIO.write<User>(options.ipAddressesTopic, options))

        // Events (enriched)
        taggedEvents
            .get(Entities.Tags.events)
            .apply("Write Enriched Events to Kafka", KafkaIO.write<Event>(options.identifiedEventsTopic, options))

As mentioned, each of these individual extractions for the various entities 
needs to add the appropriate identifiers onto the original event, so that when 
the last call above is made (sending events to their destination topic), the 
event already carries all of the generated identifiers.

Currently my Entities.identify() transform basically does the following behind 
the scenes:

    class Identify() : PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
        override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
            // Take an event in
            return input
                .apply("Extract all available entities",
                        ParDo
                            .of(ExtractAllEntities())
                            .withOutputTags(Tags.events, TupleTagList.of(listOf(
                                    Entities.Tags.users,
                                    Entities.Tags.computerEndpoints
                            )))
                )
        }
    }

    class ExtractAllEntities() : TraceableDoFn<KV<String, Event>, KV<String, Event>>() {

        @ProcessElement
        fun processElement(context: ProcessContext) {
            // Get the event (mutable)
            val event = context.element().value.toMutable<Event>()

            // Process the users
            context.output(Entities.Tags.users, Users.extract(event, tracer))

            // Process the computer endpoints
            context.output(Entities.Tags.computerEndpoints, ComputerEndpoints.extract(event, tracer))

            // Tag output
            context.output(Entities.Tags.events, KV.of(context.element().key, event))
        }
    }
}

All of these extract() function calls are simply private methods, which seems 
wrong. Creating a single mutable instance of the event allows me to perform 
all of the mutations and extractions in a single pass (as opposed to doing one 
extraction, pulling out the event, passing it into another transform, and 
repeating ad nauseam).
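
To illustrate the single-pass idea outside of Beam, here is a minimal sketch 
written as plain functions over immutable copies. Every type and function name 
in it (User, IpAddress, Event, Identified, keyFor, identify, etc.) is a 
hypothetical stand-in, not one of the real Avro classes or extract() methods:

```kotlin
import java.util.UUID

// Hypothetical stand-ins for the Avro-generated types.
data class User(val name: String, val id: String? = null)
data class IpAddress(val address: String, val id: String? = null)
data class Event(val users: List<User>, val ipAddresses: List<IpAddress>)

// Result of one pass: the enriched event plus every entity found in it.
data class Identified(
    val event: Event,
    val users: List<User>,
    val ipAddresses: List<IpAddress>
)

// Deterministic key derived from the entity kind and its key value.
fun keyFor(kind: String, value: String): String =
    UUID.nameUUIDFromBytes("$kind:$value".toByteArray()).toString()

// Each extractor is a plain function returning keyed copies of the entities.
fun identifyUsers(event: Event): List<User> =
    event.users.map { it.copy(id = keyFor("user", it.name)) }

fun identifyIpAddresses(event: Event): List<IpAddress> =
    event.ipAddresses.map { it.copy(id = keyFor("ip", it.address)) }

// Single pass: run every extractor once, then rebuild the event from the
// keyed entities so it leaves with all identifiers attached.
fun identify(event: Event): Identified {
    val users = identifyUsers(event)
    val ipAddresses = identifyIpAddresses(event)
    val enriched = event.copy(users = users, ipAddresses = ipAddresses)
    return Identified(enriched, users, ipAddresses)
}

fun main() {
    val event = Event(listOf(User("alice")), listOf(IpAddress("10.0.0.1")))
    val result = identify(event)
    println(result.event.users.first().id == result.users.first().id)
}
```

In a DoFn, each field of the result would be emitted to its corresponding 
output tag, which keeps the extraction logic testable without a pipeline.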

So I have a few questions:

- Is there a preferred pattern for handling something like this?
- Should those private methods (e.g. ComputerEndpoints.extract(), 
Users.extract(), etc.) simply be private functions that take in an event and 
return an array of the requested entities? Or would they be better served as 
DoFns or some other type of transform?
- In my mind, this seems like it could also be written in such a way that 
each of the entity-specific transformations yields its own PCollectionTuple 
(with the enriched event and a collection of entity instances), and these 
could be merged at some point. Would there be value in doing that?

Sorry for the long-winded question. I'm still quite new to the Beam ecosystem 
and I'm just trying to follow the best practices to take advantage of things 
like parallelism and some of the other benefits that Beam provides out of the 
box.

Thanks all and I'll be happy to elaborate on any of the steps or provide any 
additional details as needed.

Rion

