[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread Amit (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698092#comment-16698092
 ] 

Amit commented on FLINK-10918:
--

Un-related streaming test cases are failing, while the forked branch built 
without any errors

> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698071#comment-16698071
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal commented on issue #7171: FLINK-10918 Fix error while 
checkpointing on window keyed state rock …
URL: https://github.com/apache/flink/pull/7171#issuecomment-441417072
 
 
   The build is passing on the forked branch, the failure here is unrelated
   https://travis-ci.org/imamitsehgal/flink/builds/459154746
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] imamitsehgal commented on issue #7171: FLINK-10918 Fix error while checkpointing on window keyed state rock …

2018-11-24 Thread GitBox
imamitsehgal commented on issue #7171: FLINK-10918 Fix error while 
checkpointing on window keyed state rock …
URL: https://github.com/apache/flink/pull/7171#issuecomment-441417072
 
 
   The build is passing on the forked branch, the failure here is unrelated
   https://travis-ci.org/imamitsehgal/flink/builds/459154746
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xueyumusic commented on issue #7059: [FLINK-10689][table] Port UDFs in Table API extension points to flink-table-common

2018-11-24 Thread GitBox
xueyumusic commented on issue #7059: [FLINK-10689][table] Port UDFs in Table 
API extension points to flink-table-common
URL: https://github.com/apache/flink/pull/7059#issuecomment-441416404
 
 
   Thanks for review, @twalthr , I updated the codes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10689) Port UDFs in Table API extension points to flink-table-common

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698067#comment-16698067
 ] 

ASF GitHub Bot commented on FLINK-10689:


xueyumusic commented on issue #7059: [FLINK-10689][table] Port UDFs in Table 
API extension points to flink-table-common
URL: https://github.com/apache/flink/pull/7059#issuecomment-441416404
 
 
   Thanks for review, @twalthr , I updated the codes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port UDFs in Table API extension points to flink-table-common
> -
>
> Key: FLINK-10689
> URL: https://issues.apache.org/jira/browse/FLINK-10689
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> After FLINK-10687 and FLINK-10688 have been resolved, we should also port the 
> remaining extension points of the Table API to flink-table-common. This 
> includes interfaces for UDFs and the external catalog interface.
> This ticket is for porting UDFs. This jira does NOT depend on FLINK-16088 so 
> it can be started at anytime.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-11-24 Thread wgcn (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698043#comment-16698043
 ] 

wgcn commented on FLINK-10884:
--

I think the  memory assigned to  jvm  not only contains  " heap memory" and 
"off heap memory"  in container. Do you mean that the offHeapSizeMB should be 
assigned to " (containerMemoryMB - heapSizeMB) *(1-cutoffFactor)"

> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Core
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Assignee: wgcn
>Priority: Major
>  Labels: yarn
>
> TM container will be killed by nodemanager because of  the exceeded  
> [physical|http://www.baidu.com/link?url=Y4LyfMDH59n9-Ey16Fo6EFAYltN1e9anB3y2ynhVmdvuIBCkJGdH0hTExKDZRvXNr6hqhwIXs8JjYqesYbx0BOpQDD0o1VjbVQlOC-9MgXi]
>  memory. I found the lanuch context   lanuching TM container  that  
> "container memory =   heap memory+ offHeapSizeMB"  at the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters   
> from line 160 to 166  I set a safety margin for the whole memory container 
> using. For example  if the container  limit 3g  memory,  the sum memory that  
>  "heap memory+ offHeapSizeMB"  is equal to  2.4g to prevent the container 
> being killed.Do we have the 
> [ready-made|http://www.baidu.com/link?url=ylC8cEafGU6DWAdU9ADcJPNugkjbx6IjtqIIxJ9foX4_Yfgc7ctWmpEpQRettVmBiOy7Wfph7S1UvN5LiJj-G1Rsb--oDw4Z2OEbA5Fj0bC]
>  solution  or I can commit my solution



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7883) Make savepoints atomic with respect to state and side effects

2018-11-24 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697931#comment-16697931
 ] 

Steven Zhen Wu commented on FLINK-7883:
---

We would love to see this happening. it is the "graceful" shutdown that we need 
to reduce/minimize duplicates if we are going to enable aggressive/frequent 
rescale events. otherwise, we are going to see frequent and significant 
duplicates.

> Make savepoints atomic with respect to state and side effects
> -
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.3.2, 1.4.0
>Reporter: Antoine Philippot
>Priority: Major
>
> For a cancel with savepoint command, the JobManager trigger the cancel call 
> once the savepoint is finished, but during the savepoint execution, kafka 
> source continue to poll new messages which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching the source stream task before triggering 
> the savepoint.
> I suggest to add an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existant SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property in 
>  {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10868) Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement

2018-11-24 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697677#comment-16697677
 ] 

Zhenqiu Huang edited comment on FLINK-10868 at 11/24/18 5:48 PM:
-

[~suez1224] [~till.rohrmann]

Agree with Shuyi's proposal. As maximum-failed-containers is more a 
configuration for a job level rather than session cluster level. We may have a 
simple fix for Per Job cluster first to achieve feature parity with former 
release. 

1) I will add a boolean parameter to createResourceManager function to 
distinguish whether it runs for a per job cluster or session cluster. And also 
pass  LeaderGatewayRetriever dispatcherGatewayRetriever as 
one of parameters createResourceManager function in ResourceManagerFactory.

2) If it is per job cluster, One the threshold is hit, shutdownCluster by using 
DispatcherGateway. 

How do you think?


was (Author: zhenqiuhuang):
[~suez1224] [~till.rohrmann]

Agree with Shuyi's proposal. As yarn.maximum-failed-containers is more a 
configuration for a job level rather than session cluster level. We may have a 
simple fix for Per Job cluster first to achieve feature parity with former 
release. 

1) I will add a boolean parameter to YarnResourceManager to distinguish whether 
it runs for a per job cluster or session cluster. And also pass  
LeaderGatewayRetriever dispatcherGatewayRetriever as 
parameter of constructor of YarnResourceManager.

2) If it is per job cluster, One the threshold is hit, shutdownCluster by using 
DispatcherGateway. 

How do you think?

> Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers 
> as limit of resource acquirement
> --
>
> Key: FLINK-10868
> URL: https://issues.apache.org/jira/browse/FLINK-10868
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Currently, YarnResourceManager does use yarn.maximum-failed-containers as 
> limit of resource acquirement. In worse case, when new start containers 
> consistently fail, YarnResourceManager will goes into an infinite resource 
> acquirement process without failing the job. Together with the 
> https://issues.apache.org/jira/browse/FLINK-10848, It will quick occupy all 
> resources of yarn queue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10868) Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement

2018-11-24 Thread Zhenqiu Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang updated FLINK-10868:
--
Summary: Flink's JobCluster ResourceManager doesn't use 
yarn.maximum-failed-containers as limit of resource acquirement  (was: Flink's 
Yarn ResourceManager doesn't use yarn.maximum-failed-containers as limit of 
resource acquirement)

> Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers 
> as limit of resource acquirement
> --
>
> Key: FLINK-10868
> URL: https://issues.apache.org/jira/browse/FLINK-10868
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Currently, YarnResourceManager does use yarn.maximum-failed-containers as 
> limit of resource acquirement. In worse case, when new start containers 
> consistently fail, YarnResourceManager will goes into an infinite resource 
> acquirement process without failing the job. Together with the 
> https://issues.apache.org/jira/browse/FLINK-10848, It will quick occupy all 
> resources of yarn queue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697871#comment-16697871
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal commented on issue #7171: FLINK-10918 Fix error while 
checkpointing on window keyed state rock …
URL: https://github.com/apache/flink/pull/7171#issuecomment-441378109
 
 
   What is the purpose of the change?
   To fix the issue with creating directory while running on windows with keyed 
state and rocksdbbackend.
   Brief change log
   RockDBKeyedStateBackend class changes to remove leading '/' in the path 
string
   Verifying this change
   This change is already covered by existing tests, such as (please describe 
tests).
   Documentation
   Does this pull request introduce a new feature? NO


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] imamitsehgal commented on issue #7171: FLINK-10918 Fix error while checkpointing on window keyed state rock …

2018-11-24 Thread GitBox
imamitsehgal commented on issue #7171: FLINK-10918 Fix error while 
checkpointing on window keyed state rock …
URL: https://github.com/apache/flink/pull/7171#issuecomment-441378109
 
 
   What is the purpose of the change?
   To fix the issue with creating directory while running on windows with keyed 
state and rocksdbbackend.
   Brief change log
   RockDBKeyedStateBackend class changes to remove leading '/' in the path 
string
   Verifying this change
   This change is already covered by existing tests, such as (please describe 
tests).
   Documentation
   Does this pull request introduce a new feature? NO


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697868#comment-16697868
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal opened a new pull request #7171: FLINK-10918 Fix error while 
checkpointing on window keyed state rock …
URL: https://github.com/apache/flink/pull/7171
 
 
   …db backend
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> 

[GitHub] imamitsehgal opened a new pull request #7171: FLINK-10918 Fix error while checkpointing on window keyed state rock …

2018-11-24 Thread GitBox
imamitsehgal opened a new pull request #7171: FLINK-10918 Fix error while 
checkpointing on window keyed state rock …
URL: https://github.com/apache/flink/pull/7171
 
 
   …db backend
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697867#comment-16697867
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal closed pull request #7153: FLINK-10918 Fix the checkpoint dir 
creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985f883..4db32e1d2c8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -557,6 +557,14 @@ private void createDB() throws IOException {
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
 
+   private String getPathAsString(Path path) {
+   String pathString = path.getPath();
+   if (path.hasWindowsDrive() && pathString.startsWith("/")) {
+   pathString = pathString.substring(1);
+   }
+   return pathString;
+   }
+
private RocksDB openDB(
String path,
List stateColumnFamilyDescriptors,
@@ -575,7 +583,7 @@ private RocksDB openDB(
try {
dbRef = RocksDB.open(
Preconditions.checkNotNull(dbOptions),
-   Preconditions.checkNotNull(path),
+   Preconditions.checkNotNull(getPathAsString(new 
Path(path))),
columnFamilyDescriptors,
stateColumnFamilyHandles);
} catch (RocksDBException e) {
@@ -2374,6 +2382,7 @@ private void closeLocalRegistry() {
}
}
 
+
/**
 * Encapsulates the process to perform an incremental snapshot of a 
RocksDBKeyedStateBackend.
 */
@@ -2546,7 +2555,9 @@ void takeSnapshot() throws Exception {
 
// create hard links of living files in the snapshot 
path
try (Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db)) {
-   
checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+   String path = 
stateBackend.getPathAsString(localBackupDirectory.getDirectory());
+   LOG.trace("Checkpoint path is {}.", path);
+   checkpoint.createCheckpoint(path);
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at 

[GitHub] imamitsehgal closed pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state

2018-11-24 Thread GitBox
imamitsehgal closed pull request #7153: FLINK-10918 Fix the checkpoint dir 
creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985f883..4db32e1d2c8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -557,6 +557,14 @@ private void createDB() throws IOException {
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
 
+   private String getPathAsString(Path path) {
+   String pathString = path.getPath();
+   if (path.hasWindowsDrive() && pathString.startsWith("/")) {
+   pathString = pathString.substring(1);
+   }
+   return pathString;
+   }
+
private RocksDB openDB(
String path,
List stateColumnFamilyDescriptors,
@@ -575,7 +583,7 @@ private RocksDB openDB(
try {
dbRef = RocksDB.open(
Preconditions.checkNotNull(dbOptions),
-   Preconditions.checkNotNull(path),
+   Preconditions.checkNotNull(getPathAsString(new 
Path(path))),
columnFamilyDescriptors,
stateColumnFamilyHandles);
} catch (RocksDBException e) {
@@ -2374,6 +2382,7 @@ private void closeLocalRegistry() {
}
}
 
+
/**
 * Encapsulates the process to perform an incremental snapshot of a 
RocksDBKeyedStateBackend.
 */
@@ -2546,7 +2555,9 @@ void takeSnapshot() throws Exception {
 
// create hard links of living files in the snapshot 
path
try (Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db)) {
-   
checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+   String path = 
stateBackend.getPathAsString(localBackupDirectory.getDirectory());
+   LOG.trace("Checkpoint path is {}.", path);
+   checkpoint.createCheckpoint(path);
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697807#comment-16697807
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the 
checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236042030
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -557,6 +557,16 @@ private void createDB() throws IOException {
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
 
+   private static class Utils {
 
 Review comment:
   valid point, I initially thought to create utility  class, then decided to 
keep it local.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state

2018-11-24 Thread GitBox
imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the 
checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236042030
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -557,6 +557,16 @@ private void createDB() throws IOException {
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
 
+   private static class Utils {
 
 Review comment:
   valid point, I initially thought to create utility  class, then decided to 
keep it local.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697806#comment-16697806
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the 
checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236042001
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -575,7 +585,7 @@ private RocksDB openDB(
try {
dbRef = RocksDB.open(
Preconditions.checkNotNull(dbOptions),
-   Preconditions.checkNotNull(path),
+   
Preconditions.checkNotNull(Utils.getPathAsString(new Path(path))),
 
 Review comment:
   If Path is null or "" then new Path(path) will throw runtime 
IllegalArgumentException. The path returned by getPathAsString will be checked 
again to be non-null.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state

2018-11-24 Thread GitBox
imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the 
checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236042001
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -575,7 +585,7 @@ private RocksDB openDB(
try {
dbRef = RocksDB.open(
Preconditions.checkNotNull(dbOptions),
-   Preconditions.checkNotNull(path),
+   
Preconditions.checkNotNull(Utils.getPathAsString(new Path(path))),
 
 Review comment:
   If Path is null or "" then new Path(path) will throw runtime 
IllegalArgumentException. The path returned by getPathAsString will be checked 
again to be non-null.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state

2018-11-24 Thread GitBox
imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the 
checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236041049
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -557,6 +557,16 @@ private void createDB() throws IOException {
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
 
+   private static class Utils {
+   static String getPathAsString(Path path) {
+   String pathString = path.getPath();
 
 Review comment:
   I tried that it kept failing the test cases. Event tried the toString method 
in Path class which removes the leading '/' even that fails the test cases


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2018-11-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697781#comment-16697781
 ] 

ASF GitHub Bot commented on FLINK-10918:


imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the 
checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236041049
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -557,6 +557,16 @@ private void createDB() throws IOException {
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
 
+   private static class Utils {
+   static String getPathAsString(Path path) {
+   String pathString = path.getPath();
 
 Review comment:
   I tried that it kept failing the test cases. Event tried the toString method 
in Path class which removes the leading '/' even that fails the test cases


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, State Backends, Checkpointing
>Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-24 Thread ideal-hp (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697745#comment-16697745
 ] 

ideal-hp commented on FLINK-10999:
--

Thank you for your help!
I updated the update in detail at https://github.com/harbby/sylph/issues/23

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.5, 1.6.2
>Reporter: ideal-hp
>Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>  at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-24 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697743#comment-16697743
 ] 

xuqianjin commented on FLINK-10999:
---

[~harbby] I understand your description. Thank you very much. I will try to 
track this problem.

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.5, 1.6.2
>Reporter: ideal-hp
>Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>  at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11001) Window rowtime attribute can't be renamed in Java

2018-11-24 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11001:
---

 Summary: Window rowtime attribute can't be renamed in Java
 Key: FLINK-11001
 URL: https://issues.apache.org/jira/browse/FLINK-11001
 Project: Flink
  Issue Type: Bug
 Environment: 

Reporter: Hequn Cheng
Assignee: Hequn Cheng


Currently, we can rename window rowtime attribute like this in Scala:
{code:java}
table
  .window(Tumble over 2.millis on 'rowtime as 'w)
  .groupBy('w)
  .select('w.rowtime as 'rowtime, 'int.count as 'int)
{code}
However, an exception will be thrown if we use java(by changing the Expressions 
to String):
{code:java}
table
  .window(Tumble over 2.millis on 'rowtime as 'w)
  .groupBy('w)
  .select("w.rowtime as rowtime, int.count as int")
{code}
The Exception is:
{code:java}
org.apache.flink.table.api.ExpressionParserException: Could not parse 
expression at column 11: `,' expected but `a' found
w.rowtime as rowtime, int.count as int
{code}
 

To solve the problem, we can add rename support in {{ExpressionParser}}. 
However, this may conflict with the design of source which use as before 
rowtime:
{code:java}
stream.toTable(
  tEnv,
  ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): 
_*)
{code}
Personally, I think we should keep the two consistent, so the final api would 
be:
{code:java}
// window case
.select("w.rowtime as rowtime, int.count as int")

// source case
stream.toTable(
  tEnv,
  ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): 
_*)
{code}
Any suggestions would be greatly appreciated!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-24 Thread ideal-hp (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697730#comment-16697730
 ] 

ideal-hp commented on FLINK-10999:
--

[~x1q1j1]: Hi, I am very sorry, my expression is not clear.
I re-describe: When `tableEnv.registerDataStream` is used multiple times, 
specifying `proctime.proctime` or `rowtime.rowtime` will cause this error.

I have a more detailed description and reproduction here.

see: https://github.com/harbby/sylph/issues/23

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.5, 1.6.2
>Reporter: ideal-hp
>Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>  at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-24 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697701#comment-16697701
 ] 

xuqianjin commented on FLINK-10999:
---

[~harbby]It is not easy to understand your question from your description and 
log.

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.5, 1.6.2
>Reporter: ideal-hp
>Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>  at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)