Hi,
根据你提供的堆栈,真正报错的堆栈如下:
`Parsing error for column 1 of row '锘�1,98' originated by LongParser:
NUMERIC_VALUE_ILLEGAL_CHARACTER.`
这是因为该行第 1 列的内容 '锘�1' 无法转换为 long 类型。行首的 '锘�' 很可能是输入文件开头的 UTF-8 BOM 被错误解码后产生的乱码,建议将输入文件另存为不带 BOM 的 UTF-8 编码后重试;或者你也可以将对应字段定义为 varchar。
Best,
Hailong Wang.
在 2020-10-25 17:53:29,"洗你的头" <[email protected]> 写道:
>尊敬的开发者您好:我刚开始使用 pyflink,已经跑通了简单的单词统计的例子,但是在运行求和的例子时报错了,我不知道如何解决。
>
>具体内容为:
>1.源代码
>from pyflink.table import StreamTableEnvironment, DataTypes, BatchTableEnvironment
>from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>from pyflink.table.udf import udf
>from pyflink.datastream import StreamExecutionEnvironment
>#from pyflink.dataset import ExecutionEnvironment
>
>env = StreamExecutionEnvironment.get_execution_environment()
>#env = ExecutionEnvironment.get_execution_environment()
>env.set_parallelism(1)
>t_env = StreamTableEnvironment.create(env)
>#t_env = BatchTableEnvironment.create(env)
>t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
>t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", '2')
>
>t_env.register_function("add", udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()))
>
>t_env.connect(FileSystem().path('input\input')) \
> .with_format(OldCsv()
> .field('a', DataTypes.BIGINT())
> .field('b', DataTypes.BIGINT())) \
> .with_schema(Schema()
> .field('a', DataTypes.BIGINT())
> .field('b', DataTypes.BIGINT())) \
> .create_temporary_table('mySource')
>
>t_env.connect(FileSystem().path('output')) \
> .with_format(OldCsv()
> .field('sum', DataTypes.BIGINT())) \
> .with_schema(Schema()
> .field('sum', DataTypes.BIGINT())) \
> .create_temporary_table('mySink')
>
>t_env.from_path('mySource')\
> .select("add(a, b)") \
> .insert_into('mySink')
>
>t_env.execute("tutorial_job")
>当我运行此代码时,会报出如下错误:
>2.错误信息
>Py4JJavaError Traceback (most recent call last)
><ipython-input-5-4e5da54b7616> in <module> 35
>.insert_into('mySink') 36 ---> 37 t_env.execute("tutorial_job")
>F:\Anaconda3\envs\pyflink\lib\site-packages\pyflink\table\table_environment.py
>in execute(self, job_name) 1055 "use
>create_statement_set for multiple sinks.", DeprecationWarning) 1056
>self._before_execute() -> 1057 return
>JobExecutionResult(self._j_tenv.execute(job_name)) 1058 1059 def
>from_elements(self, elements, schema=None, verify_schema=True):
>F:\Anaconda3\envs\pyflink\lib\site-packages\py4j-0.10.8.1-py3.7.egg\py4j\java_gateway.py
> in __call__(self, *args) 1284 answer =
>self.gateway_client.send_command(command) 1285 return_value =
>get_return_value( -> 1286 answer, self.gateway_client,
>self.target_id, self.name) 1287 1288 for temp_arg in temp_args:
>F:\Anaconda3\envs\pyflink\lib\site-packages\pyflink\util\exceptions.py in
>deco(*a, **kw) 145 def deco(*a, **kw): 146 try: --> 147
> return f(*a, **kw) 148 except Py4JJavaError as e:
>149 s = e.java_exception.toString()
>F:\Anaconda3\envs\pyflink\lib\site-packages\py4j-0.10.8.1-py3.7.egg\py4j\protocol.py
> in get_return_value(answer, gateway_client, target_id, name) 326
> raise Py4JJavaError( 327 "An error occurred
>while calling {0}{1}{2}.\n". --> 328 format(target_id,
>".", name), value) 329 else: 330 raise
>Py4JError( Py4JJavaError: An error occurred while calling o374.execute. :
>java.util.concurrent.ExecutionException:
>org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
>java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at
>java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> at
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1717)
> at
>org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> at
>org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
> at
>org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>Method) at
>java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) at
>org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
>org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
>org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>at
>org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
>org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
>org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:834) Caused by:
>org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
>org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at
>org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
> at
>java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
>java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
>java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
>org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
> at
>java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
>java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
>java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
>java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
>org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
> at akka.dispatch.OnComplete.internal(Future.scala:264) at
>akka.dispatch.OnComplete.internal(Future.scala:261) at
>akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at
>akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at
>scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at
>org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at
>scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at
>akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
>akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at
>scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at
>akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
>akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at
>akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
>akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
>scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>at
>akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>at
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
>NoRestartBackoffTimeStrategy at
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> at
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> at
>org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at
>org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
> at
>org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
> at
>org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
> at
>org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>Method) at
>java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> at
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
>akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
>scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
>akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
>akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
>akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
>akka.actor.ActorCell.invoke(ActorCell.scala:561) at
>akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
>akka.dispatch.Mailbox.run(Mailbox.scala:225) at
>akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by:
>org.apache.flink.api.common.io.ParseException: Parsing error for column 1 of
>row '锘�1,98' originated by LongParser: NUMERIC_VALUE_ILLEGAL_CHARACTER.
>at
>org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:182)
> at
>org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
> at
>org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
> at
>org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
> at
>org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:359)
> at
>org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:326)
> at
>org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225)
> at
>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
> at
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
> at
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at
>org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
> at
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at
>java.base/java.lang.Thread.run(Thread.java:834)
>
>
>我该如何解决呢?(或者您可以推荐几个在中国国内比较活跃的答疑社区吗?)
>因这种可能较简单的问题打扰到您,万分歉意,祝您工作愉快,身体健康!