Row类型的对象在python中是怎么表示的,字典?
在 2020-10-20 20:35:22,"Dian Fu" <[email protected]> 写道: >你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。 > >> 在 2020年10月20日,下午7:56,Dian Fu <[email protected]> 写道: >> >> 错误堆栈看着似乎不太完整,有更完整的堆栈吗? >> >>> 在 2020年10月20日,下午7:38,whh_960101 <[email protected]> 写道: >>> >>> Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: >>> Job execution failed. >>> 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), >>> result_type=DataTypes.BOOLEAN()) >>> def error_exist(message): >>> if message is None: >>> return False >>> mes_dic = json.loads(message.strip()) >>> log = mes_dic.get('log').lower().strip() >>> if 'error' in log: >>> return True >>> else: >>> return False >>> >>> @udf(input_types=DataTypes.STRING(), >>> result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())])) >>> def error_get(message): >>> if message is None: >>> return '' >>> mes_dic = json.loads(message.strip()) >>> log = mes_dic.get('log') >>> return json.dumps({"content":log.strip()}) >>> >>> >>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], >>> result_type=DataTypes.ROW([\ >>> DataTypes.FIELD("appId", >>> DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \ >>> DataTypes.FIELD("level", >>> DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \ >>> DataTypes.FIELD("timestamp", >>> DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \ >>> DataTypes.FIELD("clusterName", >>> DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())])) >>> def headers_get(message,container,clusterName): >>> mes_dic = json.loads(message.strip()) >>> tz_utc = mes_dic.get('time') >>> tz_utc = tz_utc[:tz_utc.rfind('.') + 7] >>> from_zone = tz.gettz('UTC') >>> dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f') >>> dt_utc = dt_utc.replace(tzinfo=from_zone) >>> dt_ts = dt_utc.timestamp() >>> >>> map_df = pd.read_csv('cc_log_dev_map.csv') >>> clusterChinese = map_df.loc[map_df.cs_English == clusterName, >>> 'cs_Chinese'].values[0] >>> >>> return >>> json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\ >>> >>> 'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese}) >>> >>> #st_env.execute_sql(""" >>> # CREATE TABLE source( >>> # message STRING, >>> # clusterName STRING, >>> # kubernetes ROW<container ROW<name STRING>> >>> # ) WITH( >>> # 'connector' = 'kafka', >>> # ) >>> # """) >>> >>> st_env.execute_sql(""" >>> CREATE TABLE sink( >>> body ROW<content STRING>, >>> headers ROW<appId STRING,hostname STRING,level STRING,timeZone >>> STRING,\ >>> `timestamp` DOUBLE,container STRING,clusterName >>> STRING,clusterChinese STRING> >>> ) WITH( >>> 'connector' = 'print', >>> ) >>> """)tmp_table = st_env.from_path("source") \ >>> .select("message,kubernetes.get('container').get('name') as >>> container,clusterName") >>> >>> data_stream = >>> st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType()) >>> table = >>> Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env) >>> >>> sink_table = table \ >>> .where("error_exist(message) = true") \ >>> .select("error_get(message) as body, >>> headers_get(message,container,clusterName) as headers") >>> >>> >>> sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File >>> "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo >>> >>> sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result() >>> File >>> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", >>> line 78, in result >>> return self._py_class(self._j_completable_future.get()) >>> File >>> "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", >>> line 1286, in __call__ >>> answer, self.gateway_client, self.target_id, self.name) >>> File >>> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", >>> line 147, in deco >>> return f(*a, **kw) >>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", >>> line 328, in get_return_value >>> format(target_id, ".", name), value) >>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get. >>> : java.util.concurrent.ExecutionException: >>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed. >>> at >>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at >>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) >>> at >>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) >>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) >>> at >>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) >>> at >>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) >>> at >>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job >>> execution failed.at >>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) >>> at >>> org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186) >>> at >>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) >>> at >>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) >>> at >>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) >>> at >>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) >>> at >>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) >>> at >>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) >>> at >>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) >>> at >>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) >>> at >>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892) >>> at akka.dispatch.OnComplete.internal(Future.scala:264) >>> at akka.dispatch.OnComplete.internal(Future.scala:261) >>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) >>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at >>> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) >>> at >>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) >>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) >>> at >>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) >>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) >>> at >>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) >>> at >>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) >>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) >>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) >>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) >>> at >>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) >>> at >>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) >>> at >>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >>> at >>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) >>> at >>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) >>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) >>> at >>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) >>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> at >>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> at >>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>
