Could you show us how you create the test harness?
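One guess while we wait for that, and only a guess: if the harness has already been opened by the time you assign globalJobParameters (I believe the ProcessFunctionTestHarnesses helpers open the harness before returning it), then your open() has already run and read null. The toy model below is plain Kotlin with no Flink on the classpath; ToyExecutionConfig, ToyFunction and ToyHarness are hypothetical stand-ins, not Flink classes. It only illustrates why the order of the assignment and open() would matter.

```kotlin
// Toy stand-ins, NOT Flink classes: every name here is hypothetical.
class ToyExecutionConfig {
    var globalJobParameters: Map<String, String>? = null
}

class ToyFunction {
    var processingTimeBuffer: Long = 0L
        private set

    // Mirrors the open() in the question: the value is read once, here.
    fun open(config: ToyExecutionConfig) {
        val parameters = config.globalJobParameters
        if (parameters != null) {
            processingTimeBuffer = parameters["processingTimeBuffer"]?.toLong() ?: 0L
        }
    }
}

class ToyHarness(private val function: ToyFunction) {
    val executionConfig = ToyExecutionConfig()

    fun open() = function.open(executionConfig)
}

// Parameters assigned after open(): the function has already read null.
fun bufferWhenSetAfterOpen(): Long {
    val function = ToyFunction()
    val harness = ToyHarness(function)
    harness.open()
    harness.executionConfig.globalJobParameters = mapOf("processingTimeBuffer" to "500")
    return function.processingTimeBuffer
}

// Parameters assigned before open(): the function picks them up.
fun bufferWhenSetBeforeOpen(): Long {
    val function = ToyFunction()
    val harness = ToyHarness(function)
    harness.executionConfig.globalJobParameters = mapOf("processingTimeBuffer" to "500")
    harness.open()
    return function.processingTimeBuffer
}

fun main() {
    println(bufferWhenSetAfterOpen())  // prints 0
    println(bufferWhenSetBeforeOpen()) // prints 500
}
```

If that is what is happening, constructing the harness yourself and setting the parameters on its execution config before calling open() would be the thing to try.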
On 3/4/2021 5:13 AM, Rion Williams wrote:
Hi all,
Earlier today I asked a few questions about the many testing
constructs available within Flink, and I believe I have things
heading in a good direction now. I did run into one specific case
that either isn't supported or isn't documented well enough for me
to determine what is going wrong.
Basically, I have a KeyedProcessFunction that reads some global-level
configuration via GlobalJobParameters within its open function:
override fun open(configuration: Configuration) {
    // Omitted for brevity
    val parameters =
        runtimeContext.executionConfig.globalJobParameters as? ParameterTool
    if (parameters != null) {
        processingTimeBuffer =
            parameters.getLong("processingTimeBuffer", 0L)
    }
}
This works as expected in the actual pipeline when the parameters are
set on the environment:
streamEnvironment.config.globalJobParameters = parameters
However, I don't see an effective way to set this against a
TestHarness. I've made several attempts, but I can never seem to
populate the globalJobParameters property within the
KeyedProcessFunction itself:
// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters =
    ParameterTool.fromMap(...)
// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters =
    ParameterTool.fromMap(...)
// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters =
    ParameterTool.fromMap(...)
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)
Is this supported, or am I simply going about it the wrong way, or
perhaps just missing a piece of the puzzle?
Thanks much,
Rion