Flink Jobs are failing for running testcases when trying to build in Jenkins server

2020-07-16 Thread bujjirahul45
Hi,

I am trying to build a Flink job on a Jenkins server, and when it runs the
test cases it fails with the stack trace below. I am doing a simple pattern
validation, where I test data against a set of patterns. It builds fine
locally with Gradle 6.3, but fails on the Jenkins server. Please suggest
what I am doing wrong.

SignalPatternDefinitionMatchingTest > testCompareInputAndOutputDataForInValidSignal() FAILED
    java.lang.Exception: Could not create actor system
        at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:278)
        at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:164)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRpcService(AkkaRpcServiceUtils.java:126)
        at org.apache.flink.runtime.metrics.util.MetricUtils.startMetricsRpcService(MetricUtils.java:139)
        at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:286)
        at org.apache.flink.client.deployment.executors.LocalExecutor.startMiniCluster(LocalExecutor.java:117)
        at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:63)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at com.myorg.pattern.service.TestPatternProcessingService.getInValidSignalDataStreamOutput(TestPatternProcessingService.java:140)
        at com.myorg.pattern.pattern.SignalPatternDefinitionMatchingTest.testCompareInputAndOutputDataForInValidSignal(SignalPatternDefinitionMatchingTest.java:24)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
        at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
        at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at
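(Editor's note, not from the thread.) The trace shows the failure happens while execute() boots Flink's embedded MiniCluster, which in CI environments commonly fails due to memory limits, hostname resolution, or port binding rather than the job logic itself. One thing worth trying, as a suggestion rather than a confirmed fix, is to manage the test cluster explicitly with Flink's MiniClusterWithClientResource (from the flink-test-utils artifact) so one small cluster is shared across tests instead of a fresh actor system being created per execute() call. A minimal sketch, assuming JUnit 4 style rules (with JUnit 5, which this stack trace uses, the resource's lifecycle methods need to be driven from @BeforeAll/@AfterAll or an equivalent extension):

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class SignalPatternDefinitionMatchingTest {

    // Starts one small, shared Flink cluster for every test in this class,
    // instead of spinning up a new local cluster on each execute() call.
    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(1)
                            .build());

    // ... tests obtain their environment via
    // StreamExecutionEnvironment.getExecutionEnvironment() as usual ...
}
```

This also makes the cluster's resource footprint explicit, which helps when the Jenkins agent is more constrained than the local machine.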

How to write JUnit test cases for KeyedBroadcastProcessFunction

2020-07-15 Thread bujjirahul45
Hi,

I am new to Flink and I am trying to write JUnit test cases for a
KeyedBroadcastProcessFunction. Below is my code. I currently call the
getDataStreamOutput method in the TestUtils class, passing input data and
pattern rules to it. The input data is evaluated against the list of
pattern rules; if the input data satisfies a condition I get the signal,
call the sink function, and return the output data as a string from
getDataStreamOutput.

    @Test
    public void testCompareInputAndOutputDataForInputSignal() throws Exception {
        Assertions.assertEquals(sampleInputSignal,
                TestUtils.getDataStreamOutput(inputSignal, patternRules));
    }



    public static String getDataStreamOutput(JSONObject input, Map<String, String> patternRules) throws Exception {

        env.setParallelism(1);

        DataStream<JSONObject> inputSignal = env.fromElements(input);

        DataStream<Map<String, String>> rawPatternStream = env.fromElements(patternRules);

        // Generate a key/value pair per pattern, where the key is the
        // pattern name and the value is the pattern condition
        DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
                rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                        Tuple2<String, Map<String, String>>>() {
                    @Override
                    public void flatMap(Map<String, String> patternRules,
                                        Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                        for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                            JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                            Map<String, String> map = new HashMap<>();
                            for (String key : jsonObject.keySet()) {
                                String value = jsonObject.get(key).toString();
                                map.put(key, value);
                            }
                            out.collect(new Tuple2<>(stringEntry.getKey(), map));
                        }
                    }
                });

        BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
                patternRuleStream.broadcast(patternRuleDescriptor);

        DataStream<Tuple2<String, JSONObject>> validSignal =
                inputSignal.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
                    @Override
                    public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                        String source = inputSignal.getSource();
                        return new Tuple2<>(source, inputSignal);
                    }
                }).keyBy(0).connect(patternRuleBroadcast).process(new MyKeyedBroadCastProcessFunction());

        validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
            @Override
            public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
                return inputSignal.f1;
            }
        }).addSink(new getDataStreamOutput());

        env.execute("TestFlink");

        return getDataStreamOutput.dataStreamOutput;
    }


    @SuppressWarnings("serial")
    public static final class getDataStreamOutput implements SinkFunction<JSONObject> {

        public static String dataStreamOutput;

        @Override
        public void invoke(JSONObject inputSignal) throws Exception {
            dataStreamOutput = inputSignal.toString();
        }
    }
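(Editor's note, not from the thread.) A sink that stores the output in a single static String keeps only the last record and is never reset between tests. The pattern shown in Flink's own testing documentation collects into a static, synchronized list instead; a minimal sketch, where the class name CollectSink is illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.json.JSONObject;

// Collects every record the pipeline emits, so a test can assert on all
// outputs rather than only the last one.
public class CollectSink implements SinkFunction<JSONObject> {

    // static, because Flink serializes the sink and runs its own copies;
    // synchronized, because parallel sink instances may write concurrently
    public static final List<String> values =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(JSONObject value, SinkFunction.Context context) {
        values.add(value.toString());
    }
}
```

Clearing CollectSink.values in a @BeforeEach method keeps the tests independent of each other.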
I need to test different inputs against the same broadcast rules, but each
time I call this method it repeats the whole process from the beginning:
taking the input signal and broadcasting the data. Is there a way I can
broadcast once and then keep sending inputs to the method? I explored using
a CoFlatMapFunction, something like the code below, to combine the data
streams and keep feeding input while the method is running. But for this,
one of the data streams has to keep reading from a Kafka topic, and loading
the Kafka utilities and a server would overburden the method.

    DataStream<JSONObject> inputSignalFromKafka = env.addSource(inputSignalKafka);

    DataStream<JSONObject> inputSignalFromMethod = env.fromElements(inputSignal);

    DataStream<JSONObject> inputSignal = inputSignalFromMethod
            .connect(inputSignalFromKafka)
            .flatMap(new SignalCoFlatMapper());


    public static class SignalCoFlatMapper
            implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

        @Override
        public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
            out.collect(inputValue);
        }

        @Override
        public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
            out.collect(kafkaValue);
        }
    }
I found a link on Stack Overflow, "How to unit test BroadcastProcessFunction
in flink when processElement depends on broadcasted data", but it confused
me a lot.

Is there any way I can broadcast only once, in a @Before method of the test
class, and keep sending different kinds of data to my broadcast function?
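(Editor's note, not from the thread.) Flink ships single-operator test harnesses (in the flink-streaming-java test-jar) that sidestep this entirely: no MiniCluster, no sink, and the broadcast state can be populated once in setup while elements are fed one at a time. A hedged sketch, assuming MyKeyedBroadCastProcessFunction has the type parameters implied by the pipeline above and reusing its patternRuleDescriptor; the variable names are illustrative:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.json.JSONObject;

// Build a harness around just the function under test.
KeyedBroadcastOperatorTestHarness<String,
        Tuple2<String, JSONObject>,               // keyed input
        Tuple2<String, Map<String, String>>,      // broadcast input
        Tuple2<String, JSONObject>> harness =     // output
    ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(
        new MyKeyedBroadCastProcessFunction(),
        signal -> signal.f0,        // same key as keyBy(0) in the pipeline
        Types.STRING,
        patternRuleDescriptor);

// Broadcast the rules once, e.g. from a @BeforeEach method...
harness.processBroadcastElement(new Tuple2<>("patternName", ruleMap), 1L);

// ...then push as many input signals as needed and inspect the output.
harness.processElement(new Tuple2<>("source-1", inputSignal), 2L);
List<Tuple2<String, JSONObject>> output = harness.extractOutputValues();
```

Because each test drives the harness directly, different inputs can be checked against the same broadcast state without re-running a job per input.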


Thanks,
Rahul.