Hi all,

Early today I had asked a few questions regarding the use of the many
testing constructs available within Flink and believe that I have things in
a good direction at present. I did run into a specific case that either may
not be supported, or just isn't documented well enough for me to determine
what is going wrong.

Basically, I have a KeyedProcessFunction that reads some global-level
configuration via GlobalJobParameters within its open function:

override fun open(configuration: Configuration) {
    // Omitted for brevity

    val parameters = runtimeContext.executionConfig.globalJobParameters as?
ParameterTool
    if (parameters != null) {
        processingTimeBuffer = parameters.getLong("processingTimeBuffer",
0L)
    }
}

This works just as expected within the actual pipeline itself when set
similarly:

streamEnvironment.config.globalJobParameters = parameters

However, I don't see an effective way to set this against a TestHarness as
I've made several attempts but I never can seem to populate the
globalJobParameters property within the KeyedProcessFunction itself using a
test harness despite multiple attempts

// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters
= ParameterTool.fromMap(...)

// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters =
ParameterTool.fromMap(...)

// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters =
ParameterTool.fromMap(...)

// Attempt 4
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)

Is this supported or am I simply going about it the wrong way? Or even just
perhaps missing a piece of the puzzle?

Thanks much,

Rion

Reply via email to