Thanks Chesnay!
I tried giving that a shot but I still wasn't able to access the
globalJobParameters from within the open function in my
KeyedProcessFunction. You can see the implementation below which I
believe should be correct:
object CustomProcessFunctionTestHarness {
    fun <K, IN, OUT> forKeyedProcessFunction(
        function: KeyedProcessFunction<K, IN, OUT>,
        keySelector: KeySelector<IN, K>,
        keyType: TypeInformation<K>,
        parameters: ParameterTool
    ): KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
        val testHarness: KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> =
            KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>(
                KeyedProcessOperator<K, IN, OUT>(Preconditions.checkNotNull(function)),
                keySelector,
                keyType,
                1,
                1,
                0
            )

        // Adjust execution configuration via parameters
        testHarness.executionConfig.globalJobParameters = parameters
        testHarness.open()

        return testHarness
    }
}
Usage-wise, the declaration looks as you might expect:
bufferedMagicWindowHarness = CustomProcessFunctionTestHarness.forKeyedProcessFunction(
    MagicWindowFunction(),
    { log -> log.getKey() },
    TypeInformation.of(String::class.java),
    ParameterTool.fromArgs(arrayOf("--processingTimeBuffer", "60000"))
)
And then, as I described earlier, these are read via the following in the open() function:
val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool
if (parameters != null) {
    processingTimeBuffer = parameters.getLong("processingTimeBuffer")
}
Does anything look out of place here? I haven't gone spelunking into
the source code for this yet, but I'm assuming that I'm setting the
correct values on the execution configuration.
Thanks again,
Rion
On Thu, Mar 4, 2021 at 7:57 AM Chesnay Schepler <ches...@apache.org> wrote:
The reason your attempts have failed is that ProcessFunctionTestHarnesses.forKeyedProcessFunction automatically calls open(), so any mutations on the harness happen too late.
I'd suggest taking a look at the implementation of that method and essentially copying the code. You can then call the harness constructor manually and mutate the execution config before calling open().
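Roughly, the shape of that looks like the following (a minimal sketch rather than a drop-in implementation; it reuses the MagicWindowFunction / Log / FileOutput names from this thread):

// Minimal sketch: build the operator and harness by hand, set the global job
// parameters on the harness execution config, and only then call open().
val harness =
    KeyedOneInputStreamOperatorTestHarness<String, Log, FileOutput>(
        KeyedProcessOperator(MagicWindowFunction()),
        KeySelector { log: Log -> log.getKey() },
        TypeInformation.of(String::class.java)
    )
harness.executionConfig.globalJobParameters =
    ParameterTool.fromArgs(arrayOf("--processingTimeBuffer", "60000"))
harness.open()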
On 3/4/2021 2:49 PM, Rion Williams wrote:
Absolutely,
I think it's gone through quite a few iterations, but this is the
current state of it (defined in a @Before function as part of
scaffolding out the tests):
private lateinit var magicWindowHarness:
    KeyedOneInputStreamOperatorTestHarness<String, Log, FileOutput>

@Before
fun init() {
    magicWindowHarness = ProcessFunctionTestHarnesses.forKeyedProcessFunction(
        MagicWindowFunction(),
        { log -> log.getKey() },
        TypeInformation.of(String::class.java)
    )
}
I've also tried a few variants of that with a separate
declaration for the function itself, etc.
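For reference, once a harness like that is open, a test typically pushes elements through it along these lines (a rough sketch only; buildTestLog() is a hypothetical placeholder for however a Log is actually constructed):

@Test
fun `magic window emits expected output`() {
    // buildTestLog() stands in for constructing a real Log instance.
    val log: Log = buildTestLog()

    // Feed an element through the keyed process function at a given timestamp.
    magicWindowHarness.processElement(log, 1000L)

    // Advance processing time so any registered processing-time timers fire.
    magicWindowHarness.setProcessingTime(61_000L)

    // Collect whatever the function emitted; assertions would go here.
    val output: List<FileOutput> = magicWindowHarness.extractOutputValues()
}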
On Thu, Mar 4, 2021 at 6:47 AM Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>> wrote:
Could you show us how you create the test harness?
On 3/4/2021 5:13 AM, Rion Williams wrote:
Hi all,
Earlier today I asked a few questions regarding the use of the many testing constructs available within Flink, and I believe I have things headed in a good direction at present. I did run into a specific case that either may not be supported or just isn't documented well enough for me to determine what is going wrong.
Basically, I have a KeyedProcessFunction that reads some
global-level configuration via GlobalJobParameters within
its open function:
override fun open(configuration: Configuration) {
    // Omitted for brevity

    val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool
    if (parameters != null) {
        processingTimeBuffer = parameters.getLong("processingTimeBuffer", 0L)
    }
}
This works just as expected within the actual pipeline
itself when set similarly:
streamEnvironment.config.globalJobParameters = parameters
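Spelled out a bit more, that wiring is roughly:

// The ParameterTool is registered as the global job parameters so that any
// rich function can read it from its runtime context in open().
val streamEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()
val parameters = ParameterTool.fromArgs(args)
streamEnvironment.config.globalJobParameters = parameters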
However, I don't see an effective way to set this against a test harness. I've made several attempts, but I never seem to be able to populate the globalJobParameters property within the KeyedProcessFunction itself:
// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 4
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)
Is this supported or am I simply going about it the wrong
way? Or even just perhaps missing a piece of the puzzle?
Thanks much,
Rion