[jira] [Created] (FLINK-25103) KeyedBroadcastProcessFunction with parallelism set to 6: how is a ValueState variable A stored across the six tasks?

2021-11-29 Thread wangbaohua (Jira)
wangbaohua created FLINK-25103:
--

 Summary: KeyedBroadcastProcessFunction with parallelism set to 6: how is a 
ValueState variable A stored across the six tasks?
 Key: FLINK-25103
 URL: https://issues.apache.org/jira/browse/FLINK-25103
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: wangbaohua


With a KeyedBroadcastProcessFunction running at parallelism 6, how is a ValueState 
variable A stored across the six tasks? While the job was running, I observed that 
some tasks read variable A as null while others saw a value. The code follows:

setParallelism(9);
..
public class dealStreamProcessFunction extends 
KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, StandardEvent> {
private static final Logger logger = 
LoggerFactory.getLogger(dealStreamProcessFunction.class);

private transient ValueState<List<StandardEvent>> listState;
private transient ValueState<Boolean> runingFlagState;
private transient ValueState<InferenceEngine> engineState;
MapStateDescriptor<String, List<String>> ruleStateDescriptor = new 
MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
, BasicTypeInfo.STRING_TYPE_INFO
, new ListTypeInfo<>(String.class));
InferenceEngine engine;

/**
 * The open() method is executed only once;
 * initialization work can be done here.
 *
 * @param parameters
 * @throws Exception
 */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = 
new ValueStateDescriptor<>(
"recent-operator",
TypeInformation.of(new TypeHint<List<StandardEvent>>() {
}));

ValueStateDescriptor<Boolean> runingFlagDescriptor = new 
ValueStateDescriptor<>(
"runingFlag",
Boolean.class);

ValueStateDescriptor<InferenceEngine> engineDescriptor = new 
ValueStateDescriptor<>(
"runingFlag1",
InferenceEngine.class);
engineState = getRuntimeContext().getState(engineDescriptor);
listState = getRuntimeContext().getState(recentOperatorsDescriptor);
runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);

logger.info("KeyedBroadcastProcessFunction open");
}

@Override
public void processElement(StandardEvent standardEvent, ReadOnlyContext 
readOnlyContext, Collector<StandardEvent> collector) throws Exception {
if(standardEvent == null){
return;
}
List<String> list = 
readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
if (list == null) {
logger.info("RulesBroadcastState is null..");
List<StandardEvent> lst = listState.value();
if (lst == null) {
lst = new ArrayList<>();
}
lst.add(standardEvent);
listState.update(lst);
return;
}
// first time through: no flag has been set yet
if (runingFlagState.value() == null) {
logger.info("runingFlagState.value() == null");
runingFlagState.update(true);
}
if (((runingFlagState.value() && list.get(0).equals("1")) || 
list.get(0).equals("0"))) {
logger.info("action update.:" + list.size() + ":" + 
runingFlagState.value() + ":" + list.get(0));
String flag = list.get(0);
list.remove(0);
InferenceEngine engine1 = 
InferenceEngine.compile(RuleReader.parseRules(list));
engineState.update(engine1);
if (runingFlagState.value() && flag.equals("1")) {
runingFlagState.update(false);
}
}

if (engineState.value() != null) {
List<StandardEvent> listTmp = listState.value();
if (listTmp != null) {
for (StandardEvent standardEventTmp : listTmp) {
logger.info("listState.:" + standardEventTmp);
match(standardEventTmp, collector);
}
listState.clear();
}
match(standardEvent, collector);
} else {
logger.info("processElement engine is null.:");

}

}

private void match(StandardEvent standardEvent, Collector<StandardEvent> 
collector) throws IOException {
PatternMatcher matcher = engineState.value().matcher(standardEvent);
if (matcher.find()) {
List<Action> actions = matcher.getActions();
for (Action action : actions) {
if (standardEvent != null) {
if(collector != null)
collector.collect(standardEvent);
else
logger.info("collector is null:" );
}
}
} else {
logger.info("no matcher:" + standardEvent);
}
}

@Override
public void processBroadcastElement(List<String> strings, Context context, 
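
For context on the question above: ValueState obtained from 
getRuntimeContext().getState(...) in a keyed function is scoped to the current key, 
not replicated to every parallel task; each subtask holds state only for the key 
groups assigned to it, and value() returns null for a key that has not been updated 
yet. A minimal sketch (class and state names are hypothetical, not from the code 
above) illustrating the per-key scoping:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: counts elements per key. countState.value() is null
// the first time a given key is seen on the subtask that owns that key,
// because keyed state is partitioned by key, not replicated to every task.
public class CountPerKey extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        Long count = countState.value();   // null until update() was called for this key
        count = (count == null) ? 1L : count + 1L;
        countState.update(count);
        out.collect(ctx.getCurrentKey() + " -> " + count);
    }
}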

[jira] [Created] (FLINK-25028) java.lang.OutOfMemoryError: Java heap space

2021-11-23 Thread wangbaohua (Jira)
wangbaohua created FLINK-25028:
--

 Summary: java.lang.OutOfMemoryError: Java heap space
 Key: FLINK-25028
 URL: https://issues.apache.org/jira/browse/FLINK-25028
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: wangbaohua
 Attachments: error.txt

java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:703) ~[?:1.8.0_131]
at java.util.HashMap.putVal(HashMap.java:628) ~[?:1.8.0_131]
at java.util.HashMap.put(HashMap.java:611) ~[?:1.8.0_131]
at java.util.HashSet.add(HashSet.java:219) ~[?:1.8.0_131]
at java.io.ObjectStreamClass$FieldReflector.<init>(ObjectStreamClass.java:1945) ~[?:1.8.0_131]
at java.io.ObjectStreamClass.getReflector(ObjectStreamClass.java:2193) 
~[?:1.8.0_131]
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:521) ~[?:1.8.0_131]
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:369) 
~[?:1.8.0_131]
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468) ~[?:1.8.0_131]
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:369) 
~[?:1.8.0_131]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) 
~[?:1.8.0_131]
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
~[?:1.8.0_131]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
~[?:1.8.0_131]
at java.io.ObjectOutputStream.access$300(ObjectOutputStream.java:162) 
~[?:1.8.0_131]
at 
java.io.ObjectOutputStream$PutFieldImpl.writeFields(ObjectOutputStream.java:1707)
 ~[?:1.8.0_131]
at java.io.ObjectOutputStream.writeFields(ObjectOutputStream.java:482) 
~[?:1.8.0_131]
at 
java.util.concurrent.ConcurrentHashMap.writeObject(ConcurrentHashMap.java:1406) 
~[?:1.8.0_131]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_131]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_131]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) 
~[?:1.8.0_131]
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
~[?:1.8.0_131]
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
~[?:1.8.0_131]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
~[?:1.8.0_131]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
~[?:1.8.0_131]
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:51) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:54)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$retrievePayload$3(TaskExecutor.java:2425)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener$$Lambda$1020/78782846.apply(Unknown
 Source) ~[?:?]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24952) Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before

2021-11-18 Thread wangbaohua (Jira)
wangbaohua created FLINK-24952:
--

 Summary: Rowtime attributes must not be in the input rows of a 
regular join. As a workaround you can cast the time attributes of input tables 
to TIMESTAMP before
 Key: FLINK-24952
 URL: https://issues.apache.org/jira/browse/FLINK-24952
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.13.1
 Environment: public void test() throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);  // checkpoint every 5000 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers", "192.168.1.25:9093");
browseProperties.put("group.id", "temporal");
browseProperties.put("auto.offset.reset", "latest");

PropTransformMap.getInstance().readConfigMap("./conf/cfg.properties");
Map<String, String> configMap = new HashMap<>();
configMap.put(Constants.DB_JDBC_USER, "root");
configMap.put(Constants.DB_JDBC_PASSWD, "1qazXSW@3edc");
configMap.put(Constants.DB_JDBC_URL, 
"jdbc:mysql://192.168.1.25:3306/SSA?useUnicode=true&characterEncoding=utf-8");
configMap.put(Constants.DB_JDBC_DRIVER, 
"com.mysql.jdbc.Driver");
configMap.put(Constants.INITAL_POOL_SIZE, "10");
configMap.put(Constants.MIN_POOL_SIZE, "5");
configMap.put(Constants.MAX_IDLE_TIME, "50");
configMap.put(Constants.MAX_STATE_ELEMENTS, "100");
configMap.put(Constants.MAX_IDLE_TIME, "60");
DbFetcher dbFetcher = new DbFetcher(configMap);
List<String> listRule = RuleReader.readRules(dbFetcher);
System.out.println("ListRule::" + listRule.size());

final String RULE_SBROAD_CAST_STATE = "RulesBroadcastState";

RuleParse ruleParse = new RuleParse();
Map<String, String> properties = new HashMap<>();
ruleParse.parseData("./conf/cfg.json");

// 1. Read the configuration messages from MySQL
DataStream<List<String>> conf = env.addSource(new 
MysqlSourceFunction1(dbFetcher));

// 2. Create the MapStateDescriptor describing the data type of the broadcast data
MapStateDescriptor<String, List<String>> ruleStateDescriptor = 
new MapStateDescriptor<>(RULE_SBROAD_CAST_STATE
, BasicTypeInfo.STRING_TYPE_INFO
, new ListTypeInfo<>(String.class));
// 3. Broadcast conf to obtain a BroadcastStream
final BroadcastStream<List<String>> confBroadcast = 
conf.broadcast(ruleStateDescriptor);

//DataStream<String> dataStream = 
env.fromElements("{\"ORG_ID\":\"1\",\"RAW_MSG\":\"useradd,su - 
root\",\"EVENT_THREE_TYPE\":\"40001\",\"EVENT_TWO_TYPE\":\"40001\",\"SRC_PORT\":\"123\",\"DST_PORT\":\"124\",\"DST_IP\":\"10.16.254.11\",\"SRC_IP\":\"50.115.134.50\",\"CREATE_TIME\":\"2021-07-09
 
18:15:21.001\",\"DEVICE_PARENT_TYPE\":\"LINUX\",\"SNOW_ID\":\"85512\",\"EVENT_THREE_TYPE_DESC\":\"暴力破解失败\",\"ts\":\"2021-05-27
 
16:06:58\",\"ACCOUNT\":\"asap\",\"collectionName\":\"bwdOMS\",\"eRuleId\":\"0\",\"RULE_TJ_COUNT\":11,\"TAGS\":{\"EVENT_ONE_TYPE\":\"2\",\"DIRECTION\":\"内部\",\"EVENT_TWO_TYPE\":\"10015\",\"EVENT_THREE_TYPE\":\"20101\"},\"DEVICE_TYPE\":\"OSM\",\"DIRECTION\":\"0\"}\n");
DataStream<String> dataStream = 
env.fromElements("{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
 
00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-26
 
17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
 
19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
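
For reference, the workaround quoted in the summary (cast the time attributes of the 
input tables to TIMESTAMP before the join) can be sketched as below. The table names 
(left_t, right_t) and columns (id, rt) are assumptions, not taken from the job above; 
the point is only that CAST(rt AS TIMESTAMP(3)) turns a rowtime attribute into a 
regular column that a regular join accepts.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical sketch of the workaround from the error message: cast the
// rowtime attribute to TIMESTAMP(3) on both inputs before the regular join.
public class RegularJoinWorkaround {
    public static Table joinWithoutRowtime(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
            "SELECT l.id, l.rt AS left_time, r.rt AS right_time "
          + "FROM (SELECT id, CAST(rt AS TIMESTAMP(3)) AS rt FROM left_t) l "
          + "JOIN (SELECT id, CAST(rt AS TIMESTAMP(3)) AS rt FROM right_t) r "
          + "ON l.id = r.id");
    }
}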
 

[jira] [Created] (FLINK-24885) processElement interface parameter Collector: java.lang.NullPointerException

2021-11-12 Thread wangbaohua (Jira)
wangbaohua created FLINK-24885:
--

 Summary: processElement interface parameter Collector: 
java.lang.NullPointerException
 Key: FLINK-24885
 URL: https://issues.apache.org/jira/browse/FLINK-24885
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.13.1
Reporter: wangbaohua


2021-11-12 17:11:44,024 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Co-Process-Broadcast-Keyed -> Map -> 
DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_8,
 type=*com.asap.demo.model.BeanField<`account` STRING, `accountId` STRING, 
`accountIn` STRING, `accountInName` STRING, `accountInOrgId` STRING, 
`accountInOrgName` STRING, `accountInType` STRING, `accountName` STRING, 
`accountOrgId` STRING, `accountOrgName` STRING, `accountOut` STRING, 
`accountOutName` STRING, `accountOutOrgId` STRING, `accountOutOrgName` STRING, 
`accountOutType` STRING, `accountStatus` STRING, `accountType` STRING, `action` 
STRING, `actionDesc` STRING, `alarmcontext` STRING, `alarmgrade` STRING, 
`alarmtype` STRING, `alertId` STRING, `alertInfo` STRING, `alertLevel` STRING, 
`alertSignatureIdL` STRING, `appId` STRING, `appName` STRING, `appProtocol` 
STRING, `appType` STRING, `areaId` STRING, `areaName` STRING, `areaType` 
STRING, `assetFrom` STRING, `assetId` STRING, `assetInfo` STRING, `assetIp` 
STRING, `assetLevel` STRING, `assetName` STRING, `assetPid` STRING, `assetType` 
STRING, `assetUse` STRING, `assetVendor` STRING, `attackStage` STRING, 
`attackStageCode` STRING, `attackType` STRING, `attackTypeName` STRING, 
`authSerNum` STRING, `authTime` STRING, `authType` STRING, `bankSeqNum` STRING, 
`batchNo` STRING, `blackDomain` STRING, `blackDomainDesc` STRING, 
`blackDomainTag` STRING, `blackDstIp` STRING, `blackFile` STRING, 
`blackFileDesc` STRING, `blackFileTag` STRING, `blackId` STRING, `blackIpTag` 
STRING, `blackSrcIp` STRING, `blackTag` STRING, `blackTagMatchCount` STRING, 
`blackTagMatchDesc` STRING, `blackUrl` STRING, `blackUrlDesc` STRING, 
`blackUrlTag` STRING, `blackVulnCve` STRING, `blackVulnDesc` STRING, 
`blackVulnName` STRING, `blackVulnTag` STRING, `branchId` STRING, `branchName` 
STRING, `businessSystemName` STRING, `businessType` STRING, `cardId` STRING, 
`cascadeSourceIp` STRING, `cascadeSourceName` STRING, `cebUid` STRING, 
`certNum` STRING, `certType` STRING, `chainId` STRING, `channel` STRING, 
`channelId` STRING, `character` STRING, `charge` STRING, `cifSeqNum` STRING, 
`clientInfo` STRING, `clientIp` STRING, `clientMac` STRING, `clientName` 
STRING, `clientPort` STRING, `collectTime` TIMESTAMP_LTZ(9), `collectTimeL` 
TIMESTAMP_LTZ(9), `command` STRING, `commandLine` STRING, `commandResult` 
STRING, `company` STRING, `companyCustomId` STRING, `companyId` STRING, 
`completenessTag` STRING, `confidence` STRING, `confidenceLevel` STRING, 
`consignedUser` STRING, `contractNo` STRING, `count` STRING, `couponAmount` 
STRING, `couponId` STRING, `createTime` TIMESTAMP_LTZ(3), `createTimeL` BIGINT, 
`createdBy` STRING, `curType` STRING, `currency` STRING, `currentBal` STRING, 
`customLabel1` STRING, `customLabel10` STRING, `customLabel2` STRING, 
`customLabel3` STRING, `customLabel4` STRING, `customLabel5` STRING, 
`customLabel6` STRING, `customLabel7` STRING, `customLabel8` STRING, 
`customLabel9` STRING, `customValue1` STRING, `customValue10` STRING, 
`customValue2` STRING, `customValue3` STRING, `customValue4` STRING, 
`customValue5` STRING, `customValue6` STRING, `customValue7` STRING, 
`customValue8` STRING, `customValue9` STRING, `dataQualityTag` STRING, 
`dataType` STRING, `dataTypeName` STRING, `dbInstance` STRING, `dbName` STRING, 
`dbTable` STRING, `dbVersion` STRING, `dealSuggest` STRING, `defVManagerId` 
STRING, `department` STRING, `deviceCategory` STRING, `deviceId` STRING, 
`deviceIp` STRING, `deviceMac` STRING, `deviceName` STRING, `deviceParentType` 
STRING, `deviceType` STRING, `deviceVersion` STRING, `direction` STRING, 
`directionDesc` STRING, `directionOfAttackTag` STRING, `domain` STRING, 
`dstAdminAccount` STRING, `dstAdminEmail` STRING, `dstAdminFOrgId` STRING, 
`dstAdminId` STRING, `dstAdminMobile` STRING, `dstAdminName` STRING, 
`dstAdminOrgId` STRING, `dstAdminOrgName` STRING, `dstAdminType` STRING, 
`dstAsset` STRING, `dstAssetId` STRING, `dstAssetInfo` STRING, `dstAssetKey` 
STRING, `dstAssetLevel` STRING, `dstAssetModel` STRING, `dstAssetName` STRING, 
`dstAssetPid` STRING, `dstAssetStatus` STRING, `dstAssetSubType` STRING, 
`dstAssetType` STRING, `dstAssetVendor` STRING, `dstBizId` STRING, `dstCity` 
STRING, `dstCompany` STRING, `dstCountry` STRING, `dstDbInstance` STRING, 
`dstDomainName` STRING, `dstFGroupId` STRING, `dstGroupId` STRING, 
`dstGroupName` STRING, `dstHostName` STRING, `dstIndustry` STRING, 
`dstIntelDesc` STRING, `dstIntelId` STRING, `dstIntelType` STRING, 

[jira] [Created] (FLINK-24870) Cannot cast "java.util.Date" to "java.time.Instant"

2021-11-10 Thread wangbaohua (Jira)
wangbaohua created FLINK-24870:
--

 Summary: Cannot cast "java.util.Date" to "java.time.Instant"
 Key: FLINK-24870
 URL: https://issues.apache.org/jira/browse/FLINK-24870
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: wangbaohua


        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
        at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
        at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
        ... 11 more
Caused by: 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
        at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
        ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
        at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
        at 
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
        at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
        ... 15 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 120, Column 
101: Cannot cast "java.util.Date" to "java.time.Instant"
        at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
        at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
        at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
        at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
        at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
        at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
        at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
        at 
org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
        at 
org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
        at 
org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
        at 
org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
        at 
org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
        at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
        at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
        at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
        at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
        at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
        at
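
For context: TIMESTAMP_LTZ columns use java.time.Instant as their default conversion 
class, so the generated converter in the trace above appears to cast the mapped POJO 
field to Instant, which cannot succeed for a field declared as java.util.Date. A 
hypothetical minimal POJO (names assumed, not from this report) showing a field type 
that avoids the cast:

import java.time.Instant;

// Hypothetical POJO, not taken from the report. A field typed java.util.Date
// cannot back a TIMESTAMP_LTZ column, which leads to the cast error above;
// declaring it as java.time.Instant matches TIMESTAMP_LTZ's default
// conversion class.
public class EventPojo {
    // public java.util.Date collectTime;  // triggers: Cannot cast "java.util.Date" to "java.time.Instant"
    public Instant collectTime;             // works with TIMESTAMP_LTZ
}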