Hi all, Early today I had asked a few questions regarding the use of the many testing constructs available within Flink and believe that I have things in a good direction at present. I did run into a specific case that either may not be supported, or just isn't documented well enough for me to determine what is going wrong.
Basically, I have a KeyedProcessFunction that reads some global-level configuration via GlobalJobParameters within its open function: override fun open(configuration: Configuration) { // Omitted for brevity val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool if (parameters != null) { processingTimeBuffer = parameters.getLong("processingTimeBuffer", 0L) } } This works just as expected within the actual pipeline itself when set similarly: streamEnvironment.config.globalJobParameters = parameters However, I don't see an effective way to set this against a TestHarness as I've made several attempts but I never can seem to populate the globalJobParameters property within the KeyedProcessFunction itself using a test harness despite multiple attempts // Attempt 1 magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters = ParameterTool.fromMap(...) // Attempt 2 magicWindowHarness.executionConfig.globalJobParameters = ParameterTool.fromMap(...) // Attempt 3 magicWindowHarness.environment.executionConfig.globalJobParameters = ParameterTool.fromMap(...) // Attempt 4 val env = StreamExecutionEnvironment. env.config.globalJobParameters = ParameterTool.fromMap(...) Is this supported or am I simply going about it the wrong way? Or even just perhaps missing a piece of the puzzle? Thanks much, Rion