Hello,

I am trying to load a table that is about 200 GiB in size in BigQuery to
Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1 - the job runs but
fails at random points in time throwing different errors each time - and
not always at the same points in the data (which comes in pretty clean from
BigQuery as far as I can see).

I can see here that there seems to be an invalid query string  (log level
is debug) - do I have a chance to set a retry strategy for the CassandraIO
or is there another way to deal with this situation?

 2020-07-16 16:22:21,926 ERROR org.apache.flink.runtime.operators.BatchTask
                 - Error in task code:  CHAIN MapPartition (MapPartition at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
org.apache.beam.sdk.util.UserCodeException:
java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
        at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
        at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
        at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
        at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
        at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
        at
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
        at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
        at
com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
String didn't validate.
        at
com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
        at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
        at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
        at
com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
        at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
        at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
        at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
        at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
        at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
        at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at
com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
        at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
        at
io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Best,
Tobi

Reply via email to