Hi,

StateFun releases are not in lock-step with Flink releases: a new Flink
release does not automatically trigger a new StateFun release. The two
projects have independent release schemes and schedules.

Usually, for each new major StateFun release, we upgrade its Flink
dependency to the latest available version.
We are currently targeting mid-February for the next major StateFun
release, by which time the Flink dependency will have been upgraded to
1.12.x.
In the meantime, if you'd like to work against Flink 1.12.x with StateFun,
you might have to build the artifacts yourself.
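
As a rough illustration of the mismatch reported in this thread, here is a
minimal sketch of a pre-flight check that compares the Flink version a
StateFun release was built against with the Flink version actually in use.
The class and method names are hypothetical, the version pairs are only the
ones mentioned in this thread (not an official compatibility matrix), and
"same major.minor line" is an assumed heuristic rather than a documented
guarantee.

```java
import java.util.Map;

public class FlinkCompatCheck {

    // StateFun version -> Flink version it refers to, as reported in this
    // thread. Hypothetical table for illustration only.
    static final Map<String, String> BUILT_AGAINST = Map.of(
            "2.2.1", "1.11.1",
            "2.2.2", "1.11.3");

    // Reduces a version such as "1.12.0" to its "major.minor" line, "1.12".
    static String minorLine(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    // True if the runtime Flink version is on the same major.minor line as
    // the Flink version the given StateFun release refers to.
    static boolean sameMinorLine(String statefunVersion, String runtimeFlinkVersion) {
        String expected = BUILT_AGAINST.get(statefunVersion);
        if (expected == null) {
            throw new IllegalArgumentException(
                    "unknown StateFun version: " + statefunVersion);
        }
        return minorLine(expected).equals(minorLine(runtimeFlinkVersion));
    }

    public static void main(String[] args) {
        System.out.println(sameMinorLine("2.2.1", "1.11.1")); // prints "true"
        System.out.println(sameMinorLine("2.2.1", "1.12.0")); // prints "false"
    }
}
```

Failing fast on such a check at startup would surface the version mismatch
directly instead of as a silent class incompatibility.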

Cheers,
Gordon

On Tue, Jan 12, 2021 at 3:57 PM Stephan Pelikan <stephan.peli...@phactum.at>
wrote:

> I found the reason: There is a class incompatibility because I changed from
>
>     Statefun 2.2.1 + Flink 1.11.1
>
> to
>
>     Statefun 2.2.1 + Flink 1.12.0
>
>
>
> But even the newest version of Statefun 2.2.2 refers to Flink 1.11.3.
>
>
>
> Is it possible to use the newest version of Flink together with the
> newest version of Statefun? I’m wondering why there is no Statefun
> version matching the current stable version of Flink.
>
>
>
> Stephan
>
>
>
>
>
> *From:* Stephan Pelikan <stephan.peli...@phactum.at>
> *Sent:* Monday, 11 January 2021 19:37
> *To:* user@flink.apache.org
> *Subject:* Statefun with RabbitMQ consumes message but does not run
> statefun
>
>
>
> Hi,
>
>
>
> I am trying to use RabbitMQ as a source. My source consumes messages from
> the queue, but the stateful function is never executed, not even created.
>
>
>
> This is my main function:
>
>
>
> 1 public static void main(String[] args) throws Exception {
> 2
> 3     final var env = StreamExecutionEnvironment.getExecutionEnvironment();
> 4
> 5     env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
> 6
> 7     env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
> 8     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
> 9     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 10     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 11
> 12     final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
> 13     statefunConfig.setFlinkJobName("test");
> 14     statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
> 15
> 16     final var connectionConfig = new RMQConnectionConfig.Builder()
> 17             .setHost("localhost")
> 18             .setUserName("guest")
> 19             .setPassword("guest")
> 20             .setPort(5672)
> 21             .setVirtualHost("test")
> 22             .setPrefetchCount(5000)
> 23             .build();
> 24
> 25     final var deserializationSchema = new TypeInformationSerializationSchema<>(
> 26             new ProtobufTypeInformation<>(Any.class), env.getConfig());
> 27     final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, true, deserializationSchema);
> 28
> 29     final var source = env
> 30             .addSource(rmqSource, TEST_INGRESS)
> 31             .setParallelism(1)
> 32             .map(msg -> {
> 33                 return RoutableMessageBuilder
> 34                     .builder()
> 35                     .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
> 36                     .withMessageBody(msg)
> 37                     .build();
> 38             });
> 39
> 40     StatefulFunctionDataStreamBuilder
> 41             .builder("test")
> 42             .withDataStreamAsIngress(source)
> 43             .withFunctionProvider(MyStatefun.TYPE, unused -> {
> 44                 return new MyStatefun();
> 45             })
> 46             .withEgressId(MyStatefun.EGRESS)
> 47             .withConfiguration(statefunConfig)
> 48             .build(env)
> 49             .getDataStreamForEgressId(MyStatefun.EGRESS)
> 50             .addSink(new PrintSinkFunction<>(true));
> 51
> 52     env.execute();
> 53
> 54 }
>
>
>
> A breakpoint on line 33 shows me the consumed messages. A breakpoint on
> line 44 is never hit. The message is reportedly consumed but never
> acknowledged or processed. Before using RabbitMQ I used a custom
> SourceFunction to fake input data, and it worked well.
>
>
>
> To set things up I use a local environment, but the logs do not show any
> errors. Before my current problem I had another error during message
> deserialization, and it wasn’t reported either. Unfortunately I didn’t
> manage to get the exception into the log/stdout. I had to use the debugger
> to find the cause of that earlier problem. In the current situation the
> debugger shows no thrown or caught exceptions. That’s why I’m stuck.
>
>
>
> Of course I would like to know what the problem with my code is, but I
> guess it is not obvious. Maybe someone can give me a hint on how to turn
> on exception logging, which might help to get closer to the origin of the
> problem.
>
>
>
> Thanks in advance,
>
> Stephan
>
>
>
