Hi all,

I've been exploring a few different options for storing tenant-specific
configurations within Flink state based on the messages I have flowing
through my job. Initially I had considered creating a source that would
periodically poll an HTTP API and connect that stream to my original event
stream.

However, I realized that this configuration information would basically
never change, so polling periodically doesn't make much sense. My next
approach would be a function keyed by tenant that stores the configuration
for that tenant in state and issues an HTTP call only when the
configuration isn't there yet. Something like this:

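import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
// JsonObject import omitted; it depends on the JSON library in use

class ConfigurationLookupFunction : KeyedProcessFunction<String, JsonObject, JsonObject>() {
    private lateinit var httpClient: HttpClient

    // Tenant-specific configuration, scoped to the current key
    private lateinit var configuration: ValueState<String>

    override fun open(parameters: Configuration) {
        super.open(parameters)
        httpClient = HttpClient.newHttpClient()
        // Register the keyed state handle (otherwise the lateinit property is never set)
        configuration = runtimeContext.getState(
            ValueStateDescriptor("configuration", String::class.java)
        )
    }

    override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
        if (configuration.value() == null) {
            // Issue a request to the appropriate API to load the configuration
            val request = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
            // Note: send() blocks the operator thread until the response arrives
            val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
            // Store the configuration info within state here
            // (assumes the raw response body is what should be kept)
            configuration.update(response.body())

            out.collect(message)
        } else {
            // Get the configuration information and pass it downstream to be used by the sink
            out.collect(message)
        }
    }
}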

I didn't see any support for using the Async I/O functions from a keyed
context; otherwise I'd imagine that would be ideal. The requests themselves
should be very infrequent (one initial call per tenant), and after that the
necessary configuration could be read from the state for that key.
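
To make the intended behavior concrete outside of Flink, here is a rough
sketch of the "fetch once per tenant, then reuse" pattern. A plain map
stands in for keyed state, and both `TenantConfigCache` and the `fetch`
lambda are hypothetical names I made up for illustration (the lambda is a
placeholder for the HTTP call). It only shows the access pattern, not how
Flink would checkpoint or partition the state:

```kotlin
// Minimal sketch: a map stands in for Flink keyed state (tenant ID -> configuration).
// `fetch` is a hypothetical placeholder for the HTTP lookup.
class TenantConfigCache(private val fetch: (String) -> String) {
    private val configByTenant = mutableMapOf<String, String>()

    // Number of times the remote lookup was actually invoked
    var fetchCount = 0
        private set

    // Returns the cached configuration, calling `fetch` only on the first request per tenant
    fun configFor(tenant: String): String =
        configByTenant.getOrPut(tenant) {
            fetchCount++
            fetch(tenant)
        }
}

fun main() {
    val cache = TenantConfigCache { tenant -> "config-for-$tenant" }
    // Five events across two tenants
    listOf("a", "b", "a", "a", "b").forEach { cache.configFor(it) }
    println(cache.fetchCount) // prints 2: one lookup per distinct tenant
}
```

With this shape, the cost of the lookup scales with the number of tenants
rather than the number of events, which is the property I'm after.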

Is there a good way of handling this that I might be overlooking with an
existing Flink construct or function? I'd love to be able to leverage the
Async I/O connectors as they seem pretty well thought out.

Thanks in advance!

Rion
