Hi all!
I'm trying to use the new DataStream API V2, but I have noticed that the Java
heap usage keeps growing without bound. Analyzing a heap dump, I found that a
HashSet named 'keySet' is consuming 97% of the heap.
Here is the relevant field in the source code I traced it to:
https://github.com/apache/flink/blob/b8f8d6e602897b7f275f1db36478f5e8f1d6604e/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java#L43
I investigated the source code a bit but could not find any logic that
removes elements from the Set.
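To make the suspicion concrete, here is a minimal sketch of the pattern I
think I'm looking at (my own reconstruction for illustration, not the actual
Flink source):

import java.util.HashSet;
import java.util.Set;

// Hypothetical reconstruction of the suspected pattern, NOT the real Flink
// code: an in-heap Set that records every key the operator encounters.
class KeySetLeakSketch {
    private final Set<Object> keySet = new HashSet<>();

    void setCurrentKey(Object key) {
        // Adds one entry per distinct key...
        keySet.add(key);
        // ...but no code path ever removes entries, so with high key
        // cardinality the Set grows until the heap is exhausted.
    }
}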
My implementation of TwoInputNonBroadcastEventTimeStreamProcessFunction,
which appears to trigger this behavior, is included below.
For context, I'm using the Docker image flink:2.0.0-scala_2.12-java11 with the
RocksDB state backend.
Any advice or insights would be greatly appreciated!
package io.github.anttikaikkonen;
import java.util.Set;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
import io.github.anttikaikkonen.rpcclient.models.TransactionInput;
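// Joins each transaction output (arriving on the first input) with the
// transaction input that spends it (arriving on the second input) and emits
// a textual description of the match.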
public class InputAttacher implements
TwoInputNonBroadcastEventTimeStreamProcessFunction<OutputWithTxid, Tuple2<TransactionInput, Long>, String> {
private static final ValueStateDeclaration<TransactionInput> INPUT_STATE_DECLARATION =
StateDeclarations.valueState("transaction-input",
new TypeDescriptor<TransactionInput>() {
@Override
public Class<TransactionInput> getTypeClass() {
return TransactionInput.class;
}
});
private static final ValueStateDeclaration<OutputWithTxid> OUTPUT_STATE_DECLARATION =
StateDeclarations.valueState("transaction-output",
new TypeDescriptor<OutputWithTxid>() {
@Override
public Class<OutputWithTxid> getTypeClass() {
return OutputWithTxid.class;
}
});
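// Captured in initEventTimeProcessFunction and used to register event-time timers.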
private EventTimeManager eventTimeManager;
@Override
public Set<StateDeclaration> usesStates() {
return Set.of(INPUT_STATE_DECLARATION, OUTPUT_STATE_DECLARATION);
}
@Override
public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) {
this.eventTimeManager = eventTimeManager;
}
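// First input: buffer the transaction output in keyed state until the
// spending input arrives.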
@Override
public void processRecordFromFirstInput(OutputWithTxid record, Collector<String> output,
PartitionedContext<String> ctx) throws Exception {
ValueState<OutputWithTxid> outputState =
ctx.getStateManager().getState(OUTPUT_STATE_DECLARATION);
outputState.asyncUpdate(record);
}
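// Second input: store the spending input, then register an event-time timer
// just past the record's timestamp once the state write completes.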
@Override
public void processRecordFromSecondInput(Tuple2<TransactionInput, Long> record,
Collector<String> output, PartitionedContext<String> ctx) throws Exception {
ValueState<TransactionInput> inputState =
ctx.getStateManager().getState(INPUT_STATE_DECLARATION);
inputState.asyncUpdate(record.f0).thenAccept(ignored -> {
eventTimeManager.registerTimer(record.f1 + 1);
});
}
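// Timer: read both sides (assumed present by now), emit the match, and
// clear both states.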
@Override
public void onEventTimer(long timestamp, Collector<String> output,
PartitionedContext<String> ctx) throws Exception {
ValueState<TransactionInput> inputState =
ctx.getStateManager().getState(INPUT_STATE_DECLARATION);
ValueState<OutputWithTxid> outputState =
ctx.getStateManager().getState(OUTPUT_STATE_DECLARATION);
StateFuture<OutputWithTxid> outputFuture = outputState.asyncValue();
StateFuture<TransactionInput> inputFuture = inputState.asyncValue();
outputFuture.thenAccept(o -> {
inputFuture.thenAccept(i -> {
output.collect(o.getTxid() + " - " + o.getOutput().getN() + " spentBy "
+ i.getTxid() + " - " + i.getVout());
outputState.asyncClear();
inputState.asyncClear();
});
});
}
}