错误堆栈看起来不太完整(只有 JobExecutionException 这一层,缺少最底层 Caused by 的根因异常),能贴一下更完整的堆栈吗?

> 在 2020年10月20日,下午7:38,whh_960101 <[email protected]> 写道:
> 
> Hi,各位大佬,我处理 kafka source Table,print 一个嵌套的 json 结果时,报错 JobExecutionException: Job 
> execution failed. 这个问题该怎么解决?代码和报错信息如下:
> 
>    @udf(input_types=DataTypes.STRING(), 
> result_type=DataTypes.BOOLEAN())
>    def error_exist(message):
>        if message is None:
>            return False
>        mes_dic = json.loads(message.strip())
>        log = mes_dic.get('log').lower().strip()
>        if 'error' in log:
>            return True
>        else:
>            return False
> 
>    @udf(input_types=DataTypes.STRING(), 
> result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>    def error_get(message):  
>        if message is None:
>            return ''
>        mes_dic = json.loads(message.strip())
>        log = mes_dic.get('log')
>        return json.dumps({"content":log.strip()})
> 
>    
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], 
> result_type=DataTypes.ROW([\
>            DataTypes.FIELD("appId", 
> DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>            DataTypes.FIELD("level", 
> DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>            DataTypes.FIELD("timestamp", 
> DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>            DataTypes.FIELD("clusterName", 
> DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>    def headers_get(message,container,clusterName):
>        mes_dic = json.loads(message.strip())
>        tz_utc = mes_dic.get('time')
>        tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>        from_zone = tz.gettz('UTC')
>        dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>        dt_utc = dt_utc.replace(tzinfo=from_zone)
>        dt_ts = dt_utc.timestamp()
> 
>        map_df = pd.read_csv('cc_log_dev_map.csv')
>        clusterChinese = map_df.loc[map_df.cs_English == clusterName, 
> 'cs_Chinese'].values[0]
> 
>        return 
> json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>                           
> 'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
> 
> #st_env.execute_sql("""
> #        CREATE TABLE source(
> #           message STRING,
> #           clusterName STRING,
> #           kubernetes ROW<container ROW<name STRING>>
> #        ) WITH(
> #            'connector' = 'kafka',
> #        )
> #    """)
> 
> st_env.execute_sql("""
>        CREATE TABLE sink(
>            body ROW<content STRING>,
>            headers ROW<appId STRING,hostname STRING,level STRING,timeZone 
> STRING,\
>            `timestamp` DOUBLE,container STRING,clusterName 
> STRING,clusterChinese STRING>
>        ) WITH(
>            'connector' = 'print',
>        )
>    """)
> 
>    tmp_table =  st_env.from_path("source") \
>        .select("message,kubernetes.get('container').get('name') as container,clusterName")
> 
>    data_stream = 
> st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>    table = 
> Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
> 
>    sink_table = table \
>        .where("error_exist(message) = true") \
>        .select("error_get(message) as body, 
> headers_get(message,container,clusterName) as headers")
> 
>    
> sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
> 
> 报错:
>  File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>    
> sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py",
>  line 78, in result
>    return self._py_class(self._j_completable_future.get())
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", 
> line 1286, in __call__
>    answer, self.gateway_client, self.target_id, self.name)
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py",
>  line 147, in deco
>    return f(*a, **kw)
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", 
> line 328, in get_return_value
>    format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at 
> org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

回复