Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job 
execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), 
result_type=DataTypes.BOOLEAN())
    def error_exist(message):
        if message is None:
            return False
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log').lower().strip()
        if 'error' in log:
            return True
        else:
            return False

    @udf(input_types=DataTypes.STRING(), 
result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
    def error_get(message):  
        if message is None:
            return ''
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log')
        return json.dumps({"content":log.strip()})

    
@udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], 
result_type=DataTypes.ROW([\
            DataTypes.FIELD("appId", 
DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
            DataTypes.FIELD("level", 
DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
            DataTypes.FIELD("timestamp", 
DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
            DataTypes.FIELD("clusterName", 
DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
    def headers_get(message,container,clusterName):
        mes_dic = json.loads(message.strip())
        tz_utc = mes_dic.get('time')
        tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
        from_zone = tz.gettz('UTC')
        dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
        dt_utc = dt_utc.replace(tzinfo=from_zone)
        dt_ts = dt_utc.timestamp()

        map_df = pd.read_csv('cc_log_dev_map.csv')
        clusterChinese = map_df.loc[map_df.cs_English == clusterName, 
'cs_Chinese'].values[0]

        return 
json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
                           
'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})

#st_env.execute_sql("""
#        CREATE TABLE source(
#           message STRING,
#           clusterName STRING,
#           kubernetes ROW<container ROW<name STRING>>
#        ) WITH(
#            'connector' = 'kafka',
#        )
#    """)

st_env.execute_sql("""
        CREATE TABLE sink(
            body ROW<content STRING>,
            headers ROW<appId STRING,hostname STRING,level STRING,timeZone 
STRING,\
            `timestamp` DOUBLE,container STRING,clusterName 
STRING,clusterChinese STRING>
        ) WITH(
            'connector' = 'print',
        )
    """)tmp_table =  st_env.from_path("source") \
        .select("message,kubernetes.get('container').get('name') as 
container,clusterName")
 
    data_stream = 
st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
    table = 
Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)

    sink_table = table \
        .where("error_exist(message) = true") \
        .select("error_get(message) as body, 
headers_get(message,container,clusterName) as headers")
       
    
sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File
 "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
    
sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
  File 
"/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py",
 line 78, in result
    return self._py_class(self._j_completable_future.get())
  File 
"/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 
1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py",
 line 147, in deco
    return f(*a, **kw)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", 
line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
 at 
org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
 at akka.dispatch.OnComplete.internal(Future.scala:264)
 at akka.dispatch.OnComplete.internal(Future.scala:261)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

回复