Hi!

Could you share the pom.xml of your user project? Did you include the
flink-avro dependency? And did you add the Avro format jar to the lib
directory of your Flink distribution?
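
For reference, here is a minimal sketch of the dependencies that the
debezium-avro-confluent format usually needs in the user jar (the 1.14.2
version below is only an assumption, adjust it to your Flink version):

    <!-- Avro (de)serialization support for the Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.14.2</version>
    </dependency>

    <!-- Confluent Schema Registry support, used by the avro-confluent
         and debezium-avro-confluent formats -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro-confluent-registry</artifactId>
        <version>1.14.2</version>
    </dependency>

Also note that the missing class in your stack trace lives under
org.apache.flink.avro.shaded, which suggests the job plan was created
against one of the pre-bundled SQL jars (flink-sql-avro-<version>.jar /
flink-sql-avro-confluent-registry-<version>.jar). If that is the case,
copying that jar into the lib/ directory of every node of your Flink
distribution and restarting the cluster should make the class available
at runtime.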

Jason Politis <jpoli...@carrera.io> wrote on Tue, Jan 11, 2022 at 08:42:

> Good evening all,
>
> I'm working on a project for a client. We are trying to execute Flink SQL
> using the Table API in Java.
> We are going to pull their data from Oracle -> Debezium -> Kafka -> Flink.
>
>
> Here is a sample of our Java code:
>
> package carrera.build;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class BuildFlinkJob {
>     public static void main(String[] args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql(
>                 "CREATE TABLE BUILDS (\n" +
>                 "    `PARTITION` INT METADATA FROM 'partition',\n" +
>                 "    `OFFSET` BIGINT METADATA FROM 'offset',\n" +
>                 "    BUILD_ID DOUBLE,\n" +
>                 "    BUILD_NAME STRING,\n" +
>                 "    FACILITY_NUMBER STRING,\n" +
>                 "    START_DATE TIMESTAMP(2),\n" +
>                 "    END_DATE TIMESTAMP(2),\n" +
>                 "    RETAILERDIVISION_NAME STRING,\n" +
>                 "    UPC STRING,\n" +
>                 "    BUILD_INSTRUCTIONS STRING,\n" +
>                 "    WORK_INSTRUCTIONS STRING,\n" +
>                 "    IMAGE_FILE_PATH STRING\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'kafka',\n" +
>                 "    'topic' = 'clients-name.OBIANEW_SDS_EBS_12_1_3.BUILDS',\n" +
>                 "    'properties.bootstrap.servers' = 'broker:29092',\n" +
>                 "    'properties.group.id' = 'builds',\n" +
>                 "    'format' = 'debezium-avro-confluent',\n" +
>                 "    'debezium-avro-confluent.url' = 'http://schema-registry:8081',\n" +
>                 "    'scan.startup.mode' = 'earliest-offset'\n" +
>                 ")"
>         );
>
>         tEnv.executeSql(
>                 "CREATE TABLE WC_FK_BUILD_D (\n" +
>                         "    ROW_WID BIGINT,\n" +
>                         "    BUILD_ID DOUBLE,\n" +
>                         "    BUILD_NAME STRING,\n" +
>                         "    FACILITY_NUMBER STRING,\n" +
>                         "    START_DATE TIMESTAMP(0),\n" +
>                         "    END_DATE TIMESTAMP(0),\n" +
>                         "    DIVISION STRING,\n" +
>                         "    UPC STRING,\n" +
>                         "    EFFECTIVE_TO_DT TIMESTAMP(0),\n" +
>                         "    DELETE_FLG STRING,\n" +
>                         "    INTEGRATION_ID STRING,\n" +
>                         "    X_CUSTOM STRING,\n" +
>                         "    PRIMARY KEY (BUILD_ID) NOT ENFORCED\n" +
>                         ") WITH (\n" +
>                         "    'connector' = 'upsert-kafka',\n" +
>                         "    'topic' = 'WC_FK_BUILD_D',\n" +
>                         "    'properties.bootstrap.servers' = 'broker:29092',\n" +
>                         "    'key.format' = 'avro-confluent',\n" +
>                         "    'key.avro-confluent.url' = 'http://schema-registry:8081',\n" +
>                         "    'value.format' = 'avro-confluent',\n" +
>                         "    'value.avro-confluent.url' = 'http://schema-registry:8081'\n" +
>                         ")"
>         );
>
>         Table mapped = tEnv.sqlQuery(
>                         "SELECT \n" +
>                         "    CAST((CAST((`PARTITION` + 1) as STRING) || '0' || CAST(`OFFSET` as STRING)) as BIGINT),\n" +
>                         "    BUILD_ID,\n" +
>                         "    BUILD_NAME,\n" +
>                         "    FACILITY_NUMBER,\n" +
>                         "    START_DATE,\n" +
>                         "    END_DATE,\n" +
>                         "    RETAILERDIVISION_NAME as DIVISION,\n" +
>                         "    UPC,\n" +
>                         "    TIMESTAMP '3714-01-01 00:00:00' as EFFECTIVE_TO_DT,\n" +
>                         "    'N' as DELETE_FLG,\n" +
>                         "    CAST(BUILD_ID as STRING) as INTEGRATION_ID,\n" +
>                         "    '0' as X_CUSTOM\n" +
>                         "FROM BUILDS"
>         );
>
>         mapped.executeInsert("WC_FK_BUILD_D");
>     }
> }
>
> These queries work perfectly fine directly in the Flink SQL client, but when
> we try to submit our jar as a job, we get this error:
>
> 2022-01-10 19:14:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
> at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord
> ClassLoader info: URL ClassLoader:
>     file: '/tmp/blobStore-32c720eb-f7f8-4d84-8917-4a8e77f51168/job_d6329af81456ed1fa20e6fc2b33cd2d0/blob_p-05a10e6e13eba54f187e8185b129d68cd93e6fff-3a9dfa80abb21f665796e1fd9e7e4f03' (valid JAR)
> Class not resolvable through given classloader.
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord
> at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1813)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1638)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:324)
> ... 9 more
>
> Any help, feedback, or advice would be greatly appreciated.
>
> Thank you
>
>
> Jason Politis
> Solutions Architect, Carrera Group
> carrera.io
> | jpoli...@carrera.io
> <http://us.linkedin.com/in/jasonpolitis>
>
