Hi,

I tried to treat Cassandra as the source of data in Flink with the
information provided in the following links:
-
https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
-
https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

I got the AsyncWaitOperator exception when I run the task. According the
the 1st link, this exception occurs due to network problem. However, the
strange thing is that I am running Cassandra on my local VM with only 10
rows of data in the target table.

@Jicaar in 1st link also mentions that switching from RichAsyncFunction to
RichMapFunction can avoid the AsyncWaitOperator exception, can someone with
similar experience share how to do it in RichMapFunction?

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait
operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO
org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async
wait operator -> (Flat Map, Sink: Unnamed) (1/1)
(2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception.
Failing the AsyncWaitOperator.
  at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException:
java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc
(org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory
(com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate
(com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
~[kryo-2.24.0.jar:na]
  at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182)
~[flink-core-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
  at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
~[na:1.8.0_60]
  at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
  at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  ... 68 common frames omitted



This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

Reply via email to