Hi Edward,

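Could you try adding the static keyword to ExecQueue and RingBufferExec? As
written, they are non-static inner classes, so every instance holds an
implicit reference to its enclosing instance and, ultimately, to the
MyKeyedProcessFunction, which has unforeseen consequences.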
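
To illustrate the difference, here is a minimal sketch in plain Java with no Flink dependency. The names NestedClassDemo, Outer, InnerQueue and StaticQueue are hypothetical stand-ins for MyKeyedProcessFunction and ExecQueue; the sketch does not reproduce the crash, it only shows what the static keyword changes.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class NestedClassDemo {

    // Hypothetical stand-in for MyKeyedProcessFunction.
    public static class Outer {

        // Non-static inner class: each instance is tied to an Outer instance.
        public class InnerQueue {
            public Integer size;
            public InnerQueue() {}
        }

        // Static nested class: independent of any Outer instance.
        public static class StaticQueue {
            public Integer size;
            public StaticQueue() {}
        }
    }

    // True if c is a non-static member class, i.e. its instances
    // capture a reference to an enclosing instance.
    public static boolean capturesEnclosingInstance(Class<?> c) {
        return c.isMemberClass() && !Modifier.isStatic(c.getModifiers());
    }

    // True if c declares a constructor that takes no parameters at all
    // when viewed through reflection.
    public static boolean hasNoArgConstructor(Class<?> c) {
        for (Constructor<?> ctor : c.getDeclaredConstructors()) {
            if (ctor.getParameterCount() == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        for (Class<?> c : new Class<?>[] {Outer.InnerQueue.class, Outer.StaticQueue.class}) {
            System.out.println(c.getSimpleName()
                    + ": capturesEnclosingInstance=" + capturesEnclosingInstance(c)
                    + ", hasNoArgConstructor=" + hasNoArgConstructor(c));
        }
    }
}
```

Although InnerQueue declares a constructor with an empty parameter list in source, reflection sees a constructor that takes the enclosing Outer as a hidden parameter. Only StaticQueue has a true no-argument constructor, which is what code that instantiates objects reflectively usually expects.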

On Sun, Oct 11, 2020 at 5:38 AM Colletta, Edward <edward.colle...@fmr.com>
wrote:

> Tried to attach tar file but it got blocked.   Resending with files
> attached individually.
>
>
>
> Ok, have minimal reproducible example.   Attaching a tar file of the job
> that crashed.
>
> The crash has nothing to do with the number of state variables.  But it
> does seem to be caused by using a type for the state variable that is a
> class nested in the KeyedProcessFunction.
>
> Reduced to a single state variable.  The type of the state variable was a
> class (ExecQueue) defined in class implementing KeyedProcessFunction.
> Moving the ExecQueue definition to its own file fixed the problem.
>
>
>
> The attached example always crashes the TaskManager within 30 seconds to 5
> minutes.
>
>
>
> MyKeyedProcessFunction.java  and also cut and pasted here:
>
>
>
> package crash;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
> import org.apache.flink.util.Collector;
>
> public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);
>
>     public TypeInformation<ExecQueue> leftTypeInfo;
>     public transient ValueState<ExecQueue> leftState;
>
>     public int initQueueSize;
>     public long emitFrequencyMs;
>
>     public MyKeyedProcessFunction() {
>         initQueueSize = 10;
>         emitFrequencyMs = 1;
>     }
>
>     @Override
>     public void open(Configuration conf) {
>         leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
>         leftState = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("left", leftTypeInfo, null));
>     }
>
>     @Override
>     public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
>         try {
>             ExecQueue eq = leftState.value();
>             if (eq == null) {
>                 eq = new ExecQueue(10);
>                 ctx.timerService().registerProcessingTimeTimer(
>                     ctx.timerService().currentProcessingTime() + emitFrequencyMs);
>             }
>             leftState.update(eq);
>         }
>         catch (Exception e) {
>             LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = " );
>             for (java.lang.StackTraceElement s:e.getStackTrace())
>                 LOG.error(s.toString());
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
>         try {
>             ExecQueue eq = leftState.value();
>             ctx.timerService().registerProcessingTimeTimer(
>                 ctx.timerService().currentProcessingTime() + emitFrequencyMs);
>         }
>         catch ( Exception e) {
>             LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = " );
>             for (java.lang.StackTraceElement s:e.getStackTrace())
>                 LOG.error(s.toString());
>         }
>     }
>
>     public class ExecQueue {
>         public RingBufferExec queue;
>
>         public ExecQueue (){}
>
>         public ExecQueue (int initSize) {
>             queue = new RingBufferExec(initSize);
>         }
>
>         public class RingBufferExec {
>             public Integer size;
>             public Integer count;
>
>             public RingBufferExec(){ }
>
>             public RingBufferExec(int sizeIn){
>                 size = sizeIn;
>                 count = 0;
>             }
>         }
>     }
> }
>
>
>
>
>
> *From:* Dawid Wysakowicz <dwysakow...@apache.org>
> *Sent:* Thursday, October 8, 2020 6:26 AM
> *To:* Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
> *Subject:* Re: state access causing segmentation fault
>
>
>
> Hi,
>
> It should be absolutely fine to use multiple state objects. I am not aware
> of any limits to that. A minimal, reproducible example would definitely be
> helpful. For that kind of exception, I'd look into the serializers you
> use. Other than that, I cannot think of an obvious reason for that kind of
> exception.
>
> Best,
>
> Dawid
>
> On 08/10/2020 12:12, Colletta, Edward wrote:
>
> Using Flink 1.9.2, Java, FsStateBackend.  Running Session cluster on EC2
> instances.
>
>
>
> I have a KeyedProcessFunction that is causing a segmentation fault,
> crashing the Flink task manager.  This seems to be caused by using 3 state
> variables in the operator.  The crash happens consistently after some load
> is processed.
>
> This is the second time I have encountered this.   The first time I had 3
> ValueState variables, this time I had 2 ValueState variables and a MapState
> variable.  Both times the error was alleviated by removing one of the state
> variables.
>
> This time I replaced the 2 ValueState variables with a Tuple2 of the types
> of the individual variables.   I can try to put together a minimal example,
> but I was wondering if anyone has encountered this problem.
>
>
>
> Are there any documented limits on the number of state variables one
> operator can use?
>
>
>
> For background, the reason I use multiple state variables is that the
> operator processes 2 types of inputs, Left and Right.  When Left is
> received, it is put into a PriorityQueue. When the Right type is received,
> I put that into a ring buffer.
>
> I replaced the PriorityQueue with a queue of Ids and MapState to hold the
> elements.  So I have Left stored in a queue ValueState variable and
> MapState variable, and Right is stored in the ring buffer ValueState
> variable.
>
>
>
>
>
>

-- 

Arvid Heise | Senior Java Developer