Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-25 Thread Till Rohrmann
Great, thanks a lot Regina. I'll check the logs tomorrow. If info level is
not enough, then I'll let you know.

Cheers,
Till

On Fri, Oct 25, 2019, 21:20 Chan, Regina  wrote:

> Till, I added you to this lockbox area where you should be able to
> download the logs. You should have also received an email with an account
> created in lockbox where you can set a password. Let me know if you have
> any issues.
>
>
>
>
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Friday, October 25, 2019 1:24 PM
> *To:* Chan, Regina [Engineering] 
> *Cc:* Yang Wang ; user 
> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
> about the number of pending container requests has diverged
>
>
>
> Could you provide me with the full logs of the cluster
> entrypoint/JobManager. I'd like to see what's going on there.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 25, 2019, 19:10 Chan, Regina  wrote:
>
> Till,
>
>
>
> We’re still seeing a large number of returned containers even with this
> heart beat set to something higher. Do you have hints as to what’s going
> on? It seems to be bursty in nature. The bursty requests cause the job to
> fail with the cluster not having enough resources because it’s in the
> process of releasing them.
>
> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate enough slots to run the job. Please make sure that the
> cluster has enough resources.” It causes the job to run very
> inconsistently.
>
>
>
> Since legacy mode is now gone in 1.9, we don’t really see many options
> here.
>
>
>
> *Run Profile*                                                        *Number of returned excess containers*
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=500    685
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=5000   552
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=1      331
> 10G per TM, 1 slots, yarn.heartbeat.container-request-interval=6      478
>
>
>
> 2019-10-25 09:55:51,452 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying
> CHAIN DataSource (synonym | Read Staging From File System | AVRO) -> Map
> (Map at readAvroFileWithFilter(FlinkReadUtils.java:78)) -> Map (Key
> Extractor) (14/90) (attempt #0) to
> container_e22_1571837093169_78279_01_000852 @ d50503-004-e22.dc.gs.com
> (dataPort=33579)
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000909 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000909.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000910 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000910.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000911 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000911.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000912 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000912.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000913 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000913.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000914 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000914.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000915 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container 

Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-25 Thread Till Rohrmann
Could you provide me with the full logs of the cluster
entrypoint/JobManager. I'd like to see what's going on there.

Cheers,
Till

On Fri, Oct 25, 2019, 19:10 Chan, Regina  wrote:

> Till,
>
>
>
> We’re still seeing a large number of returned containers even with this
> heart beat set to something higher. Do you have hints as to what’s going
> on? It seems to be bursty in nature. The bursty requests cause the job to
> fail with the cluster not having enough resources because it’s in the
> process of releasing them.
>
> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate enough slots to run the job. Please make sure that the
> cluster has enough resources.” It causes the job to run very
> inconsistently.
>
>
>
> Since legacy mode is now gone in 1.9, we don’t really see many options
> here.
>
>
>
> *Run Profile*                                                        *Number of returned excess containers*
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=500    685
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=5000   552
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=1      331
> 10G per TM, 1 slots, yarn.heartbeat.container-request-interval=6      478
>
>
>
> 2019-10-25 09:55:51,452 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying
> CHAIN DataSource (synonym | Read Staging From File System | AVRO) -> Map
> (Map at readAvroFileWithFilter(FlinkReadUtils.java:78)) -> Map (Key
> Extractor) (14/90) (attempt #0) to
> container_e22_1571837093169_78279_01_000852 @ d50503-004-e22.dc.gs.com
> (dataPort=33579)
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000909 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000909.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000910 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000910.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000911 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000911.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000912 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000912.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000913 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000913.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000914 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000914.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000915 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000915.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000916 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000916.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000917 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> 

RE: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-25 Thread Chan, Regina
Till,

We’re still seeing a large number of returned containers even with this 
heartbeat set to something higher. Do you have hints as to what’s going on? It seems 
to be bursty in nature. The bursty requests cause the job to fail with the 
cluster not having enough resources because it’s in the process of releasing 
them.
“org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate enough slots to run the job. Please make sure that the 
cluster has enough resources.” It causes the job to run very inconsistently.

Since legacy mode is now gone in 1.9, we don’t really see many options here.

Run Profile                                                          Number of returned excess containers
12G per TM, 2 slots, yarn.heartbeat.container-request-interval=500    685
12G per TM, 2 slots, yarn.heartbeat.container-request-interval=5000   552
12G per TM, 2 slots, yarn.heartbeat.container-request-interval=1      331
10G per TM, 1 slots, yarn.heartbeat.container-request-interval=6      478
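
For reference, a minimal sketch of how this interval is set in flink-conf.yaml (the
key is the one from the table above; the value is in milliseconds, and 500 here is
just the first row's setting, not a recommendation):

    # flink-conf.yaml
    yarn.heartbeat.container-request-interval: 500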


2019-10-25 09:55:51,452 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying CHAIN 
DataSource (synonym | Read Staging From File System | AVRO) -> Map (Map at 
readAvroFileWithFilter(FlinkReadUtils.java:78)) -> Map (Key Extractor) (14/90) 
(attempt #0) to container_e22_1571837093169_78279_01_000852 @ 
d50503-004-e22.dc.gs.com (dataPort=33579)
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000909 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000909.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000910 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000910.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000911 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000911.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000912 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000912.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000913 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000913.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000914 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000914.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000915 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000915.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000916 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000916.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000917 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning excess container 
container_e22_1571837093169_78279_01_000917.
2019-10-25 09:55:51,513 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: 
container_e22_1571837093169_78279_01_000918 - Remaining pending container 
requests: 0
2019-10-25 09:55:51,513 INFO  

Re: Guarantee of event-time order in FlinkKafkaConsumer

2019-10-25 Thread Fabian Hueske
Hi Wojciech,

I posted an answer on StackOverflow.

Best, Fabian

On Thu, Oct 24, 2019 at 13:03, Wojciech Indyk <wojciechin...@gmail.com> wrote:

> Hi!
> I use Flink 1.8.0 with Kafka 2.2.1. I need to guarantee of correct order
> of events by event timestamp. I generate periodic watermarks every 1s. I
> use FlinkKafkaConsumer with AscendingTimestampExtractor.
> The code (and the same question) is here:
> https://stackoverflow.com/questions/58539379/guarantee-of-event-time-order-in-flinkkafkaconsumer
>
> I realized, that for unordered events, that came in the same ms or a few
> ms later, the order is not corrected by Flink. What I found in the docs:
> "the watermark triggers computation of all windows where the maximum
> timestamp (which is end-timestamp - 1) is smaller than the new watermark",
> so I added a step of timeWindowAll with size of 100ms and inside that
> window I sort messages by the event timestamp. It works, but I find this
> solution ugly and it looks like a workaround. I am also concerned about
> per-partition watermarks of KafkaSource.
>
> Ideally I would like to put the guarantee of order in the KafkaSource and
> keep it for each kafka partition, like per-partition watermarks. Is it
> possible to do so? What is the current best solution for guarantee the
> event-time order of events in Flink?
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
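
As a concrete illustration of the workaround Wojciech describes (buffering a small
event-time window and sorting it), here is a minimal, hedged sketch; the Event type,
its timestamp field, and the 100 ms window size are assumptions taken from his
description, not code from the StackOverflow answer:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class SortWindowExample {

        // Hypothetical event type with a millisecond event timestamp.
        public static class Event {
            public long timestamp;
            public String payload;
        }

        public static DataStream<Event> sortByEventTime(DataStream<Event> input) {
            return input
                // buffer 100 ms of event time per window ...
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
                // ... and emit the buffered events sorted by their timestamp
                .process(new ProcessAllWindowFunction<Event, Event, TimeWindow>() {
                    @Override
                    public void process(Context ctx, Iterable<Event> elements, Collector<Event> out) {
                        List<Event> buffer = new ArrayList<>();
                        for (Event e : elements) {
                            buffer.add(e);
                        }
                        buffer.sort(Comparator.comparingLong((Event e) -> e.timestamp));
                        for (Event e : buffer) {
                            out.collect(e);
                        }
                    }
                });
        }
    }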


Re: Flink 1.5+ performance in a Java standalone environment

2019-10-25 Thread Fabian Hueske
Hi Jakub,

I had a look at the changes of Flink 1.5 [1] and didn't find anything
obvious.
Something that might cause a different behavior is the new deployment and
process model (FLIP-6).

In Flink 1.5, there is a switch to disable it and use the previous
deployment mechanism.
You could try to disable the new model [2] and see if this causes the
performance issue.

Note that the legacy mode was removed in one of the later versions.

Best, Fabian

[1] https://flink.apache.org/news/2018/05/25/release-1.5.0.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment
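
For reference, a minimal sketch of that switch as described in the Flink 1.5
release notes [2]; it goes into flink-conf.yaml and only applies to the 1.5.x line:

    # flink-conf.yaml -- fall back to the pre-FLIP-6 deployment model
    mode: legacy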

On Thu, Oct 24, 2019 at 19:37, Jakub Danilewicz <jdanilew...@alto-analytics.com> wrote:

> Hi,
>
> I have recently tried to upgrade Flink from 1.2.0 to the newest version
> and noticed that starting from the version 1.5 the performance is much
> worse when processing fixed graphs in a standalone JVM environment (Java
> 8).
>
> This affects all the use-cases when a Gelly graph (pre-built from a fixed
> collection of nodes/edges) gets processed by any of our custom algorithms
> (VertexCentric, ScatterGather or GSA), especially when using parallel
> processing for a local ExecutionEnvironment. The processing times
> (compared to the versions <= 1.4.2) double/triple, while CPU and memory
> consumption increase significantly.
>
> Are there any fine-tuning steps/tricks for the job processing engine
> behind Flink 1.5+ that would improve the performance in the scenarios
> described above?
>
> Best,
>
> Jakub
>


Re: Using STSAssumeRoleSessionCredentialsProvider for cross account access

2019-10-25 Thread Fabian Hueske
Hi Vinay,

Maybe Gordon (in CC) has an idea about this issue.

Best, Fabian

On Thu, Oct 24, 2019 at 14:50, Vinay Patil <vinay18.pa...@gmail.com> wrote:

> Hi,
>
> Can someone pls help here , facing issues in Prod . I see the following
> ticket in unresolved state.
>
> https://issues.apache.org/jira/browse/FLINK-8417
>
>
> Regards,
> Vinay Patil
>
>
> On Thu, Oct 24, 2019 at 11:01 AM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I am trying to access dynamo streams from a different aws account but
>> getting resource not found exception while trying to access the dynamo
>> streams from Task Manager. I have provided the following configurations :
>>
>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER,AWSConfigConstants.CredentialProvider.ASSUME_ROLE.name
>> ());*
>>
>>
>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN,dynamoDbConnect.getRoleArn());*
>>
>>
>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME,dynamoDbConnect.getRoleSessionName());*
>>
>> In the main class I am able to get the arn of dynamoDb table
>> using STSAssumeRoleSessionCredentialsProvider, so the assume role is
>> working fine . Getting error only while accessing from TM.
>>
>> I assume that the credentials are not required to be passed :
>> https://github.com/apache/flink/blob/abbd6b02d743486f3c0c1336139dd6b3edd20840/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L164
>>
>>
>> Regards,
>> Vinay Patil
>>
>


Re: Can a Flink query outputs nested json?

2019-10-25 Thread Fabian Hueske
Hi,

I did not understand what you are trying to achieve.
Which field of the input table do you want to write to the output table?

Flink SQL> insert into nestedSink select nested from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink [nestedSink] do not match.
Query result schema: [nested: Row]
TableSink schema:[nested: Row]

This does not work, because the nested schema of the query result and sink
are not identical.
The table sink has only one nested String field called inner. The query
result looks like this: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>

You need to make sure that the schema of query result and sink table are
exactly the same. Otherwise, Flink will throw those ValidationExceptions.

Best, Fabian
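
Concretely, a sketch of a sink schema that would match (only the schema section of
the sink's environment-file definition is shown; the type follows directly from the
query result type above):

    schema:
      - name: 'nested'
        type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>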

On Thu, Oct 24, 2019 at 12:24, srikanth flink <flink.d...@gmail.com> wrote:

> I'm working on Flink SQL client. Input data is json format and contains
> nested json.
>
> I'm trying to query the nested json from the table and expecting the
> output to be nested json instead of string.
>
> I've build the environment file to define a table schema as:
>
>> format:
>>
>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'lon': {
>>   type: 'string'
>> },
>> 'rideTime': {
>>   type: 'string'
>> },
>> 'nested': {
>>   type: 'object',
>>   properties: {
>> 'inner': {
>>   type: 'string'
>> },
>> 'nested1': {
>>   type: 'object',
>>   properties: {
>> 'inner1': {
>>   type: 'string'
>> }
>>   }
>> }
>>   }
>> },
>> 'name': {
>>   type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>> schema:
>>   - name: 'lon'
>> type: VARCHAR
>>   - name: 'rideTime'
>> type: VARCHAR
>>   - name: 'nested'
>> type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
>>   - name: 'name'
>> type: VARCHAR
>
>
> Sink table schema:
>
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   derive-schema: true
>> schema:
>>   - name: 'nested'
>> type: ROW<`inner` STRING>
>>
>
> Queries Been trying the following queries
> Flink SQL> insert into nestedSink select nested.`inner` as `nested.inner`
> from nestedJsonStream;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink [nestedSink] do not match.
> Query result schema: [nested.inner: String]
> TableSink schema:[nested: Row]
>
> Flink SQL> insert into nestedSink select nested from nestedJsonStream;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink [nestedSink] do not match.
> Query result schema: [nested: Row]
> TableSink schema:[nested: Row]
>
> Flink SQL> insert into nestedSink select nested.`inner` as nested.`inner`
> from nestedJsonStream;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line
> 1, column 55.
> Was expecting one of:
> 
> "EXCEPT" ...
> "FETCH" ...
> "FROM" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ORDER" ...
> "MINUS" ...
> "UNION" ...
> "," ...
>
> Help me understand the problem with my schema/query?
> Also would like to add new columns and nested colums.
>
> Thanks
> Srikanth
>
>
>
>


Re: JDBCInputFormat does not support json type

2019-10-25 Thread Fabian Hueske
Hi Fanbin,

One approach would be to ingest the field as a VARCHAR / String and
implement a Scalar UDF to convert it into a nested tuple.
The UDF could use the code of the flink-json module.

AFAIK, there is some work on the way to add built-in JSON functions.

Best, Fabian
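
A hedged sketch of such a scalar UDF (assumptions: the VARIANT column arrives as a
JSON string with fields "a" and "b", and Jackson is available on the classpath;
this is not code from flink-json itself):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;

    public class VariantToRow extends ScalarFunction {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Parse the JSON string coming out of the VARIANT column into a named Row.
        public Row eval(String json) {
            try {
                JsonNode node = MAPPER.readTree(json);
                return Row.of(node.get("a").asText(), node.get("b").asText());
            } catch (Exception e) {
                return null; // malformed JSON
            }
        }

        // Tell the type system what the returned Row looks like.
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW_NAMED(new String[]{"a", "b"}, Types.STRING, Types.STRING);
        }
    }

After registering it with tableEnv.registerFunction(...), the UDF can be applied in
the query to turn the string field into a structured row.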

On Thu, Oct 24, 2019 at 10:03, Fanbin Bu wrote:

> Looks like SnowflakeColumnMetadata treats VARIANT as VARCHAR
>
> case VARIANT:
>   colType = Types.VARCHAR;
>   extColTypeName = "VARIANT";
>   break;
>
> and SnowflakeResultSet just return the string of the field
>
> switch(type)
> {
>   case Types.VARCHAR:
>   case Types.CHAR:
> return getString(columnIndex);
>
> What would be the best way to handle this on Flink side?
>
>
>
> On Thu, Oct 24, 2019 at 12:36 AM Fanbin Bu  wrote:
>
>> Hi there,
>>
>> Flink Version: 1.8.1
>> JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver
>>
>> Here is the code snippet:
>>
>> val rowTypeInfo = new RowTypeInfo(
>>   Array[TypeInformation[_]](
>> new RowTypeInfo(
>>   Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO,
>> BasicTypeInfo.STRING_TYPE_INFO),
>>   Array[String]("a", "b")
>> )
>>   ),
>>   Array[String]("fieldName")
>> )
>> val inputFormat = buildInputFormat(query, rowTypeInfo)
>> env.createInput(inputFormat)
>>
>> my snowflake table data looks like this (fieldName has type VARIANT)
>>
>> fieldName
>> --
>> {
>> "a": "1",
>> "b": "2"
>> }
>>
>> I got err msg:
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
>> to org.apache.flink.types.Row
>>
>>
>> Looks like the record I got from snowflake is a string. The error
>> prevents me from doing something like
>> sqlQuery("select fieldName.a from table")
>>
>> Any help is appreciated!
>>
>> Thanks,
>> Fanbin
>>
>


Re: Flink 1.9 measuring time taken by each operator in DataStream API

2019-10-25 Thread Fabian Hueske
Hi Komal,

Measuring latency is always a challenge. The problem here is that your
functions are chained, meaning that the result of a function is passed
directly to the next function, and the first function only receives a new
record once the last function has emitted its result.
This makes measuring the processing time of a single function difficult:
the time to process a single record is usually too small to be measured
reliably.

Regarding the second point:
Flink does not synchronize the collection of metrics because this would add
too much overhead. In the given screenshot, some records might be still on
the wire or waiting in buffers to be processed.

Best, Fabian

On Thu, Oct 24, 2019 at 09:16, Komal Mariam <komal.mar...@gmail.com> wrote:

> Hello,
>
> I have a few questions regarding flink’s dashboard and monitoring tools.
>
> I have a fixed number of records that I process through the datastreaming
> API on my standalone cluster and want to know how long it takes to process
> them. My questions are:
>
> 1)How can I see the time taken in milliseconds individually for each
> operator (filter, filter, map and keyed aggregate) to process these records
> during run time? I essentially want to know which operator causes the most
> latency in the pipeline.
>
> 2)While viewing the records and metrics on the dashboard there is a
> discrepancy between the number of records sent and received between two
> operators in my job graph. Why exactly are the number of records received
> by my second operator less than those sent by customsource->map and where
> are they stored? Image attached below for reference.
> [image: image.png]
> Best regards,
> Komal
>
>
>


Re: Issue with writeAsText() to S3 bucket

2019-10-25 Thread Fabian Hueske
Hi Michael,

One reason might be that S3's file listing command is only eventually
consistent.
It might take some time until the file appears and is listed.

Best, Fabian

On Wed, Oct 23, 2019 at 22:41, Nguyen, Michael <michael.nguye...@t-mobile.com> wrote:

> Hello all,
>
>
>
> I am running into issues at the moment trying to print my DataStreams to
> an S3 bucket using writeAsText(“s3://bucket/result.json”) in my Flink job.
> I used print() on the same DataStream and I see the output I am looking for
> in standard output. I first confirm that my datastream has data by looking
> at the standard output, then I cancel my Flink job. After cancelling the
> job, result.json only gets created in my S3 bucket some of the time. It
> does not always gets created, but I confirmed that I see my data in
> standard output.
>
>
>
> I understand writeAsText() should be used for debugging purposes only
> according to Flink’s documentation, but I’m just curious as to why I can’t
> get writeAsText() to always work every time I cancel my job.
>
>
>
> Thank you for your help,
>
> Michael
>


Re: [Problem] Unable to do join on TumblingEventTimeWindows using SQL

2019-10-25 Thread Fabian Hueske
Hi,

the exception says: "Rowtime attributes must not be in the input rows of a
regular join. As a workaround you can cast the time attributes of input
tables to TIMESTAMP before.".

The problem is that your query first joins the two tables without a
temporal condition and then wants to do a windowed grouping.
Joins without temporal condition are not able to preserve the rowtime
attribute.
You should try to change the join into a time-windowed join [1] [2] by
adding a BETWEEN predicate on the rowtime attributes of both tables.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
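
For illustration, a hedged sketch of what such a time-windowed join could look like
for the tables registered below (`orders` and `lineitem` are the rowtime attributes
from the question, the 5-second bound is arbitrary, and this is not a tested query):

    Table sqlResult = bsTableEnv.sqlQuery(
        "SELECT COUNT(Lineitem.a) FROM Orders, Lineitem " +
        "WHERE Lineitem.a = Orders.a " +
        "AND Lineitem.lineitem BETWEEN Orders.orders - INTERVAL '5' SECOND AND Orders.orders " +
        "GROUP BY TUMBLE(Orders.orders, INTERVAL '5' SECOND)");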

On Wed, Oct 23, 2019 at 09:18, Manoj Kumar wrote:

>
> *Hi All,*
>
> *[Explanation]*
>
> Two tables say lineitem and orders:
>
> Table
> orderstbl=bsTableEnv.fromDataStream(orders,"a,b,c,d,e,f,g,h,i,orders.rowtime");
> Table
> lineitemtbl=bsTableEnv.fromDataStream(lineitem,"a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,lineitem.rowtime");
>
> bsTableEnv.registerTable("Orders",orderstbl);
> bsTableEnv.registerTable("Lineitem",lineitemtbl)
>
> *#Rgular tumble window works*
>  Table sqlResult = bsTableEnv.sqlQuery("Select count(Orders.a) FROM Orders
> GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
>  Table sqlResult = bsTableEnv.sqlQuery("Select count(Lineitem.a) FROM
> Lineitem GROUP BY TUMBLE(lineitem, INTERVAL '5' SECOND)");
>
> *#Datastream TumblingEventTimeWindows joins also works fine*
>
> lineitem.join(orders).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(...)
>
> *But when I try to join them over same window it gives me error, it might
> possible I am writing wrong SQL :(*
>
> Table sqlResult  = bsTableEnv.sqlQuery("SELECTcount(Lineitem.a) FROM "
> + "Orders,Lineitem where Lineitem.a=Orders.a "
> + "GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
> FlinkLogicalSink(name=[sink], fields=[b])
> +- FlinkLogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
> window=[TumblingGroupWindow], properties=[])
>+- FlinkLogicalCalc(select=[orders, a0])
>   +- FlinkLogicalJoin(condition=[=($2, $0)], joinType=[inner])
>  :- FlinkLogicalCalc(select=[a, orders])
>  :  +-
> FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_3]])
>  +- FlinkLogicalCalc(select=[a])
> +-
> FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_6]])
>
> Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:147)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
> at
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
> at bzflink.StreamingTable.main(StreamingTable.java:65)
> Caused by: 

RE: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread min.tan
Thank you for your reply.

Is there any tool that lets us statically inspect (list) all the "uid"ed operators,
or all operators, for a jar?

Also, addSink and addSource are not on the operator list 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/

But they both have a uid method. Are these two operators or not?

Regards,

Min


From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Freitag, 25. Oktober 2019 10:24
To: Tan, Min
Cc: John Smith; user@flink.apache.org
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

It means that there is an operator state which has no corresponding operator in 
the new job. It usually indicates that the uid of a stateful operator has 
changed.

On Oct 25, 2019, at 4:12 PM, min@ubs.com wrote:

Thanks for your reply.

Our sources and sinks are connected to Kafka, therefore they are statful.

We did not set uid on them but only name().

The log says
Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint 
file:/var/flink/data-remote/savepoint-00-dae014102550.
 Cannot map checkpoint/savepoint state for operator 
484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.

Regards,

Min


From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Freitag, 25. Oktober 2019 10:04
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

It depends on the source/sink implementation. If the source/sink implementation 
uses state, uid should be set. So you can always set the uid in this case and 
then you don't need to care about the implementation details of the source/sink 
you used.

name() doesn't have such functionality.

Regarding to the uid mismatch you encountered, could you share the exception 
log?

Regards,
Dian

On Oct 25, 2019, at 3:38 PM, min@ubs.com wrote:

Thank you very much for your helpful response.

Our new production release complains about the an uid mismatch (we use exactly 
once checkpoints).
I hope I understand  your correctly: map and print are certainly stateless, 
therefore no uid is required. What about addSink and addSoure? Do they need an 
uid? Or a name() has a similar function?

Regards,

Min

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Freitag, 25. Oktober 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

The uid is used to matching the operator state stored in the 
checkpoint/savepoint to an operator[1]. So you only need to specify the uid for 
stateful operators.
1) If you have not specified the uid for an operator, it will generate a uid 
for it in a deterministic way[2] for it. The generated uid doesn't change for 
the same job.
2) However, it's encouraged to set uid for stateful operators to allow for job 
evolution. The dynamically generated uid is not guaranteed to remain the same 
if the job has changed, i.e. adding/removing operators in the job graph. If you 
want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the 
map and print operator.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
[2] 
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78

On Oct 24, 2019, at 11:22 PM, min@ubs.com wrote:

Hi,

I have some simple questions on the uid as well.

1)  Do we add a uid for every operator e.g. print(), addSink and addSource?
2)  For chained operators, do we need to uids for each operator? Or just 
the last operator?
e.g. .map().uid("some-id").print().uid("print-id");


Regards,

Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Donnerstag, 24. Oktober 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu <dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On Oct 24, 2019, at 5:42 AM, John Smith <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use 

Tumbling tables in the SQL API

2019-10-25 Thread A. V.
Hi,

In the SQL API I see the query below. I want to know how I can make tumbling 
windows based on the number of rows, i.e. a window for rows 1-10, 11-20, etc. It 
would also be fine if the windowing takes place on an integer ID column. How 
can I do this?

Table result1 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");


Thanks for the help.


Re: Why does any transformation on a KeyedStream turn it back into a DataStream

2019-10-25 Thread Utopia
Thanks!
On Oct 25, 2019 at 17:01 +0800, Dian Fu wrote:
> Because after a transformation there is no guarantee that the result is still a
> KeyedStream. If it really still is keyed, you can declare it as a KeyedStream via
> DataStreamUtils.reinterpretAsKeyedStream, which does not introduce an extra keyBy.
>
> > On Oct 25, 2019, at 4:05 PM, Utopia wrote:
> >
> > Hi all,
> >
> > Can't a KeyedStream stay a KeyedStream after a transformation? Having to keyBy
> > again after every transformation is unintuitive and makes the code redundant.
> >
> > Thanks~
>


Re: Why does any transformation on a KeyedStream turn it back into a DataStream

2019-10-25 Thread Dian Fu
Because after a transformation there is no guarantee that the result is still a
KeyedStream. If it really still is keyed, you can declare it as a KeyedStream via
DataStreamUtils.reinterpretAsKeyedStream, which does not introduce an extra keyBy.

> On Oct 25, 2019, at 4:05 PM, Utopia wrote:
> 
> Hi all,
> 
> Can't a KeyedStream stay a KeyedStream after a transformation? Having to keyBy
> again after every transformation is unintuitive and makes the code redundant.
> 
> Thanks~
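
A minimal sketch of that call (the Event type and its key field are illustrative
placeholders; the API is marked experimental):

    // Reinterpret an already-partitioned stream as keyed, without introducing another keyBy/shuffle.
    KeyedStream<Event, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
        stream,
        new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return e.key;
            }
        });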



Re: Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Chesnay Schepler
Simple: you pick the version that is listed on the download page for the 
Flink version you are using.

We have not done any tests as to whether the Hadoop 2.8.3 build works with 
Hadoop 2.8.5.


On 25/10/2019 10:36, Jeff Zhang wrote:
Thanks Chesnay, is there any document to explain which version of 
flink-shaded-hadoop-jar should I use for specific version of flink ?
e.g. The document of flink 1.9 here 
https://flink.apache.org/downloads.html#apache-flink-191 point me to 
flink-shaded-hadoop-jar 7.0, but the latest version 
of flink-shaded-hadoop-jar is 8.0, then when should I 
use flink-shaded-hadoop-jar 8.0 ?


Another question is that whether flink-shaded-hadoop-2-uber 2.8.3-7.0 
could be used for hadoop 2.8.5 as well ? I believe so, but want to 
confirm it. And if it works for all hadoop 2.8.x, then it may make 
more sense to omit the hadoop minor version, e.g. name it as 2.8-7.0, 
otherwise it may make user confused.



Chesnay Schepler <ches...@apache.org> wrote on Fri, Oct 25, 2019 at 4:21 PM:


If you need hadoop, but the approach outlined here


doesn't work for you, then you still need a
flink-shaded-hadoop-jar that you can download here
.

On 25/10/2019 09:54, Jeff Zhang wrote:

Hi all,

There's no new flink shaded release for flink 1.9, so I'd like to
confirm with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ?
Or the flink shaded is not necessary for flink 1.9 afterwards ?

https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2

-- 
Best Regards


Jeff Zhang





--
Best Regards

Jeff Zhang





Re: Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Jeff Zhang
Thanks Chesnay, is there any document that explains which version of
flink-shaded-hadoop-jar I should use for a specific version of Flink?
e.g. The document of flink 1.9 here
https://flink.apache.org/downloads.html#apache-flink-191 point me to
flink-shaded-hadoop-jar 7.0, but the latest version
of flink-shaded-hadoop-jar is 8.0, then when should I
use flink-shaded-hadoop-jar 8.0 ?

Another question is that whether flink-shaded-hadoop-2-uber 2.8.3-7.0 could
be used for hadoop 2.8.5 as well ? I believe so, but want to confirm it.
And if it works for all hadoop 2.8.x, then it may make more sense to omit
the hadoop minor version, e.g. name it as 2.8-7.0, otherwise it may make
user confused.


Chesnay Schepler wrote on Fri, Oct 25, 2019 at 4:21 PM:

> If you need hadoop, but the approach outlined here
> 
> doesn't work for you, then you still need a flink-shaded-hadoop-jar that
> you can download here
> .
>
> On 25/10/2019 09:54, Jeff Zhang wrote:
>
> Hi all,
>
> There's no new flink shaded release for flink 1.9, so I'd like to confirm
> with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ? Or the flink shaded
> is not necessary for flink 1.9 afterwards ?
>
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Chesnay Schepler
If you need hadoop, but the approach outlined here 
 
doesn't work for you, then you still need a flink-shaded-hadoop-jar that 
you can download here 
.


On 25/10/2019 09:54, Jeff Zhang wrote:

Hi all,

There's no new flink shaded release for flink 1.9, so I'd like to 
confirm with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ? Or the 
flink shaded is not necessary for flink 1.9 afterwards ?


https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2

--
Best Regards

Jeff Zhang





Why does any transformation on a KeyedStream turn it back into a DataStream

2019-10-25 Thread Utopia
Hi all,

Can't a KeyedStream stay a KeyedStream after a transformation? Having to keyBy
again after every transformation is unintuitive and makes the code redundant.

Thanks~


RE: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread min.tan
Thanks for your reply.

Our sources and sinks are connected to Kafka, therefore they are stateful.

We did not set uid on them but only name().

The log says
Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint 
file:/var/flink/data-remote/savepoint-00-dae014102550.
 Cannot map checkpoint/savepoint state for operator 
484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.

Regards,

Min
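
For reference, a sketch of the resubmission command hinted at in the log above
(the savepoint path and jar name are placeholders):

    ./bin/flink run -s <savepoint-path> --allowNonRestoredState <job.jar>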


From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Freitag, 25. Oktober 2019 10:04
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

It depends on the source/sink implementation. If the source/sink implementation 
uses state, uid should be set. So you can always set the uid in this case and 
then you don't need to care about the implementation details of the source/sink 
you used.

name() doesn't have such functionality.

Regarding to the uid mismatch you encountered, could you share the exception 
log?

Regards,
Dian

On Oct 25, 2019, at 3:38 PM, min@ubs.com wrote:

Thank you very much for your helpful response.

Our new production release complains about the an uid mismatch (we use exactly 
once checkpoints).
I hope I understand  your correctly: map and print are certainly stateless, 
therefore no uid is required. What about addSink and addSoure? Do they need an 
uid? Or a name() has a similar function?

Regards,

Min

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Freitag, 25. Oktober 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

The uid is used to matching the operator state stored in the 
checkpoint/savepoint to an operator[1]. So you only need to specify the uid for 
stateful operators.
1) If you have not specified the uid for an operator, it will generate a uid 
for it in a deterministic way[2] for it. The generated uid doesn't change for 
the same job.
2) However, it's encouraged to set uid for stateful operators to allow for job 
evolution. The dynamically generated uid is not guaranteed to remain the same 
if the job has changed, i.e. adding/removing operators in the job graph. If you 
want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the 
map and print operator.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
[2] 
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78

On Oct 24, 2019, at 11:22 PM, min@ubs.com wrote:

Hi,

I have some simple questions on the uid as well.

1)  Do we add a uid for every operator e.g. print(), addSink and addSource?
2)  For chained operators, do we need to uids for each operator? Or just 
the last operator?
e.g. .map().uid("some-id").print().uid("print-id");


Regards,

Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Donnerstag, 24. Oktober 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu <dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On Oct 24, 2019, at 5:42 AM, John Smith <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


Re: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread Dian Fu
Hi Min,

It depends on the source/sink implementation. If the source/sink implementation 
uses state, uid should be set. So you can always set the uid in this case and 
then you don't need to care about the implementation details of the source/sink 
you used.

name() doesn't have such functionality.

Regarding the uid mismatch you encountered, could you share the exception 
log? 

Regards,
Dian
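
To make that concrete, a minimal sketch (topic names, uids, schemas and properties
here are illustrative placeholders, not taken from Min's job):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class UidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

            DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
                .uid("kafka-source")   // the Kafka source is stateful (partition offsets), so pin its uid
                .name("kafka-source");

            stream
                .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props))
                .uid("kafka-sink")     // a transactional/exactly-once sink is stateful as well
                .name("kafka-sink");

            env.execute("uid example");
        }
    }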

> On Oct 25, 2019, at 3:38 PM, min@ubs.com wrote:
> 
> Thank you very much for your helpful response.
>  
> Our new production release complains about the an uid mismatch (we use 
> exactly once checkpoints).
> I hope I understand  your correctly: map and print are certainly stateless, 
> therefore no uid is required. What about addSink and addSoure? Do they need 
> an uid? Or a name() has a similar function?
>  
> Regards,
>  
> Min
>  
> From: Dian Fu [mailto:dian0511...@gmail.com] 
> Sent: Freitag, 25. Oktober 2019 03:52
> To: Tan, Min
> Cc: John Smith; user
> Subject: [External] Re: Does operator uid() have to be unique across all jobs?
>  
> Hi Min,
>  
> The uid is used to matching the operator state stored in the 
> checkpoint/savepoint to an operator[1]. So you only need to specify the uid 
> for stateful operators.
> 1) If you have not specified the uid for an operator, it will generate a uid 
> for it in a deterministic way[2] for it. The generated uid doesn't change for 
> the same job.
> 2) However, it's encouraged to set uid for stateful operators to allow for 
> job evolution. The dynamically generated uid is not guaranteed to remain the 
> same if the job has changed, i.e. adding/removing operators in the job graph. 
> If you want to reuse state after job evolution, you need to set the uid 
> explicitly.
>  
> So for the example you give, I think you don't need to specify the uid for 
> the map and print operator.
>  
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
>  
> 
> [2] 
> https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78
>  
> 
>  
> On Oct 24, 2019, at 11:22 PM, min@ubs.com wrote:
>  
> Hi,
>  
> I have some simple questions on the uid as well.
>  
> 1)  Do we add a uid for every operator e.g. print(), addSink and 
> addSource?
> 2)  For chained operators, do we need to uids for each operator? Or just 
> the last operator?
> e.g. .map().uid("some-id").print().uid("print-id");
>  
>  
> Regards,
>  
> Min
>  
> From: John Smith [mailto:java.dev@gmail.com 
> ] 
> Sent: Donnerstag, 24. Oktober 2019 16:32
> To: Dian Fu
> Cc: user
> Subject: [External] Re: Does operator uid() have to be unique across all jobs?
>  
> Ok cool. Thanks
> 
> BTW this seems a bit cumbersome...
> 
> .map().uid("some-id").name("some-id");
>  
> On Wed, 23 Oct 2019 at 21:13, Dian Fu  > wrote:
> Yes, you can use it in another job. The uid needs only to be unique within a 
> job.
> 
> > On Oct 24, 2019, at 5:42 AM, John Smith wrote:
> > 
> > When setting uid() of an operator does it have to be unique across all jobs 
> > or just unique within a job?
> > 
> > For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") 
> > in another job?
> 
> 

Re: Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread vino yang
Hi Jeff,

Maybe @Chesnay Schepler  could tell you the answer.

Best,
Vino

Jeff Zhang wrote on Fri, Oct 25, 2019 at 3:54 PM:

> Hi all,
>
> There's no new flink shaded release for flink 1.9, so I'd like to confirm
> with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ? Or the flink shaded
> is not necessary for flink 1.9 afterwards ?
>
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: How to create an empty test stream

2019-10-25 Thread vino yang
Yes, that is also a good idea if you don't require the stream to be empty
at the source.

Best,
Dmitry

Dmitry Minaev wrote on Fri, Oct 25, 2019 at 12:21 PM:

> Thanks, I'll check it out.
> Actually I realized I can always put a filter operator that'll effectively
> remove everything from the stream.
>
> -- Dmitry
>
> On Thu, Oct 24, 2019 at 2:29 AM vino yang  wrote:
>
>> Hi Dmitry,
>>
>> Perhaps an easy way is to customize a source function. Then in the run
>> method, start an empty loop? But I don't understand the meaning of starting
>> a stream pipeline without generating data.
>>
>> Best,
>> Vino
>>
>> Dmitry Minaev wrote on Thu, Oct 24, 2019 at 6:16 AM:
>>
>>> Hi everyone,
>>>
>>> I have a pipeline where I union several streams. I want to test it and
>>> don't
>>> want to populate one of the streams. I'm usually creating streams with:
>>>
>>> DataStreamTestBase.createTestStreamWith(event).close();
>>>
>>> The above statement creates a stream and puts the `event` inside. But in
>>> my
>>> case I want to create an empty stream.
>>>
>>> How do I do it?
>>>
>>> --
>>> Thanks, Dmitry
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
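
A minimal sketch of the filter approach Dmitry mentions (stream names are
placeholders):

    // Reject every element, so the union sees an effectively empty stream.
    DataStream<Event> empty = populatedTestStream.filter(e -> false);
    DataStream<Event> result = empty.union(otherStream);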


Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Jeff Zhang
Hi all,

There's no new flink shaded release for flink 1.9, so I'd like to confirm
with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ? Or the flink shaded
is not necessary for flink 1.9 afterwards ?

https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2

-- 
Best Regards

Jeff Zhang


RE: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread min.tan
Thank you very much for your helpful response.

Our new production release complains about a uid mismatch (we use exactly-once 
checkpoints).
I hope I understand you correctly: map and print are certainly stateless, 
therefore no uid is required. What about addSink and addSource? Do they need a 
uid? Or does name() have a similar function?

Regards,

Min

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Freitag, 25. Oktober 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

The uid is used to matching the operator state stored in the 
checkpoint/savepoint to an operator[1]. So you only need to specify the uid for 
stateful operators.
1) If you have not specified the uid for an operator, it will generate a uid 
for it in a deterministic way[2] for it. The generated uid doesn't change for 
the same job.
2) However, it's encouraged to set uid for stateful operators to allow for job 
evolution. The dynamically generated uid is not guaranteed to remain the same 
if the job has changed, i.e. adding/removing operators in the job graph. If you 
want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the 
map and print operator.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
[2] 
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78

On Oct 24, 2019, at 11:22 PM, min@ubs.com wrote:

Hi,

I have some simple questions on the uid as well.

1)  Do we add a uid for every operator e.g. print(), addSink and addSource?
2)  For chained operators, do we need to uids for each operator? Or just 
the last operator?
e.g. .map().uid("some-id").print().uid("print-id");


Regards,

Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Donnerstag, 24. Oktober 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu <dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On Oct 24, 2019, at 5:42 AM, John Smith <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


RE: Could not load the native RocksDB library

2019-10-25 Thread Patro, Samya
Hello Thad,
In my case, the issue was fixed after upgrading the OS version and the gcc version.
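For anyone hitting the same "undefined symbol: malloc_stats_print" error, here is a minimal sketch to check whether the bundled RocksDB JNI library can be loaded on a given host at all, independent of Flink. The path below is an assumption; point it at the librocksdbjni-linux64.so that Flink extracts under java.io.tmpdir on the failing host:

public class RocksDbNativeLoadCheck {
    public static void main(String[] args) {
        // Absolute path to the extracted native library (assumed location).
        String soPath = args.length > 0 ? args[0] : "/tmp/librocksdbjni-linux64.so";
        try {
            System.load(soPath);
            System.out.println("Native RocksDB library loaded successfully from " + soPath);
        } catch (UnsatisfiedLinkError e) {
            // On hosts with an older OS/glibc toolchain this reproduces the
            // "undefined symbol: malloc_stats_print" failure seen in the job logs.
            System.err.println("Failed to load native library: " + e.getMessage());
        }
    }
}

If the load fails here as well, the problem lies in the host environment (OS, glibc, gcc runtime) rather than in the Flink configuration.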

From: Thad Truman [mailto:ttru...@neovest.com]
Sent: Tuesday, October 22, 2019 11:03 PM
To: Andrey Zagrebin; Haibo Sun
Cc: Patro, Samya [Engineering]; user@flink.apache.org; Bari, Swapnil 
[Engineering]
Subject: RE: Could not load the native RocksDB library

Hi Samya,

Were you able to get this resolved? Seeing the same issue here after upgrading 
to Flink 1.9 from 1.6.

Thanks,

Thad

From: Andrey Zagrebin 
Sent: Wednesday, July 3, 2019 9:09 AM
To: Haibo Sun 
Cc: Patro, Samya ; user@flink.apache.org; Bari, Swapnil 

Subject: Re: Could not load the native RocksDB library

Hi Samya,

In addition to Haibo's answer:
Have you tried the previous Flink 1.7 version? The RocksDB version was
upgraded in Flink 1.8.

Best,
Andrey

On Wed, Jul 3, 2019 at 5:21 AM Haibo Sun <sunhaib...@163.com> wrote:
Hi,  Samya.Patro

I guess this may be a setup problem. What OS and what version of JDK do you 
use?  You can try upgrading JDK to see if the issue can be solved.

Best,
Haibo

At 2019-07-02 17:16:59, "Patro, Samya" <samya.pa...@gs.com> wrote:
Hello,
I am using RocksDB for storing state, but when I run the pipeline I get the
error "Could not load the native RocksDB library". Could you please check the
configs and error stacktrace below and suggest what I am doing wrong?

Flink version  - 1.8.0



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

This is  the flink checkpointing config I have used

executionEnvironment.enableCheckpointing(30);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StateBackend rocksDbBackend = new RocksDBStateBackend(parameter.get("stateBackendPath"), true);
executionEnvironment.setStateBackend(rocksDbBackend);

When I run the pipeline, I get this error

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 5 more
Caused by: java.io.IOException: Could not load the native RocksDB library
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 7 more
Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
undefined symbol: malloc_stats_print
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at 

Re: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread Victor Wong
Hi,
  “uid” is mainly useful when you upgrade your application. It’s used to match 
the operator state stored in the savepoint.
  As suggested in [1], “it is highly recommended to assign unique IDs to all 
operators of an application that might be upgraded in the future.”

  [1]. 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state

From: "min@ubs.com" 
Date: Thursday, 24 October 2019 at 11:31 PM
To: "java.dev@gmail.com" , "dian0511...@gmail.com" 

Cc: "user@flink.apache.org" 
Subject: RE: Does operator uid() have to be unique across all jobs?

Hi,

I have some simple questions on the uid as well.


  1.  Do we add a uid for every operator e.g. print(), addSink and addSource?
  2.  For chained operators, do we need uids for each operator? Or just for the
last operator?

e.g. .map().uid("some-id").print().uid("print-id");




Regards,


Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Donnerstag, 24. Oktober 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu <dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On 24 Oct 2019, at 05:42, John Smith <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


Re: Could not load the native RocksDB library

2019-10-25 Thread Congxian Qiu
FYI

Maybe this is an environment problem. I encountered this problem when running
Flink 1.9 on Kubernetes, but it succeeded when running on YARN. I have not
figured out why this happens yet; I will update here once I find out.

Best,
Congxian


Thad Truman  wrote on Wednesday, 23 Oct 2019 at 01:33:

> Hi Samya,
>
>
>
> Were you able to get this resolved? Seeing the same issue here after
> upgrading to Flink 1.9 from 1.6.
>
>
>
> Thanks,
>
>
>
> Thad
>
>
>
> From: Andrey Zagrebin 
> Sent: Wednesday, July 3, 2019 9:09 AM
> To: Haibo Sun 
> Cc: Patro, Samya ; user@flink.apache.org; Bari,
> Swapnil 
> Subject: Re: Could not load the native RocksDB library
>
>
>
> Hi Samya,
>
>
>
> In addition to Haibo's answer:
>
> Have you tried the previous Flink 1.7 version? The RocksDB version was
> upgraded in Flink 1.8.
>
>
>
> Best,
>
> Andrey
>
>
>
> On Wed, Jul 3, 2019 at 5:21 AM Haibo Sun  wrote:
>
> Hi,  Samya.Patro
>
>
>
> I guess this may be a setup problem. What OS and what version of JDK do
> you use?  You can try upgrading JDK to see if the issue can be solved.
>
>
>
> Best,
>
> Haibo
>
>
> At 2019-07-02 17:16:59, "Patro, Samya"  wrote:
>
> Hello,
> I am using RocksDB for storing state, but when I run the pipeline I get
> the error "Could not load the native RocksDB library". Could you please
> check the configs and error stacktrace below and suggest what I am doing wrong?
>
>
>
> Flink version  - 1.8.0
>
>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>     <version>1.8.0</version>
> </dependency>
>
>
>
> This is  the flink checkpointing config I have used
>
>
>
> executionEnvironment.enableCheckpointing(30);
> executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);
> executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
> executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> StateBackend rocksDbBackend = new RocksDBStateBackend(parameter.get("stateBackendPath"), true);
> executionEnvironment.setStateBackend(rocksDbBackend);
>
>
>
> When I run the pipeline, I get this error:
>
>
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from
> any of the 1 provided restore options.
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>
> ... 5 more
>
> Caused by: java.io.IOException: Could not load the native RocksDB library
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 7 more
>
> Caused by: java.lang.UnsatisfiedLinkError:
> /tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so:
> /tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so:
> undefined symbol: malloc_stats_print
>
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
>
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
>
> at