??????????hive table read
blinkStreamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled",
true);
Table table=blinkStreamTableEnv.sqlQuery("SELECT * FROM
test.table_config /*+
OPTIONS('streaming-source.enable'='true','streaming-source.monitor-interval' =
'30 min')*/");
??????????????flink??????
// given that the parallelism of the function is 1, we
can only have 1 or 0 retrieved items.
// the 0 is for the case that we are migrating from a
previous Flink version.
Preconditions.checkArgument(retrievedStates.size() <= 1,
getClass().getSimpleName() + " retrieved
invalid state.");
if (retrievedStates.size() == 1 &&
globalModificationTime != Long.MIN_VALUE) {
// this is the case where we have both legacy
and new state.
// The two should be mutually exclusive for the
operator, thus we throw the exception.
throw new IllegalArgumentException(
"The " + getClass().getSimpleName() + "
has already restored from a previous Flink version.");
} else if (retrievedStates.size() == 1) {
this.globalModificationTime =
retrievedStates.get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved a global mod
time of {}.",
getClass().getSimpleName(),
globalModificationTime);
}
}
------------------ ???????? ------------------
发件人:
"Excalibur"
<[email protected]>;
发送时间: 2020年11月16日(星期一) 上午10:54
收件人: "user-zh"<[email protected]>;
主题: flink sql hive streaming??????????????????bug
??????1.11.2
??????????java.lang.IllegalArgumentException:
The ContinuousFileMonitoringFunction has already restored from a
previous Flink version.
at
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)