Re: Re: Questions of "State Processing API in Scala"

2020-09-01 Thread Matthias Pohl
Hi Izual,
thanks for contributing and improving the documentation. The PR will be
picked up as part of our regular maintenance work. The communication will
happen through PR conversations as soon as someone picks it up.

Best,
Matthias

On Tue, Sep 1, 2020 at 8:44 AM izual  wrote:

> I tried to fix a small mistake in the sample code of the State Processor
> API doc[1]. Could someone do a doc review[2] for me? Thank you.
>
> 1:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
> 2: https://github.com/apache/flink/pull/13266


-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner


Re: Re: Questions of "State Processing API in Scala"

2020-09-01 Thread izual
I tried to fix a small mistake in the sample code of the State Processor API
doc[1]. Could someone do a doc review[2] for me? Thank you.


1: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
2: https://github.com/apache/flink/pull/13266


Re: Questions of "State Processing API in Scala"

2020-01-21 Thread Izual
Sorry for the wrong post.

> This can probably be confirmed by looking at the exception stack trace.
> Can you post a full copy of that?
I no longer have those jobs' history, but I think you are right.
When I debugged the program to find the reason, I came across this code snippet:

```
  // Flink resolves the serializer snapshot stored in the savepoint against
  // the serializer registered by the new job; an incompatible result is what
  // later surfaces as the StateMigrationException.
  TypeSerializerSchemaCompatibility result =
      previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
  if (result.isIncompatible()) {
    invalidateCurrentSchemaSerializerAccess();
  }
```

I remember one was
`org.apache.flink.api.common.typeutils.base.LongSerializer$LongSerializerSnapshot`,
and the other was just `Kryo`.
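
If I understand the snapshot API correctly, that pairing alone should
reproduce the failure; an untested sketch (the boxed key type and the Kryo
fallback are my assumptions from the debugging above):

```
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer

// What the savepoint holds for the key (written by the Scala DataStream job):
val previousSnapshot = LongSerializer.INSTANCE.snapshotConfiguration()

// What the reader apparently brings when the scala.Long key falls back to Kryo:
val newKeySerializer =
  new KryoSerializer[java.lang.Long](classOf[java.lang.Long], new ExecutionConfig)

// Resolving the old snapshot against the new serializer flags incompatibility,
// which is what the restore turns into the StateMigrationException.
val result = previousSnapshot.resolveSchemaCompatibility(newKeySerializer)
println(result.isIncompatible()) // expected: true
```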

> Can you open a JIRA for this? I think it'll be a reasonable extension to
> the API.
I'll do that, thank you.

> I'm not sure what you mean here. Where is this keyBy happening? In the
> Scala DataStream job, or the State Processor API?
In the Scala DataStream job, the same as the example in link 1 of the
original post.
If I change keyBy(_._1) to keyBy(0), the program throws the exception.
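
If I understand the key extraction correctly, a position-based keyBy does not
key by Long at all, which would change the key serializer written into the
savepoint. My guess, illustrated on a placeholder stream:

```
// Selector-based key: the key type is Long, matching
// KeyedStateReaderFunction[Long, ...] on the read side.
stream.keyBy(_._1)

// Position-based key: as far as I know the key type becomes the generic
// org.apache.flink.api.java.tuple.Tuple, so the savepoint holds a tuple
// key serializer that a Long-keyed reader cannot match.
stream.keyBy(0)
```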

The full copy of the job exception:

```
java.io.IOException: Failed to restore state backend
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
    ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 8f89af64b0cf95ff20b8dda264c66f81_8f89af64b0cf95ff20b8dda264c66f81_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:142)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more
```

Maybe it's explained by "inferred and serialized as their Java
counterparts"; I'm not sure, as I am a beginner in all three of Java, Scala,
and Flink.
Thanks a lot.





Re: Questions of "State Processing API in Scala"

2020-01-20 Thread Tzu-Li (Gordon) Tai
Hi Izual,

Thanks for reporting this! I'm also forwarding this to the user mailing
list, as that is the more suitable place for this question.

I think the usability of the State Processor API in Scala is indeed
something that hasn’t been looked at closely yet.

On Tue, Jan 21, 2020 at 8:12 AM izual  wrote:

> Hi community,
>
> When I use state in Scala, something confused me. I followed these
> steps to generate and read state:
>
> a. implement the example[1] `CountWindowAverage` in Scala (exactly the
> same) and run jobA => it works fine.
>
> b. execute `flink cancel -s ${JobID}` => a savepoint was generated as
> expected.
>
> c. implement the example[2] `StatefulFunctionWithTime` in Scala (code
> below) and run jobB => it failed; the exception shows "Caused by:
> org.apache.flink.util.StateMigrationException: The new key serializer must
> be compatible."
>
>
> The ReaderFunction code is below:
>
> ```
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction
> import org.apache.flink.util.Collector
>
> class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>
>   var countState: ValueState[(Long, Long)] = _
>
>   override def open(parameters: Configuration): Unit = {
>     val stateDescriptor =
>       new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
>     countState = getRuntimeContext().getState(stateDescriptor)
>   }
>
>   override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
>                        out: Collector[(Long, Long)]): Unit = {
>     out.collect(countState.value())
>   }
> }
> ```
>
> d. then I tried to use java.lang.Long instead of Long as the key type and
> ran jobB => the exception disappeared and it works fine.
>
> This confuses me. Did I miss some feature of the State Processor API,
> such as `magic-implicits`?
>

This part is explainable. The "magic-implicits" actually happen in the
DataStream Scala API.
Any primitive Scala types will be inferred and serialized as their Java
counterparts.
AFAIK, this does not happen in the State Processor API yet, which is why
you are getting the StateMigrationException.
When using Scala types directly with the State Processor API, I would guess
that Kryo (as a generic fallback) was being used to access state.
This can probably be confirmed by looking at the exception stack trace. Can
you post a full copy of that?

This should be resolvable by properly supporting Scala in the State
Processor API; it's just that up to this point we haven't had a plan
for that.
Can you open a JIRA for this? I think it'll be a reasonable extension to
the API.
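
In the meantime, you could try working around the type extraction by passing
the type information explicitly when reading the state, so the key serializer
lines up with what the Scala job wrote. A rough, untested sketch against 1.9
(the savepoint path and operator uid are placeholders, and `ReaderFunction` is
your class from above):

```
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

val bEnv = ExecutionEnvironment.getExecutionEnvironment
val savepoint =
  Savepoint.load(bEnv, "file:///path/to/savepoint", new MemoryStateBackend())

// Supplying the Scala-derived TypeInformation explicitly should avoid the
// generic Kryo fallback that plain Java type extraction produces for the
// scala.Long key.
val keyedState = savepoint.readKeyedState(
  "my-uid",                             // operator uid (placeholder)
  new ReaderFunction,
  createTypeInformation[Long],          // key: backed by Flink's LongSerializer
  createTypeInformation[(Long, Long)])  // output type

keyedState.print()
```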


>
> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same exception
> comes again. This time I tried to use Tuple(java.lang.Long) and some other
> types, but nothing works.
>

I'm not sure what you mean here. Where is this keyBy happening? In the
Scala DataStream job, or the State Processor API?


>
> Hoping for your advice.
>
> 1:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
> 2:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state


Cheers,
Gordon