Could you show us how you create the test harness?

On 3/4/2021 5:13 AM, Rion Williams wrote:
Hi all,

Earlier today I asked a few questions about the many testing constructs available within Flink, and I believe I have things heading in a good direction at present. I did run into a specific case that either may not be supported or just isn't documented well enough for me to determine what is going wrong.

Basically, I have a KeyedProcessFunction that reads some global-level configuration via GlobalJobParameters within its open function:

override fun open(configuration: Configuration) {
    // Omitted for brevity

    val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool
    if (parameters != null) {
        processingTimeBuffer = parameters.getLong("processingTimeBuffer", 0L)
    }
}

This works just as expected within the actual pipeline itself when set similarly:

streamEnvironment.config.globalJobParameters = parameters

However, I don't see an effective way to set this against a TestHarness. I've made several attempts, but I can never seem to populate the globalJobParameters property within the KeyedProcessFunction itself:

// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 4
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)
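
The attempts above do not show when they run relative to the harness's open() call, which is the point at which the function reads globalJobParameters. Below is a minimal sketch of one ordering that may be worth checking, not a confirmed fix: it assumes the harness is built by hand from KeyedOneInputStreamOperatorTestHarness and KeyedProcessOperator (from Flink's test utilities), and that the parameters are set before open(). BufferEchoFunction, the String key, and the "5000" value are hypothetical stand-ins for the real function and data.

```kotlin
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// Hypothetical stand-in for the real function: it reads the buffer in open()
// exactly as in the original snippet and emits it for every element.
class BufferEchoFunction : KeyedProcessFunction<String, String, Long>() {
    private var processingTimeBuffer = 0L

    override fun open(configuration: Configuration) {
        val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool
        if (parameters != null) {
            processingTimeBuffer = parameters.getLong("processingTimeBuffer", 0L)
        }
    }

    override fun processElement(value: String, ctx: Context, out: Collector<Long>) {
        out.collect(processingTimeBuffer)
    }
}

fun main() {
    // Build the harness by hand so nothing calls open() on our behalf.
    val harness = KeyedOneInputStreamOperatorTestHarness<String, String, Long>(
        KeyedProcessOperator<String, String, Long>(BufferEchoFunction()),
        KeySelector<String, String> { it }, // key each element by itself (assumption)
        Types.STRING
    )

    // Assumption: parameters must be in place before open() runs the function's open().
    harness.executionConfig.globalJobParameters =
        ParameterTool.fromMap(mapOf("processingTimeBuffer" to "5000"))

    harness.open()
    harness.processElement("a", 0L)

    // Print what the function emitted, to see whether the configured buffer arrived.
    println(harness.extractOutputValues())
    harness.close()
}
```

If the harness is opened before the assignment instead, open() would already have seen an empty configuration, and changing the parameters afterwards would not be expected to update processingTimeBuffer.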

Is this supported or am I simply going about it the wrong way? Or even just perhaps missing a piece of the puzzle?

Thanks much,

Rion

