This error is usually caused by a missing dependency on the JM or TM side, which makes deserialization of the user code fail.
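
A quick sanity check is to probe, with the exact classpath the JM/TM sees, whether the relevant classes resolve. Below is a minimal sketch (mine, not from this thread; the three class names are just the ones I would spot-check first, adjust them to the jars you actually ship):

public class ClasspathCheck {
    public static void main(String[] args) {
        // Classes to probe; an illustrative selection, pick the ones
        // matching the dependencies you expect on the JM/TM classpath.
        String[] probes = {
                "org.apache.flink.table.catalog.hive.HiveCatalog", // flink-connector-hive
                "org.apache.hadoop.hive.conf.HiveConf",            // hive-exec / hive-common
                "org.apache.hadoop.mapred.JobConf"                 // hadoop-mapreduce-client
        };
        for (String name : probes) {
            try {
                Class.forName(name);
                System.out.println("OK      " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("MISSING " + name);
            }
        }
    }
}

If anything comes back MISSING on the cluster, putting the corresponding connector/Hive jars into the lib/ directory of the Flink distribution (visible to both JM and TM) is the usual fix.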

On Tue, Nov 10, 2020 at 8:22 AM 蒋龙 <[email protected]> wrote:

> Hi all, I'm hitting a very strange error when reading Hive with Flink: Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not read the user code wrapper: invalid type code: 9C. I can't figure it out; any pointers would be appreciated.
> flink:1.11.2
> hive: 2.2.0
> hadoop:3.0.3
>
> It feels like something goes wrong where the input/output formats get
> wrapped, but I can't tell where. When running on YARN, the stock
> HiveCatalog needs a hive-site.xml path, and I don't know how to locate
> one on YARN, so I construct the HiveConf myself here. Could that be
> related?
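>
> For comparison, the constructor-based setup I'm trying to avoid would be
> roughly the following (just a sketch; hiveConfDir is the local placeholder
> path from the code below, which is exactly what I can't supply on YARN):
>
>        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
>        tableEnv.registerCatalog("myhive", hive);
>        tableEnv.useCatalog("myhive");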
>
> Complete code:
> public class flink {
>    public static void main(String[] args) throws Exception {
>        EnvironmentSettings settings = EnvironmentSettings
>                .newInstance()
>                .useBlinkPlanner() // use the Blink planner
>                .inBatchMode()     // batch mode (default is streaming mode)
>                .build();
>        TableEnvironment tableEnv = TableEnvironment.create(settings);
>        String name = "myhive";                       // catalog name; a unique identifier
>        String defaultDatabase = "vr_shopping_test";  // default database name
>        String hiveConfDir = "/Users/john/IdeaProjects/flink-hive-x/src/main/resources";  // path to hive-site.xml
>        String version = "2.1.1";
>
>        HiveConf hiveConf = createHiveConf();
>        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://xxxxxxxxxx:9083");
>
>        HiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version);
>        tableEnv.registerCatalog("myhive", hive);
>        tableEnv.useCatalog("myhive");
>        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>        tableEnv.useDatabase(defaultDatabase);
>        // query an existing Hive table
>        String querySql = "select * from dim_vr_shopping_wx_users_v1 limit 1";
>        TableResult res = tableEnv.executeSql(querySql);
>        res.print();
>    }
>
>    private static HiveConf createHiveConf() {
>        // create HiveConf from the Hadoop configuration
>        Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(
>                new org.apache.flink.configuration.Configuration());
>
>        // add mapred-site.xml; we need to read configurations like the compression codec
>        for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(
>                new org.apache.flink.configuration.Configuration())) {
>            File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
>            if (mapredSite.exists()) {
>                hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
>                break;
>            }
>        }
>        return new HiveConf(hadoopConf, HiveConf.class);
>    }
> }
>
>
>
> Complete stack trace:
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute sql
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>         at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>         at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>         at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>         at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>         at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:747)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1069)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>         at com.aibee.flink.main(flink.java:42)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>         ... 11 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'collect'.
>         at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
>         at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>         at
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:736)
>         ... 19 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
>         at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
>         at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>         at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292)
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>         at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>         at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>         at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
>         at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>         at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate JobManager.
>         at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
>         at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>         ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'Source: HiveTableSource(id, openid, unionid, nickname,
> language, city, province, headimgurl, country, last_login_time, privilege,
> created_at, updated_at, uuid, vip_code, level_id, member_id, sex)
> TablePath: vr_shopping_test.dim_vr_shopping_wx_users_v1, PartitionPruned:
> false, PartitionNums: null, ProjectedFields: [15]': Loading the
> input/output formats failed:
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
>         at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>         at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
>         at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
>         at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>         at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>         at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
>         at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>         at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
>         ... 7 more
> Caused by: java.lang.Exception: Loading the input/output formats failed:
>         at
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155)
>         at
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59)
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
>         ... 19 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output
> formats failed: Could not read the user code wrapper: invalid type code: 9C
>         at
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
>         at
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152)
>         ... 21 more
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not read the user code wrapper: invalid type code: 9C
>         at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>         at
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>         ... 22 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 9C
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>         at java.util.HashMap.readObject(HashMap.java:1404)
>         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>         at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>         at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>         ... 23 more
>
> End of exception on server side>]
>         at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>         at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>         at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>         at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 4 more



-- 
Best regards!
Rui Li
