Good evening all,

I'm working on a project for a client.  We are trying to execute Flink SQL
using Table API in java.
We are going to pull their data from oracle -> debezium -> kafka -> flink.


Here is a sample of our java code:

package carrera.build;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class BuildFlinkJob {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql(
                "CREATE TABLE BUILDS (\n" +
                "    `PARTITION` INT METADATA FROM 'partition',\n" +
                "    `OFFSET` BIGINT METADATA FROM 'offset',\n" +
                "    BUILD_ID DOUBLE,\n" +
                "    BUILD_NAME STRING,\n" +
                "    FACILITY_NUMBER STRING,\n" +
                "    START_DATE TIMESTAMP(2),\n" +
                "    END_DATE TIMESTAMP(2),\n" +
                "    RETAILERDIVISION_NAME STRING,\n" +
                "    UPC STRING,\n" +
                "    BUILD_INSTRUCTIONS STRING,\n" +
                "    WORK_INSTRUCTIONS STRING,\n" +
                "    IMAGE_FILE_PATH STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' =
'clients-name.OBIANEW_SDS_EBS_12_1_3.BUILDS',\n" +
                "    'properties.bootstrap.servers' = 'broker:29092',\n" +
                "    'properties.group.id' = 'builds',\n" +
                "    'format' = 'debezium-avro-confluent',\n" +
                "    'debezium-avro-confluent.url' =
'http://schema-registry:8081',\n" +
                "    'scan.startup.mode' = 'earliest-offset'\n" +
                ")"
        );

        tEnv.executeSql(
                "CREATE TABLE WC_FK_BUILD_D (\n" +
                        "    ROW_WID BIGINT,\n" +
                        "    BUILD_ID DOUBLE,\n" +
                        "    BUILD_NAME STRING,\n" +
                        "    FACILITY_NUMBER STRING,\n" +
                        "    START_DATE TIMESTAMP(0),\n" +
                        "    END_DATE TIMESTAMP(0),\n" +
                        "    DIVISION STRING,\n" +
                        "    UPC STRING,\n" +
                        "    EFFECTIVE_TO_DT TIMESTAMP(0),\n" +
                        "    DELETE_FLG STRING,\n" +
                        "    INTEGRATION_ID STRING,\n" +
                        "    X_CUSTOM STRING,\n" +
                        "    PRIMARY KEY (BUILD_ID) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        "    'connector' = 'upsert-kafka',\n" +
                        "    'topic' = 'WC_FK_BUILD_D',\n" +
                        "    'properties.bootstrap.servers' =
'broker:29092',\n" +
                        "    'key.format' = 'avro-confluent',\n" +
                        "    'key.avro-confluent.url' =
'http://schema-registry:8081',\n" +
                        "    'value.format' = 'avro-confluent',\n" +
                        "    'value.avro-confluent.url' =
'http://schema-registry:8081'\n" +
                        ")"
        );

        Table mapped = tEnv.sqlQuery(
                        "SELECT \n" +
                        "    CAST((CAST((`PARTITION` + 1) as STRING)
|| '0' || CAST(`OFFSET` as STRING)) as BIGINT),\n" +
                        "    BUILD_ID,\n" +
                        "    BUILD_NAME,\n" +
                        "    FACILITY_NUMBER,\n" +
                        "    START_DATE,\n" +
                        "    END_DATE,\n" +
                        "    RETAILERDIVISION_NAME as DIVISION,\n" +
                        "    UPC,\n" +
                        "    TIMESTAMP '3714-01-01 00:00:00' as
EFFECTIVE_TO_DT,\n" +
                        "    'N' as DELETE_FLG,\n" +
                        "    CAST(BUILD_ID as STRING) as INTEGRATION_ID,\n" +
                        "    '0' as X_CUSTOM\n" +
                        "FROM BUILDS"
        );

        mapped.executeInsert("WC_FK_BUILD_D");
    }
}

These queries work perfectly fine directly in flink SQL client, but when
trying to submit our jar as a job, we get this error:

2022-01-10 19:14:56
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class:
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord
ClassLoader info: URL ClassLoader:
    file:
'/tmp/blobStore-32c720eb-f7f8-4d84-8917-4a8e77f51168/job_d6329af81456ed1fa20e6fc2b33cd2d0/blob_p-05a10e6e13eba54f187e8185b129d68cd93e6fff-3a9dfa80abb21f665796e1fd9e7e4f03'
(valid JAR)
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:63)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1813)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1638)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:324)
... 9 more

Any help, feedback, or advice would be greatly appreciated.

Thank you


Jason Politis
Solutions Architect, Carrera Group
carrera.io
| jpoli...@carrera.io <kpatter...@carrera.io>
<http://us.linkedin.com/in/jasonpolitis>

Reply via email to