Help: consuming a Hive table through a streaming pipeline throws java.lang.ArrayIndexOutOfBoundsException: -1. Thanks!

2021-04-01 Posted by samuel....@ubtrobot.com
Hello:
1. A job reads Kafka in real time and writes the data into Hive. The Hive table uses the Parquet format and is partitioned by day, hour and minute.

2. Consuming that Hive table through a streaming pipeline throws java.lang.ArrayIndexOutOfBoundsException: -1.
  In the Flink SQL client:
  1) Selecting all columns directly works fine; all data can be read.
  Executed:
  select *
  from ubtCatalog.ubtHive.event_all_dwd
  /*+ OPTIONS('streaming-source.enable'='true',
              'streaming-source.partition.include'='all',
              'streaming-source.monitor-interval'='5s',
              'streaming-source.consume-order'='partition-time',
              'streaming-source.consume-start-offset'='2021-01-01') */
  ;

  2) Adding an aggregate function on top of 1) keeps failing with java.lang.ArrayIndexOutOfBoundsException: -1.
  Executed:
  select count(xubtappid)
  from ubtCatalog.ubtHive.event_all_dwd
  /*+ OPTIONS('streaming-source.enable'='true',
              'streaming-source.partition.include'='all',
              'streaming-source.monitor-interval'='5s',
              'streaming-source.consume-order'='partition-time',
              'streaming-source.consume-start-offset'='2021-01-01') */
  ;
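
For reference, the same setup can be written out end to end as a minimal Table API sketch, so the table properties and the streaming-read hints are visible in one place. The catalog and database names come from the question; the column and partition names and the table properties are illustrative assumptions, not the real schema:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveStreamingReadSketch {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// Registering the HiveCatalog ("ubtCatalog") and switching to the "ubtHive" database is omitted here.

		// Hive-dialect DDL for a day/hour/minute partitioned Parquet table (column/partition names assumed).
		tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tEnv.executeSql(
				"CREATE TABLE IF NOT EXISTS event_all_dwd (" +
				"  xubtappid STRING" +
				") PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS PARQUET TBLPROPERTIES (" +
				"  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00'," +
				"  'sink.partition-commit.trigger'='partition-time'," +
				"  'sink.partition-commit.policy.kind'='metastore,success-file'" +
				")");

		// Streaming read with the same hints as in the failing query.
		tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
		tEnv.executeSql(
				"SELECT count(xubtappid) FROM event_all_dwd " +
				"/*+ OPTIONS('streaming-source.enable'='true'," +
				"  'streaming-source.partition.include'='all'," +
				"  'streaming-source.monitor-interval'='5s'," +
				"  'streaming-source.consume-order'='partition-time'," +
				"  'streaming-source.consume-start-offset'='2021-01-01') */")
			.print();
	}
}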

The full error message is as follows:
2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator bc764cd8ddf7a0cff126f51c16239658).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to enumerate files
	at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
	... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
	at org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167)
	at

Re: Please advise on a question about time windows, many thanks!

2020-09-03 Posted by samuel....@ubtrobot.com
Hello:

With Flink 1.11 SQL, the job reads Kafka with the streaming API, assigns event time, converts the stream to a table and runs the SQL. On the Flink web UI the watermarks column shows "No Watermark", and the window only fires after new data arrives on the Kafka topic; only then do the watermarks advance.


From: samuel@ubtrobot.com
Sent: 2020-09-03 09:23
To: user-zh
Subject: Re: Please advise on a question about time windows, many thanks!

Hello:

The code below runs against Flink 1.11.1:


package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Hourly exception alert job.
 */
public class ExceptionAlertHour4 {

	private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);

	public static void main(String[] args) throws Exception {
		ParameterTool parameterTool = ParameterTool.fromArgs(args);
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().setGlobalJobParameters(parameterTool);
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Re: Re: Please advise on a question about time windows, many thanks!

2020-09-02 Posted by samuel....@ubtrobot.com
Hello, thanks for the reply! In Flink 1.11.1 this window cannot be closed; it only fires once a record belonging to the next time window arrives.

From: taochanglian
Sent: 2020-09-03 10:35
To: user-zh; samuel@ubtrobot.com
Subject: Re: Please advise on a question about time windows, many thanks!
There is nothing wrong here. Time windows are left-closed and right-open. Going by getWindowStartWithOffset in org.apache.flink.streaming.api.windowing.windows.TimeWindow, yours should be the 17:00-18:00 window, but it will not fire exactly at 2020-09-01 18:00:00.0. Because the window is left-closed and right-open, a record with a timestamp strictly greater than 2020-09-01 18:00:00.0 is needed, for example 2020-09-01 18:00:00.001. Adding your 5-second watermark delay, it should fire at about 2020-09-01 18:00:05.001.
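
A rough sketch of the arithmetic behind that (the epoch constant is computed for UTC+8 here and is an assumption, not taken from the original job):

// Why the [17:00, 18:00) window only fires once a record after 18:00:05 shows up.
long windowEnd = 1598954400000L;          // 2020-09-01 18:00:00.000 (+08:00)
long maxDelay  = 5_000L;                  // forBoundedOutOfOrderness(Duration.ofSeconds(5))
// BoundedOutOfOrdernessWatermarks emits: watermark = maxSeenTimestamp - maxDelay - 1,
// and the window fires once the watermark reaches windowEnd - 1. So a record with
long firingTs  = windowEnd + maxDelay;    // timestamp >= 2020-09-01 18:00:05.000 has to arrive first.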

On 2020/9/2 15:20, samuel@ubtrobot.com wrote:
Hi experts: using Flink SQL, I want to aggregate with a tumbling window (one hour). The problem is that the computation is not triggered at the top of the hour. Please take a look!
// assign the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
		singleDS.assignTimestampsAndWatermarks(
				WatermarkStrategy
						.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
						//.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
						.withIdleness(Duration.ofSeconds(10))   // generate watermarks even when no data arrives
						.withTimestampAssigner((event, timestamp) -> event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
String sql = "select appid,eventid,cnt," +
		"(starttime + interval '8' hour ) as stime," +
		"(endtime + interval '8' hour ) as etime " +
		"from (select appid,eventid,count(*) as cnt," +
		"TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
		"TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
		"from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // the window is expected to close at the top of the hour
Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
The output is:
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 
The expectation was that the window would fire and close at 2020-09-01 18:00:00.0, but it did not.
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39, the window only fired after this record arrived
ResultHour{appid=400030024, eventid=others, cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
Where does the problem lie? Many thanks!



Re: Please advise on a question about time windows, many thanks!

2020-09-02 Posted by samuel....@ubtrobot.com
properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour4-001");
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 2800L);
offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 2700L);
offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 3300L);
ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
} else if (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
	hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
	maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
	maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
	inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
	properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
	properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
	properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour-001");
	properties.setProperty("auto.offset.reset", "earliest");
	ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties));
} else {
	System.exit(-1);
}
// transform
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> singleDS =
		ds.flatMap(new FlatMapFunction<String, Tuple4<String, String, String, Long>>() {

	@Override
	public void flatMap(String value, Collector<Tuple4<String, String, String, Long>> out) {

		//System.out.println("Kafka2Hdfs-in:" + value);
		// strip carriage returns, newlines and tabs
		String newStr = value.replaceAll("\\r|\\n|\\t", "");
		//System.out.println("Kafka2Hdfs-newStr:" + newStr);

		try {
			// parse the record as JSON, keeping field order
			JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);

			// get the body_data array
			JSONArray bodyDataArray = record.getJSONArray("body_data");
			// iterate over the JSON array
			for (int i = 0; i < bodyDataArray.size(); i++) {
				// take the i-th JSON object
				JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);

				if (bodyDataObj != null) {
					Tuple4<String, String, String, Long> log = Tuple4.of(
							record.getString("HW-AppId"),
							bodyDataObj.getString("HW-bugId"),
							bodyDataObj.getString("HW-bugType"),
							Long.valueOf(bodyDataObj.getString("HW-happenedAt"))
					);
					out.collect(log);
				}
			}

		} catch (Exception e) {
			System.out.println(e.getMessage());
		}
	}

});
singleDS.print();
// assign the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
		singleDS.assignTimestampsAndWatermarks(
				WatermarkStrategy
						.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
						//.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
						.withIdleness(Duration.ofSeconds(10))   // generate watermarks even when no data arrives
						.withTimestampAssigner((event, timestamp) -> event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");


String sql = "select appid,eventid,cnt," +
		"(starttime + interval '8' hour ) as stime," +
		"(endtime + interval '8' hour ) as etime " +
		"from (select appid,eventid,count(*) as cnt," +
		"TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
		"TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
		"from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";


Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
dataStream.print();

env.execute("etl.exception.monitor.ExceptionAlertHour");
}



public static class Result{
private String appid;
private String eventid;
private long cnt;
private Timestamp stime;
private Timestamp etime;
public String getAppid() {
return appid;
}

public void setAppid(String appid) {
this.appid = appid;
}

public String getEventid() {
return eventid;
}

public void setEventid(String eventid) {
this.eventid = eventid;
}

public long getCnt() {
return cnt;
}

public void setCnt(long cnt) {
this.cnt = cnt;
}


public Timestamp getStime(){
return stime;
}

public void setStime(Timestamp stime){
this.stime = stime;
}

public Timestamp getEtime(){
return etime;
}

public void setEtime(Timestamp etime){
this.etime = etime;
}

@Override
public String toString(){
	return "ResultHour{" +
			"appid=" + appid +
			",eventid=" + eventid +
			",cnt=" + cnt +
			", stime=" + stime +
			", etime=" + etime +
			'}';
}

Please advise on a question about time windows, many thanks!

2020-09-02 Posted by samuel....@ubtrobot.com
Using Flink SQL, I want to aggregate with a tumbling window (one hour), but the computation is not triggered at the top of the hour.
// assign the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
		singleDS.assignTimestampsAndWatermarks(
				WatermarkStrategy
						.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
						//.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
						.withIdleness(Duration.ofSeconds(10))   // generate watermarks even when no data arrives
						.withTimestampAssigner((event, timestamp) -> event.f3));

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

String sql = "select appid,eventid,cnt," +
		"(starttime + interval '8' hour ) as stime," +
		"(endtime + interval '8' hour ) as etime " +
		"from (select appid,eventid,count(*) as cnt," +
		"TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
		"TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
		"from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // the window is expected to close at the top of the hour

Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);

The output is:
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 
The expectation was that the window would fire and close at 2020-09-01 18:00:00.0, but it did not.
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39, the window only fired after this record arrived

ResultHour{appid=400030024, eventid=others, cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}



How can the following be implemented with Flink?

2020-08-05 Posted by samuel....@ubtrobot.com
How can the following requirement be implemented with Flink?

Background:
The alert rules are stored in MySQL as JSON, for example:
{"times":5}          --- 5 times
{"temperature": 80}  --- 80 degrees

1) The data arrives through Kafka.
2) Flink reads the data from Kafka.

Questions:
1. ...
2. Should Flink CEP be used for this?
3. ...
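
One common pattern for this kind of requirement (rules kept in an external store, events arriving from Kafka) is to broadcast the rules and check each event against them in a BroadcastProcessFunction; the ExceptionAlertHour4 code elsewhere in this thread already imports the broadcast-state classes. A minimal sketch, where the stream names, the single "latest" rule key and the plain-string payloads are illustrative assumptions:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleBroadcastSketch {

	// events: the Kafka stream; rules: e.g. a source that periodically re-reads the MySQL rule table
	public static DataStream<String> applyRules(DataStream<String> events, DataStream<String> rules) {
		final MapStateDescriptor<String, String> ruleDescriptor = new MapStateDescriptor<>(
				"rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
		BroadcastStream<String> ruleBroadcast = rules.broadcast(ruleDescriptor);

		return events.connect(ruleBroadcast).process(new BroadcastProcessFunction<String, String, String>() {

			@Override
			public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
				ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(ruleDescriptor);
				String latestRule = state.get("latest");            // e.g. {"temperature": 80}
				if (latestRule != null /* && the event violates the rule (real check omitted) */) {
					out.collect("ALERT: " + event);                 // emit an alert record downstream
				}
			}

			@Override
			public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
				ctx.getBroadcastState(ruleDescriptor).put("latest", rule);   // keep only the newest rule
			}
		});
	}
}

Whether plain broadcast state or Flink CEP fits better depends on whether the rules describe single-event thresholds or patterns spanning several events.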


   

 


Where can the Flink 1.10.1 release be downloaded? (no body)

2020-04-16 Posted by samuel....@ubtrobot.com



Looking for a dependency jar

2020-04-15 Posted by samuel....@ubtrobot.com
Hi all, does anyone have a ready-built jar of the following? Many thanks!

flink-connector-elasticsearch7_2.11



Shenzhen UBTECH Robotics Corp. | Platform Software Dept.
邱钺 Samuel Qiu
Mobile/WeChat: +0086 150 1356 8368
Email: samuel@ubtrobot.com
UBTECH Robotics | www.ubtrobot.com
UBTECH, 3/F, Building 13, Honglai Science & Technology Innovation Building, Pingshan Road, Nanshan District, Shenzhen, Guangdong
 
From: samuel@ubtrobot.com
Date: 2020-04-15 17:37
To: user-zh
Subject: flink-sql-connector-elasticsearch7_2.11-1.10.0.jar
After submitting the job it does not succeed. How can this be resolved?
Versions: Flink 1.10.0, Elasticsearch 7.6.0

Looking at the source, this class is indeed missing:

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
	at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
	at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 14 more


Thanks!


flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

2020-04-15 Posted by samuel....@ubtrobot.com
After submitting the job it does not succeed. How can this be resolved?
Versions: Flink 1.10.0, Elasticsearch 7.6.0

Looking at the source, this class is indeed missing:

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
	at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
	at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 14 more


Thanks!