[ https://issues.apache.org/jira/browse/FLINK-22261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu updated FLINK-22261: ---------------------------- Fix Version/s: (was: 1.13.0) > Python StreamingModeDataStreamTests is failed on Azure > ------------------------------------------------------ > > Key: FLINK-22261 > URL: https://issues.apache.org/jira/browse/FLINK-22261 > Project: Flink > Issue Type: Bug > Components: API / Python > Affects Versions: 1.13.0 > Reporter: Jark Wu > Priority: Major > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16443&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3 > {code} > 2021-04-13T11:49:32.1640428Z =================================== FAILURES > =================================== > 2021-04-13T11:49:32.1641478Z _____ > StreamingModeDataStreamTests.test_keyed_process_function_with_state ______ > 2021-04-13T11:49:32.1641744Z > 2021-04-13T11:49:32.1642074Z self = > <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests > testMethod=test_keyed_process_function_with_state> > 2021-04-13T11:49:32.1642359Z > 2021-04-13T11:49:32.1642606Z def > test_keyed_process_function_with_state(self): > 2021-04-13T11:49:32.1644412Z self.env.set_parallelism(1) > 2021-04-13T11:49:32.1644941Z > self.env.get_config().set_auto_watermark_interval(2000) > 2021-04-13T11:49:32.1645447Z > self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > 2021-04-13T11:49:32.1647182Z data_stream = > self.env.from_collection([(1, 'hi', '1603708211000'), > 2021-04-13T11:49:32.1648276Z > (2, 'hello', '1603708224000'), > 2021-04-13T11:49:32.1661775Z > (3, 'hi', '1603708226000'), > 2021-04-13T11:49:32.1663379Z > (4, 'hello', '1603708289000'), > 2021-04-13T11:49:32.1665197Z > (5, 'hi', '1603708291000'), > 2021-04-13T11:49:32.1666200Z > (6, 'hello', '1603708293000')], > 2021-04-13T11:49:32.1666827Z > type_info=Types.ROW([Types.INT(), Types.STRING(), > 2021-04-13T11:49:32.1667449Z > Types.STRING()])) > 2021-04-13T11:49:32.1667830Z > 2021-04-13T11:49:32.1668351Z class > MyTimestampAssigner(TimestampAssigner): > 2021-04-13T11:49:32.1668755Z > 2021-04-13T11:49:32.1669783Z def extract_timestamp(self, value, > record_timestamp) -> int: > 2021-04-13T11:49:32.1670386Z return int(value[2]) > 2021-04-13T11:49:32.1670672Z > 2021-04-13T11:49:32.1671063Z class > MyProcessFunction(KeyedProcessFunction): > 2021-04-13T11:49:32.1671434Z > 2021-04-13T11:49:32.1671727Z def __init__(self): > 2021-04-13T11:49:32.1672090Z self.value_state = None > 2021-04-13T11:49:32.1685812Z self.list_state = None > 2021-04-13T11:49:32.1686276Z self.map_state = None > 2021-04-13T11:49:32.1686609Z > 2021-04-13T11:49:32.1687039Z def open(self, runtime_context: > RuntimeContext): > 2021-04-13T11:49:32.1688350Z value_state_descriptor = > ValueStateDescriptor('value_state', Types.INT()) > 2021-04-13T11:49:32.1688953Z self.value_state = > runtime_context.get_state(value_state_descriptor) > 2021-04-13T11:49:32.1689892Z list_state_descriptor = > ListStateDescriptor('list_state', Types.INT()) > 2021-04-13T11:49:32.1690492Z self.list_state = > runtime_context.get_list_state(list_state_descriptor) > 2021-04-13T11:49:32.1691407Z map_state_descriptor = > MapStateDescriptor('map_state', Types.INT(), Types.STRING()) > 2021-04-13T11:49:32.1692052Z self.map_state = > runtime_context.get_map_state(map_state_descriptor) > 2021-04-13T11:49:32.1692481Z > 2021-04-13T11:49:32.1693134Z def process_element(self, value, > ctx): > 2021-04-13T11:49:32.1693632Z current_value = > self.value_state.value() > 2021-04-13T11:49:32.1694106Z self.value_state.update(value[0]) > 2021-04-13T11:49:32.1694573Z current_list = [_ for _ in > self.list_state.get()] > 2021-04-13T11:49:32.1695051Z self.list_state.add(value[0]) > 2021-04-13T11:49:32.1695445Z map_entries_string = [] > 2021-04-13T11:49:32.1695902Z for k, v in > self.map_state.items(): > 2021-04-13T11:49:32.1696822Z > map_entries_string.append(str(k) + ': ' + str(v)) > 2021-04-13T11:49:32.1697700Z map_entries_string = '{' + ', > '.join(map_entries_string) + '}' > 2021-04-13T11:49:32.1698483Z self.map_state.put(value[0], > value[1]) > 2021-04-13T11:49:32.1698941Z current_key = > ctx.get_current_key() > 2021-04-13T11:49:32.1699840Z yield "current key: {}, current > value state: {}, current list state: {}, " \ > 2021-04-13T11:49:32.1700593Z "current map state: {}, > current value: {}".format(str(current_key), > 2021-04-13T11:49:32.1701275Z > str(current_value), > 2021-04-13T11:49:32.1701894Z > str(current_list), > 2021-04-13T11:49:32.1702536Z > map_entries_string, > 2021-04-13T11:49:32.1703219Z > str(value)) > 2021-04-13T11:49:32.1703645Z > 2021-04-13T11:49:32.1703992Z def on_timer(self, timestamp, ctx): > 2021-04-13T11:49:32.1704400Z pass > 2021-04-13T11:49:32.1704654Z > 2021-04-13T11:49:32.1705031Z watermark_strategy = > WatermarkStrategy.for_monotonous_timestamps() \ > 2021-04-13T11:49:32.1705544Z > .with_timestamp_assigner(MyTimestampAssigner()) > 2021-04-13T11:49:32.1706077Z > data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ > 2021-04-13T11:49:32.1706635Z .key_by(lambda x: x[1], > key_type=Types.STRING()) \ > 2021-04-13T11:49:32.1707203Z .process(MyProcessFunction(), > output_type=Types.STRING()) \ > 2021-04-13T11:49:32.1707650Z .add_sink(self.test_sink) > 2021-04-13T11:49:32.1708767Z > self.env.execute('test time stamp > assigner with keyed process function') > 2021-04-13T11:49:32.1709132Z > 2021-04-13T11:49:32.1709485Z > pyflink/datastream/tests/test_data_stream.py:710: > 2021-04-13T11:49:32.1710039Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2021-04-13T11:49:32.1710613Z > pyflink/datastream/stream_execution_environment.py:645: in execute > 2021-04-13T11:49:32.1711242Z return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > 2021-04-13T11:49:32.1712222Z > .tox/py38/lib/python3.8/site-packages/py4j/java_gateway.py:1285: in __call__ > 2021-04-13T11:49:32.1713103Z return_value = get_return_value( > 2021-04-13T11:49:32.1713507Z pyflink/util/exceptions.py:147: in deco > 2021-04-13T11:49:32.1713897Z return f(*a, **kw) > 2021-04-13T11:49:32.1714331Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2021-04-13T11:49:32.1714688Z > 2021-04-13T11:49:32.1715233Z answer = 'xro4189' > 2021-04-13T11:49:32.1715657Z gateway_client = > <py4j.java_gateway.GatewayClient object at 0x7fc977dede50> > 2021-04-13T11:49:32.1716395Z target_id = 'o4063', name = 'execute' > 2021-04-13T11:49:32.1716648Z > 2021-04-13T11:49:32.1717092Z def get_return_value(answer, gateway_client, > target_id=None, name=None): > 2021-04-13T11:49:32.1717760Z """Converts an answer received from the > Java gateway into a Python object. > 2021-04-13T11:49:32.1718304Z > 2021-04-13T11:49:32.1718736Z For example, string representation of > integers are converted to Python > 2021-04-13T11:49:32.1719302Z integer, string representation of > objects are converted to JavaObject > 2021-04-13T11:49:32.1719757Z instances, etc. > 2021-04-13T11:49:32.1720023Z > 2021-04-13T11:49:32.1720419Z :param answer: the string returned by > the Java gateway > 2021-04-13T11:49:32.1720995Z :param gateway_client: the gateway > client used to communicate with the Java > 2021-04-13T11:49:32.1721610Z Gateway. Only necessary if the > answer is a reference (e.g., object, > 2021-04-13T11:49:32.1722048Z list, map) > 2021-04-13T11:49:32.1722529Z :param target_id: the name of the object > from which the answer comes from > 2021-04-13T11:49:32.1723167Z (e.g., *object1* in > `object1.hello()`). Optional. > 2021-04-13T11:49:32.1723738Z :param name: the name of the member from > which the answer comes from > 2021-04-13T11:49:32.1724279Z (e.g., *hello* in > `object1.hello()`). Optional. > 2021-04-13T11:49:32.1724669Z """ > 2021-04-13T11:49:32.1725177Z if is_error(answer)[0]: > 2021-04-13T11:49:32.1725572Z if len(answer) > 1: > 2021-04-13T11:49:32.1725982Z type = answer[1] > 2021-04-13T11:49:32.1726465Z value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 2021-04-13T11:49:32.1727049Z if answer[1] == REFERENCE_TYPE: > 2021-04-13T11:49:32.1727490Z > raise Py4JJavaError( > 2021-04-13T11:49:32.1727999Z "An error occurred while > calling {0}{1}{2}.\n". > 2021-04-13T11:49:32.1728599Z format(target_id, ".", > name), value) > 2021-04-13T11:49:32.1729202Z E py4j.protocol.Py4JJavaError: > An error occurred while calling o4063.execute. > 2021-04-13T11:49:32.1729951Z E : > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > 2021-04-13T11:49:32.1730801Z E at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-04-13T11:49:32.1731734Z E at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) > 2021-04-13T11:49:32.1732664Z E at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2021-04-13T11:49:32.1733592Z E at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > 2021-04-13T11:49:32.1734430Z E at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-04-13T11:49:32.1735358Z E at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-04-13T11:49:32.1735925Z E at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) > 2021-04-13T11:49:32.1736477Z E at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2021-04-13T11:49:32.1737033Z E at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > 2021-04-13T11:49:32.1737877Z E at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-04-13T11:49:32.1738678Z E at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-04-13T11:49:32.1739352Z E at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1066) > 2021-04-13T11:49:32.1740102Z E at > akka.dispatch.OnComplete.internal(Future.scala:264) > 2021-04-13T11:49:32.1740903Z E at > akka.dispatch.OnComplete.internal(Future.scala:261) > 2021-04-13T11:49:32.1741740Z E at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > 2021-04-13T11:49:32.1742523Z E at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > 2021-04-13T11:49:32.1743271Z E at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > 2021-04-13T11:49:32.1743927Z E at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) > 2021-04-13T11:49:32.1744611Z E at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > 2021-04-13T11:49:32.1745098Z E at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > 2021-04-13T11:49:32.1745556Z E at > akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) > 2021-04-13T11:49:32.1746041Z E at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > 2021-04-13T11:49:32.1746605Z E at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > 2021-04-13T11:49:32.1747100Z E at > scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) > 2021-04-13T11:49:32.1747563Z E at > scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) > 2021-04-13T11:49:32.1748266Z E at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > 2021-04-13T11:49:32.1748761Z E at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > 2021-04-13T11:49:32.1749534Z E at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > 2021-04-13T11:49:32.1750409Z E at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > 2021-04-13T11:49:32.1751250Z E at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > 2021-04-13T11:49:32.1752304Z E at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > 2021-04-13T11:49:32.1753238Z E at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > 2021-04-13T11:49:32.1754028Z E at > akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > 2021-04-13T11:49:32.1754922Z E at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > 2021-04-13T11:49:32.1755795Z E at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > 2021-04-13T11:49:32.1756608Z E at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > 2021-04-13T11:49:32.1757384Z E at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > 2021-04-13T11:49:32.1758377Z E at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2021-04-13T11:49:32.1759238Z E Caused by: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > 2021-04-13T11:49:32.1760250Z E at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > 2021-04-13T11:49:32.1761417Z E at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > 2021-04-13T11:49:32.1762673Z E at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:206) > 2021-04-13T11:49:32.1763798Z E at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:196) > 2021-04-13T11:49:32.1764788Z E at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:187) > 2021-04-13T11:49:32.1765795Z E at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:680) > 2021-04-13T11:49:32.1766714Z E at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) > 2021-04-13T11:49:32.1767594Z E at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) > 2021-04-13T11:49:32.1768413Z E at > sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source) > 2021-04-13T11:49:32.1769165Z E at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2021-04-13T11:49:32.1769887Z E at > java.lang.reflect.Method.invoke(Method.java:498) > 2021-04-13T11:49:32.1770666Z E at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) > 2021-04-13T11:49:32.1771501Z E at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) > 2021-04-13T11:49:32.1772414Z E at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > 2021-04-13T11:49:32.1773421Z E at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > 2021-04-13T11:49:32.1774434Z E at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > 2021-04-13T11:49:32.1775170Z E at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > 2021-04-13T11:49:32.1775912Z E at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > 2021-04-13T11:49:32.1776686Z E at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > 2021-04-13T11:49:32.1777448Z E at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > 2021-04-13T11:49:32.1778349Z E at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > 2021-04-13T11:49:32.1779164Z E at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > 2021-04-13T11:49:32.1779859Z E at > akka.actor.Actor$class.aroundReceive(Actor.scala:517) > 2021-04-13T11:49:32.1780553Z E at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > 2021-04-13T11:49:32.1781229Z E at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > 2021-04-13T11:49:32.1781882Z E at > akka.actor.ActorCell.invoke(ActorCell.scala:561) > 2021-04-13T11:49:32.1782487Z E at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > 2021-04-13T11:49:32.1783251Z E at > akka.dispatch.Mailbox.run(Mailbox.scala:225) > 2021-04-13T11:49:32.1783862Z E at > akka.dispatch.Mailbox.exec(Mailbox.scala:235) > 2021-04-13T11:49:32.1784358Z E ... 4 more > 2021-04-13T11:49:32.1784800Z E Caused by: > java.lang.NullPointerException > 2021-04-13T11:49:32.1785574Z E at > org.apache.flink.streaming.api.runners.python.beam.PythonSharedResources.close(PythonSharedResources.java:69) > 2021-04-13T11:49:32.1786547Z E at > org.apache.flink.runtime.memory.SharedResources$LeasedResource.dispose(SharedResources.java:180) > 2021-04-13T11:49:32.1787407Z E at > org.apache.flink.runtime.memory.SharedResources.release(SharedResources.java:108) > 2021-04-13T11:49:32.1788747Z E at > org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$7(MemoryManager.java:565) > 2021-04-13T11:49:32.1789750Z E at > org.apache.flink.runtime.memory.OpaqueMemoryResource.close(OpaqueMemoryResource.java:65) > 2021-04-13T11:49:32.1790708Z E at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.close(BeamPythonFunctionRunner.java:297) > 2021-04-13T11:49:32.1791701Z E at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.dispose(AbstractPythonFunctionOperator.java:164) > 2021-04-13T11:49:32.1792626Z E at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:833) > 2021-04-13T11:49:32.1793608Z E at > org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:697) > 2021-04-13T11:49:32.1794453Z E at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629) > 2021-04-13T11:49:32.1795204Z E at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) > 2021-04-13T11:49:32.1795892Z E at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > 2021-04-13T11:49:32.1796535Z E at > java.lang.Thread.run(Thread.java:748) > 2021-04-13T11:49:32.1796858Z > 2021-04-13T11:49:32.1797825Z > .tox/py38/lib/python3.8/site-packages/py4j/protocol.py:326: Py4JJavaError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)