Hi all!

I'm trying out the new DataStream API V2, and I've noticed that the Java heap 
usage keeps growing without bound. A heap dump shows that a HashSet named 
'keySet' accounts for 97% of the heap.

Here is the field in the source code I traced it to:
https://github.com/apache/flink/blob/b8f8d6e602897b7f275f1db36478f5e8f1d6604e/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java#L43

I looked through the surrounding code but could not find any logic that 
removes elements from this Set.
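To make the pattern concrete, here is a simplified, hypothetical model of what 
I think is happening (the class and method names are mine, not Flink's, and 
this is not the real operator code): if every key that produces a record is 
added to a set that is never pruned, the set's size tracks the number of 
distinct keys ever processed, even after the function has cleared all state 
for those keys.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the suspected pattern. This is NOT
// Flink's BaseKeyedTwoInputNonBroadcastProcessOperator, only an illustration.
public class KeySetGrowthModel {

    // Every key that has ever produced a record is remembered here.
    private final Set<String> keySet = new HashSet<>();

    // Simulates processing one record for the given key.
    public void processRecord(String key) {
        keySet.add(key);
        // Nothing ever calls keySet.remove(key), even after the user
        // function has cleared all per-key state for this key.
    }

    public int trackedKeys() {
        return keySet.size();
    }

    public static void main(String[] args) {
        KeySetGrowthModel model = new KeySetGrowthModel();
        // Each key is seen once and would then have its state cleared,
        // but the key itself stays in the set.
        for (int i = 0; i < 100_000; i++) {
            model.processRecord("txid-" + i);
        }
        System.out.println(model.trackedKeys()); // prints 100000
    }
}
```

If that is what happens, memory use grows with the total number of keys seen 
rather than with the amount of live keyed state, which would match what I see 
in the heap dump.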

I have attached my implementation of 
TwoInputNonBroadcastEventTimeStreamProcessFunction, which appears to trigger 
this behavior.

For context, I'm using the Docker image flink:2.0.0-scala_2.12-java11 with the 
RocksDB state backend.

Any advice or insights would be greatly appreciated!


InputAttacher.java:

package io.github.anttikaikkonen;

import java.util.Set;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
import io.github.anttikaikkonen.rpcclient.models.TransactionInput;

public class InputAttacher implements
        TwoInputNonBroadcastEventTimeStreamProcessFunction<OutputWithTxid, Tuple2<TransactionInput, Long>, String> {

    private final static ValueStateDeclaration<TransactionInput> INPUT_STATE_DECLARATION =
            StateDeclarations.valueState("transaction-input",
                    new TypeDescriptor<TransactionInput>() {
                        @Override
                        public Class<TransactionInput> getTypeClass() {
                            return TransactionInput.class;
                        }
                    });

    private final static ValueStateDeclaration<OutputWithTxid> OUTPUT_STATE_DECLARATION =
            StateDeclarations.valueState("transaction-output",
                    new TypeDescriptor<OutputWithTxid>() {
                        @Override
                        public Class<OutputWithTxid> getTypeClass() {
                            return OutputWithTxid.class;
                        }
                    });

    private EventTimeManager eventTimeManager;

    @Override
    public Set<StateDeclaration> usesStates() {
        return Set.of(INPUT_STATE_DECLARATION, OUTPUT_STATE_DECLARATION);
    }

    @Override
    public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) {
        this.eventTimeManager = eventTimeManager;
    }

    @Override
    public void processRecordFromFirstInput(OutputWithTxid record, Collector<String> output,
            PartitionedContext<String> ctx) throws Exception {
        // Remember the output for this key until the matching input's timer fires.
        ValueState<OutputWithTxid> outputState =
                ctx.getStateManager().getState(OUTPUT_STATE_DECLARATION);
        outputState.asyncUpdate(record);
    }

    @Override
    public void processRecordFromSecondInput(Tuple2<TransactionInput, Long> record,
            Collector<String> output, PartitionedContext<String> ctx) throws Exception {
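        ValueState<TransactionInput> inputState =
                ctx.getStateManager().getState(INPUT_STATE_DECLARATION);
        // Store the spending input, then register an event-time timer just
        // after its timestamp (record.f1) so the join runs once the watermark passes it.
        inputState.asyncUpdate(record.f0).thenAccept((a) -> {
            eventTimeManager.registerTimer(record.f1 + 1);
        });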

    }

    @Override
    public void onEventTimer(long timestamp, Collector<String> output,
            PartitionedContext<String> ctx) throws Exception {
        ValueState<TransactionInput> inputState =
                ctx.getStateManager().getState(INPUT_STATE_DECLARATION);
        ValueState<OutputWithTxid> outputState =
                ctx.getStateManager().getState(OUTPUT_STATE_DECLARATION);
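        StateFuture<OutputWithTxid> outputFuture = outputState.asyncValue();
        StateFuture<TransactionInput> inputFuture = inputState.asyncValue();
        outputFuture.thenAccept(o -> {
            inputFuture.thenAccept(i -> {
                // Either value may be null, e.g. if the output has not arrived
                // yet or a later timer fires after the state was already cleared.
                if (o == null || i == null) {
                    return;
                }
                output.collect(o.getTxid() + " - " + o.getOutput().getN() + " spentBy "
                        + i.getTxid() + " - " + i.getVout());
                // Both sides are joined, so drop the per-key state.
                outputState.asyncClear();
                inputState.asyncClear();
            });
        });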
    }

}
