Hello,
I am trying to load a table that is about 200 GiB in size in BigQuery to
Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1 - the job runs but
fails at random points in time throwing different errors each time - and
not always at the same points in the data (which comes in pretty clean from
BigQuery as far as I can see).
I can see here that there seems to be an invalid query string (log level
is debug) - do I have a chance to set a retry strategy for the CassandraIO
or is there another way to deal with this situation?
2020-07-16 16:22:21,926 ERROR org.apache.flink.runtime.operators.BatchTask
- Error in task code: CHAIN MapPartition (MapPartition at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
org.apache.beam.sdk.util.UserCodeException:
java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
at
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
at
com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
String didn't validate.
at
com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
at
com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at
com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at
io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Best,
Tobi