Hi James,

The answer to your question depends on your use case.
The AsyncIOFunction approach works if you have a DataStream that you would
like to enrich with data in a Cassandra table but not if you would like to
create a DataStream from a Cassandra table.
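
To make the enrichment case concrete, here is a minimal sketch in plain Java, deliberately without any Flink or Cassandra classes. The names EnrichmentSketch, TABLE, lookup and enrich are hypothetical, and the in-memory map only stands in for a Cassandra table. It shows the shape of the pattern: each incoming element triggers an asynchronous lookup, and the result is turned into a plain value before it is passed on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Plain-JDK sketch: no Flink or Cassandra classes are used here.
class EnrichmentSketch {

    // Hypothetical stand-in for a Cassandra table keyed by user id.
    static final Map<String, String> TABLE = new HashMap<>();
    static {
        TABLE.put("u1", "Alice");
        TABLE.put("u2", "Bob");
    }

    // Hypothetical async lookup; a real job would issue a driver query here.
    static CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> TABLE.getOrDefault(key, "unknown"));
    }

    // Enrich each key. Only plain Strings leave this method, never client objects.
    static List<String> enrich(List<String> keys) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String key : keys) {
            // Start all lookups first so they can run concurrently.
            pending.add(lookup(key).thenApply(name -> key + ":" + name));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            out.add(future.join()); // collect results in input order
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(enrich(Arrays.asList("u1", "u2", "u3")));
    }
}
```

In a Flink job the same idea would live inside the async function, and the values handed to the next operator would be simple types (Strings, tuples, POJOs) rather than objects that hold a reference to the database client.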

The Flink code base contains a CassandraInputFormat [1], but I don't think
it is extensively used. You might need to adjust it to your needs.
InputFormats are the source connectors for the DataSet API but can also be
used from the DataStream API to read bounded data. However, InputFormats
do not guarantee the order of emitted records. Hence, they are not well
suited for time-based applications.
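
As an illustration of the ordering point, the following plain-Java sketch (again no Flink classes; OrderingSketch, Event and sortedByTimestamp are hypothetical names) takes a bounded batch of records that arrived out of timestamp order and sorts it explicitly. It assumes the whole batch fits in memory, which is only realistic for small tables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-JDK sketch: restores timestamp order for a small bounded batch.
class OrderingSketch {

    // Hypothetical record type with an event timestamp and a payload.
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Returns a new list sorted by ascending timestamp; the input is not modified.
    static List<Event> sortedByTimestamp(List<Event> events) {
        List<Event> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return copy;
    }

    public static void main(String[] args) {
        // Records as they might be emitted: not in timestamp order.
        List<Event> emitted = Arrays.asList(
                new Event(3L, "c"), new Event(1L, "a"), new Event(2L, "b"));
        for (Event e : sortedByTimestamp(emitted)) {
            System.out.println(e.timestamp + " " + e.payload);
        }
    }
}
```

This only works because the input is bounded; for an unbounded stream the order would have to be handled with timestamps and watermarks instead.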

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java


2018-03-29 23:09 GMT+02:00 James Yu <cyu...@gmail.com>:

> Hi,
>
> I tried to treat Cassandra as the source of data in Flink with the
> information provided in the following links:
> - https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
> - https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
>
> I got the AsyncWaitOperator exception when I ran the task. According to
> the 1st link, this exception occurs due to a network problem. However, the
> strange thing is that I am running Cassandra on my local VM with only 10
> rows of data in the target table.
>
> @Jicaar in the 1st link also mentions that switching from RichAsyncFunction
> to RichMapFunction can avoid the AsyncWaitOperator exception. Can someone
> with similar experience share how to do it with RichMapFunction?
>
> AsyncWaitOperator exception trace -->
> 02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait
> operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO
> org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source ->
> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (
> 2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
> java.lang.Exception: An async function call terminated with an exception.
> Failing the AsyncWaitOperator.
>   at 
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
> [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at 
> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
> [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:524)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:504)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:611)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:572)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:830)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:808)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.api.operators.
> TimestampedCollector.collect(TimestampedCollector.java:51)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   at 
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)
> [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   ... 2 common frames omitted
> Caused by: com.esotericsoftware.kryo.KryoException: java.util.
> ConcurrentModificationException
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> context (java.security.AccessControlContext)
> acc (org.apache.flink.runtime.execution.librarycache.
> FlinkUserCodeClassLoaders$ChildFirstClassLoader)
> contextClassLoader (java.lang.Thread)
> threads (java.lang.ThreadGroup)
> groups (java.lang.ThreadGroup)
> threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
> val$backingThreadFactory (com.google.common.util.concurrent.
> ThreadFactoryBuilder$1)
> threadFactory (java.util.concurrent.ThreadPoolExecutor)
> delegate (com.google.common.util.concurrent.MoreExecutors$
> ListeningDecorator)
> blockingExecutor (com.datastax.driver.core.Cluster$Manager)
> manager (com.datastax.driver.core.Host)
> triedHosts (com.datastax.driver.core.ExecutionInfo)
> info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.write(DefaultArraySerializers.java:348)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.write(DefaultArraySerializers.java:289)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.write(DefaultArraySerializers.java:348)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.write(DefaultArraySerializers.java:289)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.write(DefaultArraySerializers.java:348)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.write(DefaultArraySerializers.java:289)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.
> write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
> ~[kryo-2.24.0.jar:na]
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182)
> ~[flink-core-1.4.2.jar:1.4.2]
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>   ... 10 common frames omitted
> Caused by: java.util.ConcurrentModificationException: null
>   at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
> ~[na:1.8.0_60]
>   at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
>   at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> ~[kryo-2.24.0.jar:na]
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> ~[kryo-2.24.0.jar:na]
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ~[kryo-2.24.0.jar:na]
>   ... 68 common frames omitted
>
>
>
> This is a UTF-8 formatted mail
> -----------------------------------------------
> James C.-C.Yu
> +886988713275
>
