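Hello,
1. A streaming job reads from Kafka and writes the data into Hive. The Hive table is stored as Parquet and is partitioned by day, hour, and minute.
2. Consuming this Hive table as a streaming pipeline fails with java.lang.ArrayIndexOutOfBoundsException: -1.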
In the Flink SQL client:
1) Selecting all fields directly works without problems and returns all the data.
Query:
select *
from ubtCatalog.ubtHive.event_all_dwd
/*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include'='all',
'streaming-source.monitor-interval'='5s',
'streaming-source.consume-order'='partition-time',
'streaming-source.consume-start-offset'='2021-01-01')
*/
;
2) Adding an aggregate function to the query from 1) consistently fails with java.lang.ArrayIndexOutOfBoundsException: -1, and I cannot tell why.
Query:
select count(xubtappid)
from ubtCatalog.ubtHive.event_all_dwd
/*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include'='all',
'streaming-source.monitor-interval'='5s',
'streaming-source.consume-order'='partition-time',
'streaming-source.consume-start-offset'='2021-01-01')
*/
;
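For context, the table described in item 1 might look roughly like the sketch below. This is not the actual DDL: the partition column names (dt, hr, mi), the column type, and the timestamp pattern are all assumptions made for illustration, and only xubtappid and the table name come from the queries above.

```sql
-- Hypothetical sketch of the table from item 1; NOT the real DDL.
SET table.sql-dialect=hive;

CREATE TABLE event_all_dwd (
  xubtappid STRING  -- only column named in this post; its type is assumed
  -- remaining columns omitted
)
PARTITIONED BY (dt STRING, hr STRING, mi STRING)  -- assumed names for day / hour / minute
STORED AS PARQUET
TBLPROPERTIES (
  -- assumed pattern, so that 'partition-time' ordering can derive a timestamp
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'
);
```

With a layout like this, the partition columns are part of the table schema but are not among the columns that count(xubtappid) needs, which is the main difference from the select * query.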
The full stack trace for the failing query in 2) is:
2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator bc764cd8ddf7a0cff126f51c16239658).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to enumerate files
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167)
    at org.apache.flink.connectors.hive.HiveTableSource$HiveContinuousPartitionFetcherContext.toHiveTablePartition(HiveTableSource.java:388)
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:224)
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:172)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    ... 3 more
Best regards!
samuel