[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517821#comment-16517821 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 Could anybody have a look at this? > generate the _meta file for checkpoint only when the writing is truly > successful > > > Key: FLINK-9325 > URL: https://issues.apache.org/jira/browse/FLINK-9325 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > We should generate the _meta file for checkpoint only when the writing is > totally successful. We should write the metadata file first to a temp file > and then atomically rename it (with an equivalent workaround for S3). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 Could anybody have a look at this? ---
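The temp-file-plus-atomic-rename pattern described in FLINK-9325 can be sketched with plain `java.nio`. The class and file names below are illustrative and this is not Flink's actual checkpoint code; note the ticket also flags that S3 has no atomic rename and needs an equivalent workaround:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Sketch of "write to a temp file, then publish via atomic rename". */
public class AtomicMetadataWrite {

    /** Writes bytes to {@code target} via a sibling temp file and an atomic rename. */
    public static void writeAtomically(Path target, byte[] contents) throws IOException {
        // The temp file must live in the same directory (same filesystem)
        // for ATOMIC_MOVE to be supported.
        Path tmp = target.resolveSibling(target.getFileName() + ".inprogress");
        Files.write(tmp, contents); // if this fails, target is never created
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE); // publish only on success
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt");
        Path meta = dir.resolve("_metadata");
        writeAtomically(meta, "checkpoint-metadata".getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.exists(meta)); // true
    }
}
```

Readers observing a partially written `_metadata` would thus be impossible on filesystems with atomic rename: the file either exists complete or not at all.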
[GitHub] flink issue #6185: [FLINK-9619][YARN] Eagerly close the connection with task...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6185 CC @tillrohrmann ---
[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517794#comment-16517794 ] ASF GitHub Bot commented on FLINK-9619: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6185 [FLINK-9619][YARN] Eagerly close the connection with task manager when the container is completed ## What is the purpose of the change *We should always eagerly close the connection with task manager when the container is completed.* ## Brief change log - *Eagerly close the connection with task manager when the container is completed* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9619 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6185.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6185 commit a667ea120b0c2519ce45e5919c1377e897897d17 Author: sihuazhou Date: 2018-06-20T04:41:05Z Eagerly close the connection with task manager when the container is completed. 
> Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.6.0, 1.5.1 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > We should always eagerly close the connection with task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517795#comment-16517795 ] ASF GitHub Bot commented on FLINK-9619: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6185 CC @tillrohrmann > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.6.0, 1.5.1 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > We should always eagerly close the connection with task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6185 [FLINK-9619][YARN] Eagerly close the connection with task manager when the container is completed ## What is the purpose of the change *We should always eagerly close the connection with task manager when the container is completed.* ## Brief change log - *Eagerly close the connection with task manager when the container is completed* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9619 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6185.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6185 commit a667ea120b0c2519ce45e5919c1377e897897d17 Author: sihuazhou Date: 2018-06-20T04:41:05Z Eagerly close the connection with task manager when the container is completed. ---
[jira] [Created] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
Sihua Zhou created FLINK-9619: - Summary: Always close the task manager connection when the container is completed in YarnResourceManager Key: FLINK-9619 URL: https://issues.apache.org/jira/browse/FLINK-9619 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.6.0, 1.5.1 Reporter: Sihua Zhou Assignee: Sihua Zhou Fix For: 1.6.0, 1.5.1 We should always eagerly close the connection with task manager when the container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
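A hedged sketch of the proposed behavior: close the task manager connection in the container-completed callback unconditionally, rather than only in some branches. The names below (`ContainerTracker`, `onContainerCompleted`) are invented for illustration and are not YarnResourceManager's real API:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of a resource manager tracking open task manager connections. */
public class ContainerTracker {
    private final Map<String, Boolean> openConnections = new HashMap<>();

    public void register(String containerId) {
        openConnections.put(containerId, true);
    }

    /** Called when YARN reports a container as completed, for any exit status. */
    public void onContainerCompleted(String containerId) {
        // Eagerly close for every completed container, not only on failure paths,
        // so no stale connection lingers in the resource manager.
        closeTaskManagerConnection(containerId);
    }

    private void closeTaskManagerConnection(String containerId) {
        openConnections.remove(containerId);
    }

    public boolean isConnected(String containerId) {
        return openConnections.getOrDefault(containerId, false);
    }
}
```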
[jira] [Commented] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread
[ https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517790#comment-16517790 ] Sihua Zhou commented on FLINK-9417: --- Hi [~till.rohrmann], one thing comes to my mind: if we send heartbeat requests from the RPC endpoint's main thread, should we also check the HEARTBEAT_INTERVAL against a sane minimum value (currently it only needs to be greater than 0)? If the user configures a very small value, e.g. 10, then the resource manager and the job master will be kept busy just sending heartbeats. > Send heartbeat requests from RPC endpoint's main thread > --- > > Key: FLINK-9417 > URL: https://issues.apache.org/jira/browse/FLINK-9417 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > > Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat > requests to remote targets. This has the problem that we still see heartbeats > from this endpoint also if its main thread is currently blocked. Due to this, > the heartbeat response cannot be processed and the remote target times out. > On the remote side, this won't be noticed because it still receives the > heartbeat requests. > A solution to this problem would be to send the heartbeat requests to the > remote thread through the RPC endpoint's main thread. That way, also the > heartbeats would be blocked if the main thread is blocked/busy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
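The sanity check suggested in the comment above could look like the following minimal sketch; the 1000 ms lower bound is an assumed placeholder, not a value taken from Flink:

```java
/** Illustrative validation of a heartbeat interval configuration value. */
public class HeartbeatConfig {
    // Hypothetical lower bound; the actual minimum (if any) would be a design decision.
    static final long MIN_HEARTBEAT_INTERVAL_MS = 1_000L;

    /** Rejects intervals so small that endpoints would do little but heartbeat. */
    public static long validateInterval(long intervalMs) {
        if (intervalMs < MIN_HEARTBEAT_INTERVAL_MS) {
            throw new IllegalArgumentException(
                "heartbeat interval must be >= " + MIN_HEARTBEAT_INTERVAL_MS
                    + " ms, was " + intervalMs);
        }
        return intervalMs;
    }
}
```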
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517789#comment-16517789 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6184 Please click here for details [old-flink-9187](https://github.com/apache/flink/pull/5857) > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6184: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6184 Please click here for details [old-flink-9187](https://github.com/apache/flink/pull/5857) ---
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517788#comment-16517788 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken closed the pull request at: https://github.com/apache/flink/pull/5857 > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...
Github user lamber-ken closed the pull request at: https://github.com/apache/flink/pull/5857 ---
[GitHub] flink pull request #6184: add prometheus pushgateway reporter
GitHub user lamber-ken opened a pull request: https://github.com/apache/flink/pull/6184 add prometheus pushgateway reporter ## What is the purpose of the change This pull request lets the Flink metrics system send metrics to Prometheus via a pushgateway; it may be useful. ## Brief change log - Add prometheus pushgateway reporter - Restructure the code of the prometheus reporter part ## Verifying this change This change is already covered by existing tests. [prometheus test](https://github.com/apache/flink/tree/master/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lamber-ken/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6184.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6184 commit 1a8e1f6193823e70b1dc6abc1146299042c25c7d Author: lamber-ken Date: 2018-06-20T04:26:10Z add prometheus pushgateway reporter ---
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517760#comment-16517760 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/5857 sorry, I reforked `flink` project, do I need to start a new PR? ![image](https://user-images.githubusercontent.com/20113411/41635822-76271486-747d-11e8-9ad3-c6447c1b930c.png) > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/5857 sorry, I re-forked the `flink` project; do I need to open a new PR? ![image](https://user-images.githubusercontent.com/20113411/41635822-76271486-747d-11e8-9ad3-c6447c1b930c.png) ---
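The pull request itself builds on the Prometheus Java client's pushgateway support; as a dependency-free illustration of the push model (metrics rendered in the text exposition format and POSTed to the gateway's per-job endpoint), one could sketch:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Minimal sketch of pushing a gauge to a Prometheus pushgateway. */
public class PushGatewaySketch {

    /** Renders one gauge in the Prometheus text exposition format. */
    public static String render(String metric, double value) {
        return "# TYPE " + metric + " gauge\n" + metric + " " + value + "\n";
    }

    /** Pushgateway path for a job: metrics pushed here are grouped by job name. */
    public static String pushUrl(String host, int port, String job) {
        return "http://" + host + ":" + port + "/metrics/job/" + job;
    }

    /** POSTs the payload; needs a reachable pushgateway, so it is not exercised here. */
    public static int push(String url, String body) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }
}
```

The push model matters here because Flink jobs can be short-lived or behind a firewall; instead of Prometheus scraping the job, the job pushes to a gateway that Prometheus scrapes.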
[jira] [Commented] (FLINK-9588) Reuse the same conditionContext with in a same computationState
[ https://issues.apache.org/jira/browse/FLINK-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517732#comment-16517732 ] ASF GitHub Bot commented on FLINK-9588: --- Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6168 Is it ok now? @dawidwys > Reuse the same conditionContext with in a same computationState > --- > > Key: FLINK-9588 > URL: https://issues.apache.org/jira/browse/FLINK-9588 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: aitozi >Assignee: aitozi >Priority: Major > > CEP currently checks filter conditions with a newly created ConditionContext > for each edge, which results in repeated getEventsForPattern calls because > the ConditionContext objects are all different. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6168 Is it ok now? @dawidwys ---
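The reuse the PR is after can be illustrated as follows: if a single context object is shared across all edges of one computation state, the events-for-pattern lookup can be cached instead of repeated. This is an invented sketch, not the actual flink-cep code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative condition context that caches pattern-event lookups per computation state. */
public class CachingConditionContext {
    private final Map<String, List<String>> cache = new HashMap<>();
    private int lookups = 0;

    /** Stands in for an expensive state-backend access. */
    private List<String> loadEventsForPattern(String patternName) {
        lookups++;
        return new ArrayList<>();
    }

    /** Repeated calls for the same pattern within one computation state hit the cache. */
    public List<String> getEventsForPattern(String patternName) {
        return cache.computeIfAbsent(patternName, this::loadEventsForPattern);
    }

    public int lookupCount() {
        return lookups;
    }
}
```

With a fresh context per edge each `getEventsForPattern` call would pay the load again; sharing one context per computation state makes the cache effective.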
[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs
[ https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517718#comment-16517718 ] Deepak Sharma commented on FLINK-9380: -- Hi [~till.rohrmann], just a friendly reminder of this Jira :) > Failing end-to-end tests should not clean up logs > - > > Key: FLINK-9380 > URL: https://issues.apache.org/jira/browse/FLINK-9380 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Deepak Sharma >Priority: Critical > Labels: test-stability > Fix For: 1.6.0, 1.5.1 > > > Some of the end-to-end tests clean up their logs also in the failure case. > This makes debugging and understanding the problem extremely difficult. > Ideally, the scripts says where it stored the respective logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517717#comment-16517717 ] ASF GitHub Bot commented on FLINK-9563: --- Github user deepaks4077 commented on the issue: https://github.com/apache/flink/pull/6170 Hi @zentol, just a friendly reminder about this pull request :) > Migrate integration tests for CEP > - > > Key: FLINK-9563 > URL: https://issues.apache.org/jira/browse/FLINK-9563 > Project: Flink > Issue Type: Sub-task >Reporter: Deepak Sharma >Assignee: Deepak Sharma >Priority: Minor > > Covers all integration tests under > apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...
Github user deepaks4077 commented on the issue: https://github.com/apache/flink/pull/6170 Hi @zentol, just a friendly reminder about this pull request :) ---
[jira] [Commented] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517679#comment-16517679 ] mingleizhang commented on FLINK-9614: - I close this since we catch a throwable. stackoverflowerror is acceptable. > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Major > > When the below sql has too long. Like > case when case when . > when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > Then cause the {{StackOverflowError}}. And the current code is, but I would > suggest prompt users add the {{-Xss 20m}} to solve this, instead of {{This is > a bug..}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang closed FLINK-9614. --- > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Major > > When the below sql has too long. Like > case when case when . > when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > Then cause the {{StackOverflowError}}. And the current code is, but I would > suggest prompt users add the {{-Xss 20m}} to solve this, instead of {{This is > a bug..}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang resolved FLINK-9614. - Resolution: Not A Problem > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Major > > When the below sql has too long. Like > case when case when . > when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > Then cause the {{StackOverflowError}}. And the current code is, but I would > suggest prompt users add the {{-Xss 20m}} to solve this, instead of {{This is > a bug..}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
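For illustration, the message improvement as originally proposed (before the issue was closed as Not A Problem) might have distinguished StackOverflowError from genuine compiler bugs and pointed users at a larger thread stack. A hypothetical sketch, not actual Flink code:

```java
/** Sketch of the error-message improvement proposed (and later withdrawn) in FLINK-9614. */
public class CompileErrorMessages {

    public static String messageFor(Throwable t) {
        if (t instanceof StackOverflowError) {
            // Very deeply nested generated code (e.g. a huge CASE WHEN chain)
            // can exhaust the compiler's stack; suggest a larger stack size
            // rather than calling it a bug.
            return "Table program is too deeply nested to compile. "
                + "Try increasing the JVM thread stack size, e.g. -Xss20m.";
        }
        return "Table program cannot be compiled. This is a bug. Please file an issue.";
    }
}
```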
[jira] [Commented] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set
[ https://issues.apache.org/jira/browse/FLINK-9618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517645#comment-16517645 ] Aaron Langford commented on FLINK-9618: --- I would be happy to take the work to fix this. > NullPointerException in FlinkKinesisProducer when aws.region is not set and > aws.endpoint is set > --- > > Key: FLINK-9618 > URL: https://issues.apache.org/jira/browse/FLINK-9618 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.5.0 > Environment: N/A >Reporter: Aaron Langford >Priority: Minor > Original Estimate: 3h > Remaining Estimate: 3h > > This problem arose while trying to write to a local kinesalite instance. > Specifying the aws.region and the aws.endpoint is not allowed. However when > the aws.region is not present, a NullPointer exception is thrown. > Here is some example Scala code: > {code:java} > /** > * > * @param region the AWS region the stream lives in > * @param streamName the stream to write records to > * @param endpoint if in local dev, this points to a kinesalite instance > * @return > */ > def getSink(region: String, > streamName: String, > endpoint: Option[String]): > FlinkKinesisProducer[ProcessedMobilePageView] = { > val props = new Properties() > props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO") > endpoint match { > case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri) > case None => props.put(AWSConfigConstants.AWS_REGION, region) > } > val producer = new FlinkKinesisProducer[ProcessedMobilePageView]( > new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder), > props > ) > producer.setDefaultStream(streamName) > producer > } > {code} > To produce the NullPointerException, pass in `Some("localhost:4567")` for > endpoint. > The source of the error is found at > org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on > line 194. 
> This line should perform some kind of check if aws.endpoint is present before > grabbing it from the Properties object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set
Aaron Langford created FLINK-9618: - Summary: NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set Key: FLINK-9618 URL: https://issues.apache.org/jira/browse/FLINK-9618 Project: Flink Issue Type: Bug Components: Kinesis Connector Affects Versions: 1.5.0 Environment: N/A Reporter: Aaron Langford This problem arose while trying to write to a local kinesalite instance. Specifying the aws.region and the aws.endpoint is not allowed. However when the aws.region is not present, a NullPointer exception is thrown. Here is some example Scala code: {code:java} /** * * @param region the AWS region the stream lives in * @param streamName the stream to write records to * @param endpoint if in local dev, this points to a kinesalite instance * @return */ def getSink(region: String, streamName: String, endpoint: Option[String]): FlinkKinesisProducer[ProcessedMobilePageView] = { val props = new Properties() props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO") endpoint match { case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri) case None => props.put(AWSConfigConstants.AWS_REGION, region) } val producer = new FlinkKinesisProducer[ProcessedMobilePageView]( new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder), props ) producer.setDefaultStream(streamName) producer } {code} To produce the NullPointerException, pass in `Some("localhost:4567")` for endpoint. The source of the error is found at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on line 194. This line should perform some kind of check if aws.endpoint is present before grabbing it from the Properties object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
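The fix the report asks for can be sketched without the real KinesisConfigUtil: check for the endpoint's presence before dereferencing it, and fail with a clear message when neither key is set. The key names match the `aws.region`/`aws.endpoint` constants mentioned in the report; everything else here is illustrative:

```java
import java.util.Properties;

/** Sketch of the validation suggested in FLINK-9618 (names illustrative). */
public class KinesisConfigCheck {
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /** Resolves the target to connect to without dereferencing an absent key. */
    public static String resolveTarget(Properties config) {
        String endpoint = config.getProperty(AWS_ENDPOINT);
        if (endpoint != null) {
            return "endpoint:" + endpoint; // endpoint set: region may legitimately be absent
        }
        String region = config.getProperty(AWS_REGION);
        if (region == null) {
            // Clear error instead of a NullPointerException further down.
            throw new IllegalArgumentException(
                "Either " + AWS_REGION + " or " + AWS_ENDPOINT + " must be set.");
        }
        return "region:" + region;
    }
}
```

With this shape, the kinesalite case from the report (endpoint set, region unset) resolves cleanly instead of hitting an NPE.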
[jira] [Commented] (FLINK-9611) Allow for user-defined artifacts to be specified as part of a mesos overlay
[ https://issues.apache.org/jira/browse/FLINK-9611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517528#comment-16517528 ] Eron Wright commented on FLINK-9611: - I really like this idea, except for the proposed name because it has a connotation of overlaying the 'user' (which is sort of what happens with the Kerberos overlay). Maybe 'MesosCustomOverlay'. > Allow for user-defined artifacts to be specified as part of a mesos overlay > --- > > Key: FLINK-9611 > URL: https://issues.apache.org/jira/browse/FLINK-9611 > Project: Flink > Issue Type: Improvement > Components: Configuration, Docker, Mesos >Affects Versions: 1.5.0 >Reporter: Addison Higham >Priority: Major > > NOTE: this assumes mesos, but this improvement could also be useful for > future container deployments. > Currently, when deploying to mesos, the "Overlay" functionality is used to > determine which artifacts are to be downloaded into the container. However, > there isn't a way to plug in your own artifacts to be downloaded into the > container. This can cause problems with certain deployment models. > For example, if you are running flink in docker on mesos, you cannot easily > use a private docker image. Typically with mesos and private docker images, > you specify credentials as a URI to be downloaded into the container that > give permissions to download the private image. Typically, this credentials > expire after a few days, so baking them into a docker host isn't a solution. > It would make sense to add a `MesosUserOverlay` that would simplify take some > new configuration parameters and add any custom artifacts (or possibly also > environment variables?) > Another solution (or longer term solution) might be to allow for dynamically > loading an overlay class for even further customization of the container > specification. > > > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9612) Add option for minimal artifacts being pulled in Mesos
[ https://issues.apache.org/jira/browse/FLINK-9612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517525#comment-16517525 ] Eron Wright commented on FLINK-9612: - Yes it makes sense that the overlays would be selective and configurable, and especially true that the Flink binaries aren't needed in most scenarios involving a docker image. Specifically on that, I wonder if the Flink conf directory should be treated differently from the bin/libs (perhaps as a different overlay), since the image might be 'stock'. > Add option for minimal artifacts being pulled in Mesos > -- > > Key: FLINK-9612 > URL: https://issues.apache.org/jira/browse/FLINK-9612 > Project: Flink > Issue Type: Improvement > Components: Configuration, Docker, Mesos >Reporter: Addison Higham >Priority: Major > > NOTE: this assumes mesos, but this improvement could also be useful for > future container deployments. > Currently, in mesos, the FlinkDistributionOverlay copies the entire `conf`, > `bin`, and `lib` folders from the running JobManager/ResourceManager. When > using docker with a pre-installed flink distribution, this is relatively > inefficient as it pulls jars that are already baked into the container image. > A new option that disables pulling most (if not all?) of the > FlinkDistributionOverlay could allow for much faster and more scalable > provisions of TaskManagers. As it currently stands, trying to run a few > hundred TaskManagers is likely to result in poor performance in pulling all > the artifacts from the MesosArtifactServer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517490#comment-16517490 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196559093 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { --- End diff -- Missing `extends TestLogger` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major >
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196556101 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Collections; + +/** + * Tests for {@link FileUploads}. + */ +public class FileUploadsTest { --- End diff -- `extends TestLogger` is missing ---
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453980 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java --- @@ -129,4 +137,9 @@ public R getRequestBody() { return queryParameter.getValue(); } } + + @Nonnull + public FileUploads getFileUploads() { + return uploadedFiles; + } --- End diff -- I would not expose `FileUploads` to the user but rather return a `Collection`. ---
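The suggestion in the comment above — having `HandlerRequest` expose the uploaded files as a plain `Collection<Path>` instead of leaking the `FileUploads` container — can be sketched as follows. This is an illustrative stand-in, not the actual Flink implementation; the class name `SimpleHandlerRequest` is hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for HandlerRequest: the accessor hands out
// an unmodifiable view of the uploaded file paths, so callers never see the
// lifecycle-managing FileUploads container.
final class SimpleHandlerRequest {
    private final Collection<Path> uploadedFiles;

    SimpleHandlerRequest(Collection<Path> uploadedFiles) {
        this.uploadedFiles = Collections.unmodifiableCollection(uploadedFiles);
    }

    /** Callers can read the paths but cannot mutate the collection. */
    public Collection<Path> getUploadedFiles() {
        return uploadedFiles;
    }

    public static void main(String[] args) {
        SimpleHandlerRequest request = new SimpleHandlerRequest(
            List.of(Paths.get("/tmp/upload/job.jar")));
        System.out.println(request.getUploadedFiles().size()); // prints 1
    }
}
```

The upside of this shape is that cleanup responsibility stays inside the framework; the handler only ever sees read-only paths.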
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196559335 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + + @BeforeClass + public static void setup() throws Exception { +
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196558949 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java --- @@ -63,4 +63,13 @@ * @return description for the header */ String getDescription(); + + /** +* Returns whether this header allows file uploads. +* +* @return whether this header allows file uploads +*/ + default boolean acceptsFileUploads() { + return false; + } --- End diff -- Should this maybe go into `UntypedResponseMessageHeaders`? At the moment one can upload files for a `AbstractHandler` (e.g. `AbstractTaskManagerFileHandler`) implementation and also has access to it via the `HandlerRequest` without being able to specify whether file upload is allowed or not. ---
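The review question above is whether the upload opt-in flag belongs on a more general headers interface so that every handler can declare it. The default-method pattern the diff uses can be shown in isolation; the interface and class names below are hypothetical, not the real Flink hierarchy.

```java
// Hypothetical, simplified version of the message-headers hierarchy discussed
// above. A default method lets every existing header class keep compiling,
// while endpoints that accept uploads opt in explicitly.
interface UntypedMessageHeaders {
    /** By default, an endpoint does not accept multipart file uploads. */
    default boolean acceptsFileUploads() {
        return false;
    }
}

final class JarUploadHeaders implements UntypedMessageHeaders {
    @Override
    public boolean acceptsFileUploads() {
        return true; // this endpoint explicitly opts in to uploads
    }
}

final class JobDetailsHeaders implements UntypedMessageHeaders {
    // inherits the default: uploads rejected
}

public class HeadersDemo {
    public static void main(String[] args) {
        System.out.println(new JarUploadHeaders().acceptsFileUploads());  // true
        System.out.println(new JobDetailsHeaders().acceptsFileUploads()); // false
    }
}
```

Placing the default on the most general interface, as the comment proposes, means the upload handler can consult the flag before any typed request parsing happens.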
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517479#comment-16517479 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452418 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -116,5 +136,16 @@ private void reset() { currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; + currentUploadDir = null; + currentJsonPayload = null; + } + + public static Optional getMultipartJsonPayload(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get()); + } --- End diff -- By sending the json payload down stream, we could avoid having this method. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
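The pattern being questioned above — stashing the JSON payload in a channel attribute and reading it back through a static `Optional.ofNullable` getter — can be illustrated without Netty by using a plain map as a stand-in for the channel's attribute storage. Names here are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in for Netty's per-channel attribute map; this is a sketch of the
// side-channel style the review comment suggests avoiding, not Flink code.
final class AttributeStash {
    static final String UPLOADED_JSON = "uploaded-json";
    private final Map<String, byte[]> attributes = new HashMap<>();

    void setJsonPayload(byte[] payload) {
        attributes.put(UPLOADED_JSON, payload);
    }

    // Later handlers must know to query this out-of-band lookup, instead of
    // receiving the payload as part of the request they are handed.
    Optional<byte[]> getMultipartJsonPayload() {
        return Optional.ofNullable(attributes.get(UPLOADED_JSON));
    }
}
```

Forwarding the JSON body down the pipeline as a proper request object, as the comment proposes, removes this out-of-band lookup entirely.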
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517492#comment-16517492 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196557260 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java --- @@ -39,15 +41,21 @@ public class HandlerRequest { private final R requestBody; + private final FileUploads uploadedFiles; --- End diff -- This could also be a `Collection` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196554025 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + final ByteBuf msgContent; + Optional multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx); + if (multipartJsonPayload.isPresent()) { + msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get()); --- End diff -- Let's send the Json payload as a proper `HttpRequest`, then we don't have this special casing here. ---
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517481#comment-16517481 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452583 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -116,5 +136,16 @@ private void reset() { currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; + currentUploadDir = null; + currentJsonPayload = null; + } + + public static Optional getMultipartJsonPayload(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get()); + } + + public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get()) + .orElse(FileUploads.EMPTY); --- End diff -- I would suggest to simply return the upload directory. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. 
> I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517477#comment-16517477 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); --- End diff -- Should we rather fail? 
> Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
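The question raised above — whether an unknown multipart attribute should fail the request rather than merely log a warning — amounts to a strict validation step. A minimal stdlib sketch of the strict variant, with a hypothetical constant mirroring the `HTTP_ATTRIBUTE_REQUEST` name in the diff:

```java
// Illustrative sketch only, not the Flink implementation: reject the whole
// upload on an unexpected attribute instead of silently ignoring it.
final class MultipartAttributeValidator {
    static final String HTTP_ATTRIBUTE_REQUEST = "request";

    /** Returns the JSON payload, or throws if the attribute name is unexpected. */
    static byte[] extractJsonPayload(String attributeName, byte[] value) {
        if (!HTTP_ATTRIBUTE_REQUEST.equals(attributeName)) {
            throw new IllegalStateException(
                "Received unknown attribute '" + attributeName + "' in multipart request.");
        }
        return value;
    }
}
```

Failing fast like this surfaces malformed client requests immediately, at the cost of rejecting requests that carry harmless extra attributes.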
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196555487 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection&lt;Path&gt; directoriesToClean; + private final Collection&lt;Path&gt; uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection&lt;Path&gt; uploadedFilesOrDirectory) throws IOException { + final Collection&lt;Path&gt; files = new ArrayList<>(4); + final Collection&lt;Path&gt; directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.get()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); + } + + public Collection&lt;Path&gt; getUploadedFiles() { + return uploadedFiles; + } + + @Override + public void close() throws IOException { + for (Path file : uploadedFiles) { + try { + Files.delete(file); + } catch (FileNotFoundException ignored) { + // file may have been moved by a handler + } + } + for (Path directory : directoriesToClean) { + Files.walkFileTree(directory, CleanupFileVisitor.get()); + } + } + + private static final class FileAdderVisitor extends SimpleFileVisitor&lt;Path&gt; { + + private final Collection&lt;Path&gt; files = new ArrayList<>(4); + + Collection&lt;Path&gt; get() { + return files; + } + + FileAdderVisitor() { + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + files.add(file); + return result; + } + } + + private static final class CleanupFileVisitor extends SimpleFileVisitor&lt;Path&gt; {
--- End diff -- I think it would be better to make this an enum. Then we get all singleton properties for free. ---
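For illustration, the enum-based singleton the reviewer suggests could look roughly like the sketch below. Note one caveat: `SimpleFileVisitor` is a class and enums cannot extend classes, so the enum would have to implement `FileVisitor<Path>` directly; the name `CleanupVisitor` is hypothetical, not the PR's code.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical sketch: a single-constant enum is a serialization-safe
// singleton, so all callers share one stateless visitor instance.
enum CleanupVisitor implements FileVisitor<Path> {
    INSTANCE;

    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
        return FileVisitResult.CONTINUE;
    }

    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        Files.delete(file); // remove regular files as they are visited
        return FileVisitResult.CONTINUE;
    }

    @Override
    public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
        throw exc; // propagate failures instead of silently skipping entries
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
        if (exc != null) {
            throw exc;
        }
        Files.delete(dir); // the directory is empty once its children are gone
        return FileVisitResult.CONTINUE;
    }
}
```

Usage would then be `Files.walkFileTree(uploadDir, CleanupVisitor.INSTANCE);` with no per-call allocation.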
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196455211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + final ByteBuf msgContent; + Optional&lt;byte[]&gt; multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx); + if (multipartJsonPayload.isPresent()) { + msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get()); + } else { + msgContent = ((FullHttpRequest) httpRequest).content(); + } - R request; - if (isFileUpload()) { - final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get(); - if (path == null) { - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Client did not upload a file."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } - //noinspection unchecked - request = (R) new FileUpload(path); - } else if (msgContent.capacity() == 0) { - try { - request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) { --- End diff -- I would obtain the upload directory from `FileUploadHandler` and simply delete this directory after the call has been processed. We could, then also create `FileUploads` outside of the `FileUploadHandler` to instantiate a `HandlerRequest` with it. This would also simplify the `FileUploads` class significantly, because it is no longer responsible for deleting the files. ---
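The flow the reviewer proposes — process the request, then unconditionally delete the per-request upload directory — can be sketched independently of the Flink classes. The helper name and the use of `Files.walk` below are assumptions for illustration, not the PR's code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class UploadDirCleanup {

    // Runs the handler body, then removes the whole upload directory tree,
    // mirroring "delete this directory after the call has been processed".
    static void processAndCleanUp(Path uploadDir, Consumer<Path> handlerBody) throws IOException {
        try {
            handlerBody.accept(uploadDir);
        } finally {
            if (Files.exists(uploadDir)) {
                try (Stream<Path> entries = Files.walk(uploadDir)) {
                    // reverse path order deletes children before their parent directories
                    entries.sorted(Comparator.reverseOrder())
                           .forEach(p -> p.toFile().delete());
                }
            }
        }
    }
}
```

With cleanup centralized like this, a `FileUploads`-style container only needs to expose the uploaded paths and no longer has to implement `AutoCloseable` deletion logic itself.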
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517478#comment-16517478 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453235 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); + } } } if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); --- End diff -- I would suggest to simply store the upload directory in the `UPLOAD_FILES` attribute. 
> Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r19670 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection&lt;Path&gt; directoriesToClean; + private final Collection&lt;Path&gt; uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection&lt;Path&gt; uploadedFilesOrDirectory) throws IOException { + final Collection&lt;Path&gt; files = new ArrayList<>(4); + final Collection&lt;Path&gt; directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.get()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); + } + + public Collection&lt;Path&gt; getUploadedFiles() { + return uploadedFiles; + } + + @Override + public void close() throws IOException { + for (Path file : uploadedFiles) { + try { + Files.delete(file); + } catch (FileNotFoundException ignored) { + // file may have been moved by a handler + } + } + for (Path directory : directoriesToClean) { + Files.walkFileTree(directory, CleanupFileVisitor.get()); + } + } + + private static final class FileAdderVisitor extends SimpleFileVisitor&lt;Path&gt; { + + private final Collection&lt;Path&gt; files = new ArrayList<>(4); + + Collection&lt;Path&gt; get() { --- End diff -- maybe more descriptive name than `get`. ---
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); --- End diff -- Should we rather fail? ---
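The stricter alternative raised here — rejecting the request rather than logging a warning on an unexpected multipart attribute — would amount to validating the attribute name and failing fast. This is a hypothetical sketch: the actual value of `HTTP_ATTRIBUTE_REQUEST` is not shown in the diff, so `"request"` is a placeholder, and the validator class is made up.

```java
final class MultipartAttributeValidator {

    // Placeholder for the handler's expected attribute name; the real
    // constant's value is not visible in the quoted diff.
    static final String HTTP_ATTRIBUTE_REQUEST = "request";

    // Throws instead of warning when an unexpected attribute shows up, so the
    // caller can translate the failure into an HTTP 400 response.
    static void checkAttributeName(String name) {
        if (!HTTP_ATTRIBUTE_REQUEST.equals(name)) {
            throw new IllegalArgumentException("Unexpected multipart attribute: " + name);
        }
    }
}
```

The trade-off is that warning-and-ignoring is lenient toward clients that send extra form fields, while failing fast surfaces malformed requests immediately.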
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517486#comment-16517486 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196555487 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517483#comment-16517483 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196455211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- End diff -- I would obtain the upload directory from `FileUploadHandler` and simply delete this directory after the call has been processed. We could, then also create `FileUploads` outside of the `FileUploadHandler` to instantiate a `HandlerRequest` with it. This would also simplify the `FileUploads` class significantly, because it is no longer responsible for deleting the files.
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452583 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -116,5 +136,16 @@ private void reset() { currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; + currentUploadDir = null; + currentJsonPayload = null; + } + + public static Optional&lt;byte[]&gt; getMultipartJsonPayload(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get()); + } + + public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get()) + .orElse(FileUploads.EMPTY); --- End diff -- I would suggest to simply return the upload directory. ---
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517487#comment-16517487 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r19670 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- End diff -- maybe more descriptive name than `get`.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517491#comment-16517491 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196558949 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java --- @@ -63,4 +63,13 @@ * @return description for the header */ String getDescription(); + + /** + * Returns whether this header allows file uploads. + * + * @return whether this header allows file uploads + */ + default boolean acceptsFileUploads() { + return false; + } --- End diff -- Should this maybe go into `UntypedResponseMessageHeaders`? At the moment one can upload files for a `AbstractHandler` (e.g. `AbstractTaskManagerFileHandler`) implementation and also has access to it via the `HandlerRequest` without being able to specify whether file upload is allowed or not.
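The refactoring suggested above — declaring the default method on the untyped parent interface so any handler type can express whether it accepts uploads — can be sketched with a stripped-down hierarchy. The interface names follow the PR, but the members are reduced to the minimum and the `JarUploadHeaders` stand-in is illustrative only:

```java
// Reduced sketch of the suggested hierarchy: the flag lives on the untyped
// parent interface, so handlers that only implement the untyped interface
// can opt in to file uploads too.
interface UntypedResponseMessageHeaders {
    default boolean acceptsFileUploads() {
        return false; // uploads are rejected unless a header opts in explicitly
    }
}

interface MessageHeaders extends UntypedResponseMessageHeaders {
    String getDescription();
}

// Illustrative header that opts in to file uploads by overriding the default.
final class JarUploadHeaders implements MessageHeaders {
    @Override
    public String getDescription() {
        return "Uploads a jar file.";
    }

    @Override
    public boolean acceptsFileUploads() {
        return true;
    }
}
```

Because the default is `false`, existing header implementations stay source-compatible and only upload-accepting handlers need to override it.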
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517488#comment-16517488 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196559335 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453584 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection<Path> directoriesToClean; + private final Collection<Path> uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException { + final Collection<Path> files = new ArrayList<>(4); + final Collection<Path> directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.get()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); --- End diff -- Let's move this logic out of `FileUploads` and simply initialize it with a `Collection<Path>`. ---
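The refactoring suggested in the review — walking the upload directories up front so the container itself can be constructed from a plain `Collection` of paths — might be sketched roughly as follows. The `UploadedFileCollector` class and `collectFiles` method are hypothetical names for illustration, not Flink's actual implementation:

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: the directory walk is pulled out of the container,
// so the container could then be initialized with the resulting collection.
final class UploadedFileCollector {

    static Collection<Path> collectFiles(Collection<Path> filesOrDirectories) throws IOException {
        final List<Path> files = new ArrayList<>();
        for (Path fileOrDirectory : filesOrDirectories) {
            if (!fileOrDirectory.isAbsolute()) {
                throw new IllegalArgumentException("Path must be absolute: " + fileOrDirectory);
            }
            if (Files.isDirectory(fileOrDirectory)) {
                // Recursively collect every regular file below the upload directory.
                Files.walkFileTree(fileOrDirectory, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                        files.add(file);
                        return FileVisitResult.CONTINUE;
                    }
                });
            } else {
                files.add(fileOrDirectory);
            }
        }
        return Collections.unmodifiableList(files);
    }
}
```

With this split, the directories-to-clean bookkeeping can stay with whichever component created the upload directory, which is the separation of concerns the review asks for.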
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517485#comment-16517485 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196554025 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + final ByteBuf msgContent; + Optional<byte[]> multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx); + if (multipartJsonPayload.isPresent()) { + msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get()); --- End diff -- Let's send the Json payload as a proper `HttpRequest`, then we don't have this special casing here. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517484#comment-16517484 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453980 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java --- @@ -129,4 +137,9 @@ public R getRequestBody() { return queryParameter.getValue(); } } + + @Nonnull + public FileUploads getFileUploads() { + return uploadedFiles; + } --- End diff -- I would not expose `FileUploads` to the user but rather return a `Collection`. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517493#comment-16517493 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196560163 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517482#comment-16517482 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453584 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection<Path> directoriesToClean; + private final Collection<Path> uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException { + final Collection<Path> files = new ArrayList<>(4); + final Collection<Path> directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.get()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); --- End diff -- Let's move this logic out of `FileUploads` and simply initialize it with a `Collection<Path>`. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. 
The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452024 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); + } } } if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); + ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload); --- End diff -- I think it would be better to not store the JSON payload as an `Attribute` but instead forward it via `httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`. ---
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517476#comment-16517476 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452024 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); + } } } if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); + ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload); --- End diff -- I think it would be better to not store the JSON payload as an `Attribute` but instead forward it via `httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`. 
> Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453235 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); + } } } if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); --- End diff -- I would suggest to simply store the upload directory in the `UPLOAD_FILES` attribute. ---
[jira] [Commented] (FLINK-4542) Add MULTISET operations
[ https://issues.apache.org/jira/browse/FLINK-4542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517283#comment-16517283 ] Timo Walther commented on FLINK-4542: - Hi [~Sergey Nuyanzin], I thought Calcite has support (at least parsing and validation) for the operations mentioned in the issue, because they are listed in {{org.apache.calcite.sql.fun.SqlStdOperatorTable}}. The multiset type is supported in Flink SQL (this has been introduced as part of FLINK-7491). However, creating literals using {{SELECT MULTISET(...)}} is not. For this we need to implement the value constructor call {{SqlStdOperatorTable.MULTISET_VALUE}}. I hope Calcite is releasing 1.17 soon so that we are not blocked on this. Feel free to split this issue into multiple subtasks and open PRs for functions that we can already support and functions that are blocked. > Add MULTISET operations > --- > > Key: FLINK-4542 > URL: https://issues.apache.org/jira/browse/FLINK-4542 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Minor > > Umbrella issue for MULTISET operations like: > MULTISET UNION, MULTISET UNION ALL, MULTISET EXCEPT, MULTISET EXCEPT ALL, > MULTISET INTERSECT, MULTISET INTERSECT ALL, CARDINALITY, ELEMENT, MEMBER OF, > SUBMULTISET OF, IS A SET, FUSION > At the moment we only support COLLECT. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8921) Split code generated call expression
[ https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517268#comment-16517268 ] Timo Walther commented on FLINK-8921: - [~RuidongLi] what is the status of this issue? Will you find time to work on it? Users are asking for this feature on the mailing list. > Split code generated call expression > - > > Key: FLINK-8921 > URL: https://issues.apache.org/jira/browse/FLINK-8921 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Timo Walther >Assignee: Ruidong Li >Priority: Major > > In FLINK-8274 we introduced the possibility of splitting the generated code > into multiple methods in order to not exceed the JVM's maximum method size (see > also https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9). > At the moment we only split methods by fields, however, this is not enough in > all cases. We should also split expressions. I suggest splitting the operands > of a {{RexCall}} in > {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a > certain threshold. However, this should happen as lazily as possible to keep > the runtime overhead low. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5777: [FLINK-7897] Consider using nio.Files for file deletion i...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5777 @StefanRRichter , does this PR look good to you? ---
[jira] [Commented] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask
[ https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517233#comment-16517233 ] ASF GitHub Bot commented on FLINK-7897: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5777 @StefanRRichter , does this PR look good to you? > Consider using nio.Files for file deletion in TransientBlobCleanupTask > -- > > Key: FLINK-7897 > URL: https://issues.apache.org/jira/browse/FLINK-7897 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > nio.Files#delete() provides a better clue as to why the deletion may fail: > https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path) > Depending on the potential exception (FileNotFound), the call to > localFile.exists() may be skipped. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
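The pattern the ticket asks for can be sketched without any Flink code: `java.nio.file.Files#delete` throws a typed exception explaining the failure, and catching `NoSuchFileException` makes the prior `exists()` check unnecessary. The class and method names below are made up for illustration:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Illustrative cleanup helper (not Flink's TransientBlobCleanupTask).
final class BlobCleanupSketch {

    /** Returns true if the file is gone afterwards (deleted, or already absent). */
    static boolean tryDelete(Path localFile) {
        try {
            Files.delete(localFile);
            return true;
        } catch (NoSuchFileException e) {
            // Already gone: no need for a prior exists() check.
            return true;
        } catch (IOException e) {
            // Unlike java.io.File#delete(), we now know the concrete cause.
            System.err.println("Could not delete " + localFile + ": " + e);
            return false;
        }
    }
}
```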
[jira] [Commented] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure
[ https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517230#comment-16517230 ] ASF GitHub Bot commented on FLINK-9352: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6092 cc @zentol @kl0u > In Standalone checkpoint recover mode many jobs with same checkpoint interval > cause IO pressure > --- > > Key: FLINK-9352 > URL: https://issues.apache.org/jira/browse/FLINK-9352 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses > *baseInterval* as the initialDelay parameter; the *baseInterval* is also the > checkpoint interval. > In standalone checkpoint mode, many jobs configure the same checkpoint interval. > When all jobs are recovered (after a cluster restart or a jobmanager leadership > switch), their checkpoint periods tend to coincide. All jobs' > CheckpointCoordinators start and trigger at approximately the same point in time. > This causes high IO cost within the same time period in our production > scenario. > I suggest exposing scheduleAtFixedRate's initial delay parameter as an API > config so that users can scatter checkpoints in this scenario. > > cc [~StephanEwen] [~Zentol] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
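The scattering idea described in the ticket reduces to picking a random initial delay within [0, baseInterval) instead of always using the interval itself, so that many coordinators recovered at the same moment do not all fire together. A minimal sketch, with an illustrative class name that is not part of Flink:

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: computes a randomized initial delay for a periodic
// checkpoint scheduler, spreading the first trigger across the interval.
final class CheckpointDelaySketch {

    /** Returns a delay in [0, baseIntervalMillis) to pass as initialDelay
     *  to scheduleAtFixedRate instead of baseIntervalMillis itself. */
    static long randomInitialDelay(long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(baseIntervalMillis);
    }
}
```

Jobs with the same checkpoint interval then start their checkpoint schedules at different offsets, flattening the IO spike after a mass recovery.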
[GitHub] flink issue #6092: [FLINK-9352] In Standalone checkpoint recover mode many j...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6092 cc @zentol @kl0u ---
[jira] [Created] (FLINK-9617) Provide alias for whole records in Table API
Piotr Nowojski created FLINK-9617: - Summary: Provide alias for whole records in Table API Key: FLINK-9617 URL: https://issues.apache.org/jira/browse/FLINK-9617 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.5.0 Reporter: Piotr Nowojski In SQL we can provide an alias for a whole table to avoid column name collisions between two tables. For example: {code:java} SELECT SUM(o.amount * r.rate) FROM Orders AS o, Rates AS r WHERE r.currency = o.currency{code} However, that's not possible in the Table API. In the Table API users have to provide aliases for all of the columns, which can be annoying, especially if a table consists of tens or even hundreds of columns. For example I would expect some feature like this: {code:java} val result = orders.as('o) .join(rates('o.rowtime).as('r), "o.currency = r.currency") .select("SUM(o.amount * r.rate) AS amount") {code} where {{rates}} is a TableFunction. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9616) DatadogHttpReporter fails to be created due to missing shaded dependency
[ https://issues.apache.org/jira/browse/FLINK-9616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517205#comment-16517205 ] ASF GitHub Bot commented on FLINK-9616: --- GitHub user addisonj opened a pull request: https://github.com/apache/flink/pull/6183 [FLINK-9616][metrics] Fix datadog to include shaded deps ## What is the purpose of the change This fixes a broken build that wasn't properly including a shaded dependency in the jar it builds. This causes the instantiation of DatadogHttpReporter to fail, with no easy way to fix it since the dependencies exist on a shaded import path. ## Brief change log - Changes ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. However, it can be validated by: ``` cd flink-metrics/flink-metrics-datadog mvn package jar tf target/flink-metrics-datadog-1.6-SNAPSHOT.jar ``` And then seeing the expected okhttp3 and okio dependencies being included in the resulting jar. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, but brings in line with documented behavior here: https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/instructure/flink datadog_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6183.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6183 commit 10fe56a9adbe6f35dc4b5fae0e7478e99028f5f7 Author: Addison Higham Date: 2018-06-19T14:49:56Z [FLINK-9616] Fix datadog to include shaded deps flink-metrics-datadog wasn't properly including its shaded dependencies in the jar it builds. Looking at other places where shaded dependencies are used, it seems like the build wasn't working as intended. > DatadogHttpReporter fails to be created due to missing shaded dependency > > > Key: FLINK-9616 > URL: https://issues.apache.org/jira/browse/FLINK-9616 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0 >Reporter: Addison Higham >Priority: Major > > When using the DatadogHttpReporter, it fails to instantiate with the > following exception: > {code:java} > 2018-06-19 06:01:19,640 INFO > org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring dghttp with > {apikey=, tags=, > class=org.apache.flink.metrics.datadog.DatadogHttpReporter}. > 2018-06-19 06:01:19,642 ERROR > org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate > metrics reporter dghttp. Metrics might not be exposed/reported. 
> java.lang.NoClassDefFoundError: org/apache/flink/shaded/okhttp3/MediaType > at > org.apache.flink.metrics.datadog.DatadogHttpClient.<init>(DatadogHttpClient.java:45) > at > org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:105) > at > org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:150) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:413) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:274) > at > org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:92) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:225) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:189) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1889) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at >
[GitHub] flink pull request #6183: [FLINK-9616][metrics] Fix datadog to include shade...
GitHub user addisonj opened a pull request: https://github.com/apache/flink/pull/6183 [FLINK-9616][metrics] Fix datadog to include shaded deps ## What is the purpose of the change This fixes a broken build that wasn't properly including a shaded dependency in the jar it builds. This causes the instantiation of DatadogHttpReporter to fail, with no easy way to fix it since the dependencies exist on a shaded import path. ## Brief change log - Changes ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. However, it can be validated by: ``` cd flink-metrics/flink-metrics-datadog mvn package jar tf target/flink-metrics-datadog-1.6-SNAPSHOT.jar ``` And then seeing the expected okhttp3 and okio dependencies being included in the resulting jar. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, but brings in line with documented behavior here: https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/instructure/flink datadog_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6183.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6183 commit 10fe56a9adbe6f35dc4b5fae0e7478e99028f5f7 Author: Addison Higham Date: 2018-06-19T14:49:56Z [FLINK-9616] Fix datadog to include shaded deps flink-metrics-datadog wasn't properly including its shaded dependencies in the jar it builds. Looking at other places where shaded dependencies are used, it seems like the build wasn't working as intended. ---
[jira] [Created] (FLINK-9616) DatadogHttpReporter fails to be created due to missing shaded dependency
Addison Higham created FLINK-9616: - Summary: DatadogHttpReporter fails to be created due to missing shaded dependency Key: FLINK-9616 URL: https://issues.apache.org/jira/browse/FLINK-9616 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.0 Reporter: Addison Higham When using the DatadogHttpReporter, it fails to instantiate with the following exception: {code:java} 2018-06-19 06:01:19,640 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring dghttp with {apikey=, tags=, class=org.apache.flink.metrics.datadog.DatadogHttpReporter}. 2018-06-19 06:01:19,642 ERROR org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate metrics reporter dghttp. Metrics might not be exposed/reported. java.lang.NoClassDefFoundError: org/apache/flink/shaded/okhttp3/MediaType at org.apache.flink.metrics.datadog.DatadogHttpClient.(DatadogHttpClient.java:45) at org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:105) at org.apache.flink.runtime.metrics.MetricRegistryImpl.(MetricRegistryImpl.java:150) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:413) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:274) at org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:92) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:225) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:189) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1889) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:188) at org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:181) Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.okhttp3.MediaType at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 14 more {code} Looking at the pom.xml for `flink-metrics-datadog` it looks like that dependency is intended to be shaded and included in the jar, however, when we build the jar we see the following lines: {noformat} $ mvn package [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building flink-metrics-datadog 1.5.0 [INFO] [INFO] --- maven-shade-plugin:3.0.0:shade (shade-flink) @ flink-metrics-datadog --- [INFO] Excluding com.squareup.okhttp3:okhttp:jar:3.7.0 from the shaded jar. [INFO] Excluding com.squareup.okio:okio:jar:1.12.0 from the shaded jar. [INFO] Including org.apache.flink:force-shading:jar:1.5.0 in the shaded jar. [INFO] Replacing original artifact with shaded artifact. 
{noformat} And inspecting the built jar: {noformat} $ jar tf flink-metrics-datadog-1.5.0.jar META-INF/ META-INF/MANIFEST.MF org/ org/apache/ org/apache/flink/ org/apache/flink/metrics/ org/apache/flink/metrics/datadog/ org/apache/flink/metrics/datadog/DatadogHttpClient$EmptyCallback.class org/apache/flink/metrics/datadog/DMetric.class org/apache/flink/metrics/datadog/DSeries.class org/apache/flink/metrics/datadog/DGauge.class org/apache/flink/metrics/datadog/DatadogHttpReporter.class org/apache/flink/metrics/datadog/DatadogHttpClient.class org/apache/flink/metrics/datadog/MetricType.class org/apache/flink/metrics/datadog/DatadogHttpReporter$DatadogHttpRequest.class org/apache/flink/metrics/datadog/DMeter.class org/apache/flink/metrics/datadog/DCounter.class META-INF/DEPENDENCIES META-INF/maven/ META-INF/maven/org.apache.flink/ META-INF/maven/org.apache.flink/flink-metrics-datadog/ META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.xml META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.properties META-INF/NOTICE {noformat} We don't see the included dependencies -- This message was sent by Atlassian JIRA (v7.6.3#76005)
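The shade-plugin output above shows okhttp and okio being excluded from the shaded jar even though the code expects them under the `org.apache.flink.shaded` relocation. A fix along these lines (a sketch of typical maven-shade-plugin configuration, not necessarily the exact pom.xml change in PR #6183) would include the artifacts and relocate them to the package the reporter imports:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <artifactSet>
      <includes>
        <!-- Bundle the HTTP client instead of excluding it -->
        <include>com.squareup.okhttp3:okhttp</include>
        <include>com.squareup.okio:okio</include>
      </includes>
    </artifactSet>
    <relocations>
      <!-- Matches the path the reporter expects:
           org.apache.flink.shaded.okhttp3.MediaType -->
      <relocation>
        <pattern>okhttp3</pattern>
        <shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
      </relocation>
      <relocation>
        <pattern>okio</pattern>
        <shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

With such a configuration, `jar tf` on the built artifact should list the relocated okhttp3/okio classes alongside the datadog reporter classes.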
[jira] [Commented] (FLINK-8795) Scala shell broken for Flip6
[ https://issues.apache.org/jira/browse/FLINK-8795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517150#comment-16517150 ] ASF GitHub Bot commented on FLINK-8795: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/6182 [FLINK-8795] Fixed local scala shell for Flip6 ## What is the purpose of the change Enable running the scala-shell in local mode with the new runtime mode. ## Brief change log - Creating either MiniCluster or StandaloneMiniCluster based on the mode. ## Verifying this change Changed tests to run both on the new and legacy runtime. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no /** don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink FLINK-8795 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6182 commit 6fa4aa18c2d111a846386fed27354bd7a1da2a0b Author: Dawid Wysakowicz Date: 2018-06-19T07:49:31Z [FLINK-8795] Fixed local scala shell for Flip6 > Scala shell broken for Flip6 > > > Key: FLINK-8795 > URL: https://issues.apache.org/jira/browse/FLINK-8795 > Project: Flink > Issue Type: Bug >Reporter: kant kodali >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > I am trying to run the simple code below after building everything from > Flink's github master branch for various reasons. I get an exception below > and I wonder what runs on port 9065? and How to fix this exception? > I followed the instructions from the Flink master branch so I did the > following. > {code:java} > git clone https://github.com/apache/flink.git > cd flink mvn clean package -DskipTests > cd build-target > ./bin/start-scala-shell.sh local{code} > {{And Here is the code I ran}} > {code:java} > val dataStream = senv.fromElements(1, 2, 3, 4) > dataStream.countWindowAll(2).sum(0).print() > senv.execute("My streaming program"){code} > {{And I finally get this exception}} > {code:java} > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. 
at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$18(RestClusterClient.java:306) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$222(RestClient.java:196) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at >
[GitHub] flink pull request #6182: [FLINK-8795] Fixed local scala shell for Flip6
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/6182 [FLINK-8795] Fixed local scala shell for Flip6 ## What is the purpose of the change Enable running the scala-shell in local mode with the new runtime mode. ## Brief change log - Creating either MiniCluster or StandaloneMiniCluster based on the mode. ## Verifying this change Changed tests to run both on the new and legacy runtime. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no /** don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink FLINK-8795 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6182 commit 6fa4aa18c2d111a846386fed27354bd7a1da2a0b Author: Dawid Wysakowicz Date: 2018-06-19T07:49:31Z [FLINK-8795] Fixed local scala shell for Flip6 ---
[jira] [Commented] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517128#comment-16517128 ] Fabian Hueske commented on FLINK-9615: -- Thanks for opening this issue! This is (partially) a duplicate of FLINK-8858. > Add possibility to use Connectors as Sinks in SQL Client > > > Key: FLINK-9615 > URL: https://issues.apache.org/jira/browse/FLINK-9615 > Project: Flink > Issue Type: Improvement >Reporter: Dominik Wosiński >Priority: Major > > AFAIK, there is currently no possibility to use Kafka or other connectors as > a sink in SQL Client. Such feature would be good for prototyping or quick > streams manipulation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517115#comment-16517115 ] ASF GitHub Bot commented on FLINK-9610: --- Github user nielsbasjes commented on a diff in the pull request: https://github.com/apache/flink/pull/6181#discussion_r196435400 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.partitioner; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Arrays; + +/** + * A partitioner that uses the hash of the provided key to distribute + * the values over the partitions as evenly as possible. + * This partitioner ensures that all records with the same key will be sent to + * the same Kafka partition. + * + * Note that this will cause a lot of network connections to be created between + * all the Flink instances and all the Kafka brokers. 
+ */ +@PublicEvolving +public class FlinkKeyHashPartitioner extends FlinkKafkaPartitioner { + + private static final long serialVersionUID = -2006468063065010594L; + + @Override + public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { + Preconditions.checkArgument( + partitions != null && partitions.length > 0, + "Partitions of the target topic is empty."); + + return partitions[hash(key) % partitions.length]; --- End diff -- Yes, good catch. I fixed that just now. > Add Kafka partitioner that uses the key to partition by > --- > > Key: FLINK-9610 > URL: https://issues.apache.org/jira/browse/FLINK-9610 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Niels Basjes >Assignee: Niels Basjes >Priority: Major > > The kafka connector package only contains the FlinkFixedPartitioner > implementation of the FlinkKafkaPartitioner. > The most common usecase I have seen is the need to spread the records across > the Kafka partitions while keeping all messages with the same key together. > I'll put up a pull request with a very simple implementation that should make > this a lot easier for others to use and extend. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafk...
Github user nielsbasjes commented on a diff in the pull request: https://github.com/apache/flink/pull/6181#discussion_r196435400 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.partitioner; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Arrays; + +/** + * A partitioner that uses the hash of the provided key to distribute + * the values over the partitions as evenly as possible. + * This partitioner ensures that all records with the same key will be sent to + * the same Kafka partition. + * + * Note that this will cause a lot of network connections to be created between + * all the Flink instances and all the Kafka brokers. 
+ */ +@PublicEvolving +public class FlinkKeyHashPartitioner extends FlinkKafkaPartitioner { + + private static final long serialVersionUID = -2006468063065010594L; + + @Override + public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { + Preconditions.checkArgument( + partitions != null && partitions.length > 0, + "Partitions of the target topic is empty."); + + return partitions[hash(key) % partitions.length]; --- End diff -- Yes, good catch. I fixed that just now. ---
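The "good catch" in the review above concerns `partitions[hash(key) % partitions.length]`. The likely issue (the exact fix is not shown in the thread) is that a hash can be negative, and Java's `%` preserves the sign of the dividend, so the raw expression can produce a negative array index. A sign-safe sketch, using `Arrays.hashCode` as a stand-in for the partitioner's `hash(key)` method, which is not included in the diff:

```java
import java.util.Arrays;

// Sketch of a sign-safe variant of partitions[hash(key) % partitions.length].
// Arrays.hashCode may return a negative value, and a negative dividend makes
// Java's % negative, which would throw ArrayIndexOutOfBoundsException.
public class KeyHashPartitioning {

    public static int partition(byte[] key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic is empty.");
        }
        int hash = key == null ? 0 : Arrays.hashCode(key);
        // Math.floorMod always returns a value in [0, partitions.length).
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    public static void main(String[] args) {
        int[] parts = {0, 1, 2, 3};
        // Same key always maps to the same partition, never a negative index.
        System.out.println(partition("some-key".getBytes(), parts));
    }
}
```

An equivalent idiom is masking the sign bit, `(hash & Integer.MAX_VALUE) % partitions.length`; `Math.floorMod` states the intent more directly.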
[jira] [Issue Comment Deleted] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik Wosiński updated FLINK-9615: Comment: was deleted (was: I can take care of that :) It's not a big improvement to be made.) > Add possibility to use Connectors as Sinks in SQL Client > > > Key: FLINK-9615 > URL: https://issues.apache.org/jira/browse/FLINK-9615 > Project: Flink > Issue Type: Improvement >Reporter: Dominik Wosiński >Priority: Major > > AFAIK, there is currently no possibility to use Kafka or other connectors as > a sink in SQL Client. Such feature would be good for prototyping or quick > streams manipulation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517095#comment-16517095 ] Dominik Wosiński commented on FLINK-9615: - I can take care of that :) It's not a big improvement to be made. > Add possibility to use Connectors as Sinks in SQL Client > > > Key: FLINK-9615 > URL: https://issues.apache.org/jira/browse/FLINK-9615 > Project: Flink > Issue Type: Improvement >Reporter: Dominik Wosiński >Priority: Major > > AFAIK, there is currently no possibility to use Kafka or other connectors as > a sink in SQL Client. Such feature would be good for prototyping or quick > streams manipulation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-9614: Description: When the SQL below is too long, like case when case when . when host in ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') then 'condition' it causes a {{StackOverflowError}}. The current code is shown below; I would suggest prompting users to add {{-Xss20m}} to solve this, instead of saying {{This is a bug..}}
{code:java}
trait Compiler[T] {
  @throws(classOf[CompileException])
  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
    require(cl != null, "Classloader must not be null.")
    val compiler = new SimpleCompiler()
    compiler.setParentClassLoader(cl)
    try {
      compiler.cook(code)
    } catch {
      case t: Throwable =>
        throw new InvalidProgramException("Table program cannot be compiled. " +
          "This is a bug. Please file an issue.", t)
    }
    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
  }
}
{code}
> Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Major > > When the below sql has too long. Like > case when case when . 
> when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > Then cause the {{StackOverflowError}}. And the current code is, but I would > suggest prompt users add the {{-Xss 20m}} to solve this, instead of {{This is > a bug..}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
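The improvement proposed above boils down to catching {{StackOverflowError}} separately and advising a larger thread stack instead of reporting "This is a bug." A minimal Java sketch of that idea (the `compileOrExplain` wrapper and its messages are hypothetical, not Flink's actual Compiler API or final wording):

```java
// Sketch of the suggested error-message improvement: treat StackOverflowError
// during compilation as "expression too deeply nested" rather than a bug.
public class CompileErrorMessages {

    public static String compileOrExplain(Runnable compileCall) {
        try {
            compileCall.run();
            return "OK";
        } catch (StackOverflowError soe) {
            // Deeply nested generated code blew the default thread stack;
            // point the user at the JVM stack-size option, not a bug report.
            return "Table program is too deeply nested to compile with the default "
                    + "thread stack size. Try increasing it, e.g. with -Xss20m.";
        } catch (Throwable t) {
            return "Table program cannot be compiled. This is a bug. Please file an issue.";
        }
    }

    /** Stands in for a compiler recursing over a huge expression tree. */
    public static void recurse() {
        recurse();
    }

    public static void main(String[] args) {
        System.out.println(compileOrExplain(CompileErrorMessages::recurse));
    }
}
```

Since `StackOverflowError` is a subclass of `Error`, it must be caught before the generic `Throwable` branch, which is exactly where the current Scala code loses the distinction.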
[jira] [Updated] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik Wosiński updated FLINK-9615: Summary: Add possibility to use Connectors as Sinks in SQL Client (was: Add) > Add possibility to use Connectors as Sinks in SQL Client > > > Key: FLINK-9615 > URL: https://issues.apache.org/jira/browse/FLINK-9615 > Project: Flink > Issue Type: Improvement >Reporter: Dominik Wosiński >Priority: Major > > AFAIK, there is currently no possibility to use Kafka or other connectors as > a sink in SQL Client. Such feature would be good for prototyping or quick > streams manipulation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9615) Add
Dominik Wosiński created FLINK-9615: --- Summary: Add Key: FLINK-9615 URL: https://issues.apache.org/jira/browse/FLINK-9615 Project: Flink Issue Type: Improvement Reporter: Dominik Wosiński AFAIK, there is currently no possibility to use Kafka or other connectors as a sink in SQL Client. Such feature would be good for prototyping or quick streams manipulation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-9614: Component/s: Table API & SQL > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9614) Improve the error message for Compiler#compile
mingleizhang created FLINK-9614: --- Summary: Improve the error message for Compiler#compile Key: FLINK-9614 URL: https://issues.apache.org/jira/browse/FLINK-9614 Project: Flink Issue Type: Improvement Reporter: mingleizhang Assignee: mingleizhang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9367) Truncate() in BucketingSink is only allowed after hadoop2.7
[ https://issues.apache.org/jira/browse/FLINK-9367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangxinyu closed FLINK-9367. - Resolution: Won't Do BucketingSink is going to be rewritten, and hadoop below 2.7 won't be supported. Therefore, this issue is unnecessary. > Truncate() in BucketingSink is only allowed after hadoop2.7 > --- > > Key: FLINK-9367 > URL: https://issues.apache.org/jira/browse/FLINK-9367 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.5.0 >Reporter: zhangxinyu >Priority: Major > > When output to HDFS using BucketingSink, truncate() is only allowed after > hadoop2.7. > If some tasks failed, the ".valid-length" file is created for the lower > version hadoop. > The problem is, if other people want to use the data in HDFS, they must know > how to deal with the ".valid-length" file, otherwise, the data may be not > exactly-once. > I think it's not convenient for other people to use the data. Why not just > read the in-progress file and write a new file when restoring instead of > writing a ".valid-length" file. > In this way, others who use the data in HDFS don't need to know how to deal > with the ".valid-length" file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516986#comment-16516986 ] ASF GitHub Bot commented on FLINK-9280: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/6147 > Extend JobSubmitHandler to accept jar files > --- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}}, which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516985#comment-16516985 ] ASF GitHub Bot commented on FLINK-9280: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6147 I will split this PR to address the various issues separately. > Extend JobSubmitHandler to accept jar files > --- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}}, which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6147: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/6147 ---
[GitHub] flink issue #6147: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6147 I will split this PR to address the various issues separately. ---
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516972#comment-16516972 ] Shimin Yang commented on FLINK-9567: Thanks for assigning the issue to me, I will do my best to fix it. > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt > > > After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not > release task manager containers in some specific cases. In the worst case, I > had a job configured for 5 task managers, but it possessed more than 100 containers > in the end. Although the job didn't fail, this affected other jobs in the > Yarn Cluster. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. The container was killed before the restart, but the > callback of *onContainerComplete* in *YarnResourceManager*, which should be called > by the *AMRMAsyncClient* of Yarn, was never received. After the restart, as we can > see in line 347 of the FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. > When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it > did not have a connection to the TaskManager on container 24, so it simply ignored > the close of the TaskManager. > 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No > open TaskExecutor connection container_1528707394163_29461_02_24. > Ignoring close TaskExecutor connection. 
> However, before calling *closeTaskManagerConnection*, it had already called > *requestYarnContainer*, which increased the *numPendingContainerRequests* variable > in *YarnResourceManager* by 1. > As the return of excess containers is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot > return this container even though it is not required. Meanwhile, the restart > logic has already allocated enough containers for the Task Managers, so Flink will > hold the extra container for a long time for nothing. > In the full log, the job ended with 7 containers while only 3 were running > TaskManagers. > ps: Another strange thing I found is that sometimes when requesting a yarn > container, YARN returns many more containers than requested. Is that a normal > scenario for the AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
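The accounting problem described in the report can be modeled abstractly: if a stale container-completion handler bumps the pending-request counter, a later surplus container is kept instead of returned. A toy model of that counter logic (purely illustrative; names mirror the report, not Flink's actual YarnResourceManager implementation):

```java
// Toy model of pending-container-request bookkeeping. A container offered by
// YARN is kept while the pending counter is positive and returned otherwise,
// so a spurious increment causes an unneeded container to be retained.
public class PendingRequestModel {
    private int numPendingContainerRequests = 0;
    private int heldContainers = 0;
    private int returnedContainers = 0;

    /** A container is requested; the pending counter goes up by one. */
    public void requestContainer() {
        numPendingContainerRequests++;
    }

    /** YARN offers a container: kept if something is pending, else returned. */
    public void onContainerAllocated() {
        if (numPendingContainerRequests > 0) {
            numPendingContainerRequests--;
            heldContainers++;
        } else {
            returnedContainers++; // surplus container handed back to YARN
        }
    }

    public int held() { return heldContainers; }
    public int returned() { return returnedContainers; }
    public int pending() { return numPendingContainerRequests; }

    public static void main(String[] args) {
        PendingRequestModel model = new PendingRequestModel();
        model.requestContainer();      // genuine need for one TaskManager
        model.requestContainer();      // spurious request from a stale completion handler
        model.onContainerAllocated();  // satisfies the genuine need
        model.onContainerAllocated();  // unneeded, but kept because pending > 0
        System.out.println("held=" + model.held() + " returned=" + model.returned());
        // prints held=2 returned=0: the surplus container is never returned
    }
}
```

In this simplified picture, the fix direction suggested by the issue title (eagerly closing the connection when the container completes) corresponds to not issuing the spurious `requestContainer()` in the first place.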
[jira] [Commented] (FLINK-9367) Truncate() in BucketingSink is only allowed after hadoop2.7
[ https://issues.apache.org/jira/browse/FLINK-9367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516961#comment-16516961 ] ASF GitHub Bot commented on FLINK-9367: --- Github user zhangxinyu1 closed the pull request at: https://github.com/apache/flink/pull/6108
> Truncate() in BucketingSink is only allowed after hadoop2.7
> --- > > Key: FLINK-9367 > URL: https://issues.apache.org/jira/browse/FLINK-9367 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors > Affects Versions: 1.5.0 > Reporter: zhangxinyu > Priority: Major > >
> When outputting to HDFS using BucketingSink, truncate() is only allowed from Hadoop 2.7 on. If some tasks fail, a ".valid-length" file is created for lower Hadoop versions.
> The problem is that if other people want to use the data in HDFS, they must know how to deal with the ".valid-length" file; otherwise, the data may not be exactly-once.
> I think it's not convenient for other people to use the data. Why not just read the in-progress file and write a new file when restoring, instead of writing a ".valid-length" file? That way, others who use the data in HDFS wouldn't need to know how to deal with the ".valid-length" file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6108: [FLINK-9367] [Streaming Connectors] Allow to do tr...
Github user zhangxinyu1 closed the pull request at: https://github.com/apache/flink/pull/6108 ---
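For context on the version constraint above: `FileSystem.truncate()` only exists from Hadoop 2.7 on, so a sink has to detect its availability at runtime and otherwise fall back to the ".valid-length" marker. A minimal sketch of such a reflective probe (the probed interfaces below are stand-ins, not Hadoop classes):

```java
import java.lang.reflect.Method;

public class TruncateProbe {

    // Stand-in for a Hadoop 2.7+ FileSystem, which exposes truncate().
    interface NewerFs {
        boolean truncate(String path, long newLength);
    }

    // Stand-in for a pre-2.7 FileSystem without truncate().
    interface OlderFs {
        void append(String path);
    }

    // Returns true if the class exposes a public method named "truncate".
    static boolean supportsTruncate(Class<?> fileSystemClass) {
        for (Method m : fileSystemClass.getMethods()) {
            if (m.getName().equals("truncate")) {
                return true;
            }
        }
        return false;
    }
}
```

A reflective lookup like this avoids a hard compile-time dependency on the newer Hadoop API while still using truncate() when the cluster provides it.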
[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516914#comment-16516914 ] ASF GitHub Bot commented on FLINK-9610: --- Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/6181#discussion_r196368052
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+	private static final long serialVersionUID = -2006468063065010594L;
+
+	@Override
+	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		Preconditions.checkArgument(
+			partitions != null && partitions.length > 0,
+			"Partitions of the target topic is empty.");
+
+		return partitions[hash(key) % partitions.length];
--- End diff --
Should we guard against hash(key) % partitions.length < 0 (in case someone overrides hash())?
> Add Kafka partitioner that uses the key to partition by
> --- > > Key: FLINK-9610 > URL: https://issues.apache.org/jira/browse/FLINK-9610 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector > Reporter: Niels Basjes > Assignee: Niels Basjes > Priority: Major > >
> The Kafka connector package only contains the FlinkFixedPartitioner implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make this a lot easier for others to use and extend. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafk...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/6181#discussion_r196368052
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+	private static final long serialVersionUID = -2006468063065010594L;
+
+	@Override
+	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		Preconditions.checkArgument(
+			partitions != null && partitions.length > 0,
+			"Partitions of the target topic is empty.");
+
+		return partitions[hash(key) % partitions.length];
--- End diff --
Should we guard against hash(key) % partitions.length < 0 (in case someone overrides hash())?
---
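The guard tedyu asks about is real: in Java, `%` preserves the sign of the dividend, so a negative `hash(key)` yields a negative index and an `ArrayIndexOutOfBoundsException`. A minimal sketch of a sign-safe variant (the `Arrays.hashCode` default is an assumption based on the imports in the diff, not confirmed from the PR):

```java
import java.util.Arrays;

public class KeyHashPartitioning {

    // Assumed default hash: Arrays.hashCode of the key bytes; null keys fall
    // into a fixed bucket. An overriding subclass may return any int,
    // including negative values.
    static int hash(byte[] key) {
        return key == null ? 0 : Arrays.hashCode(key);
    }

    // Math.floorMod always returns a value in [0, numPartitions), even when
    // the hash is negative, unlike the % operator.
    static int partitionIndex(byte[] key, int numPartitions) {
        return Math.floorMod(hash(key), numPartitions);
    }
}
```

An equivalent alternative used elsewhere in Kafka clients is masking the sign bit first, e.g. `(hash & 0x7FFFFFFF) % numPartitions`.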
[jira] [Created] (FLINK-9613) YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty()
Sihua Zhou created FLINK-9613: - Summary: YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty() Key: FLINK-9613 URL: https://issues.apache.org/jira/browse/FLINK-9613 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.6.0 Reporter: Sihua Zhou The test YARNSessionCapacitySchedulerITCase failed on Travis because of YarnTestBase.checkClusterEmpty(). https://api.travis-ci.org/v3/job/394017104/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516911#comment-16516911 ] ASF GitHub Bot commented on FLINK-9187: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5857 I see the value of a `cluster_id`, but you're mixing concerns here. It is not the responsibility of a `reporter` to introduce a `cluster_id` tag. Reporters are to faithfully report the metrics and their associated variables, not add more. Instead we may want to think about adding a configurable `cluster_id` value. With that out of the way we have arrived at my previous question, random ID vs actual ID. That the `job_name` will be equal to `tm/jm_id` is precisely why I prefer this approach; it doesn't introduce additional noise in the tags. I've already started working on `FLINK-9543` so we can proceed with this PR; the issue must be extended to also expose an ID for Dispatchers and register the JVM metrics for each JM. As to your question, `ResourceID`s are exactly what we're looking for; this is also what we're using for `TaskExecutor`.
> add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics > Affects Versions: 1.4.2 > Reporter: lamber-ken > Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Make the Flink system able to send metrics to Prometheus via a pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9569) Confusing construction of AvroSerializers for generic records
[ https://issues.apache.org/jira/browse/FLINK-9569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516912#comment-16516912 ] ASF GitHub Bot commented on FLINK-9569: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6151#discussion_r196366033
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -105,41 +108,54 @@
	/** The currently accessing thread, set and checked on debug level only. */
	private transient volatile Thread currentThread;

-	//
+	// --- instantiation methods --

	/**
	 * Creates a new AvroSerializer for the type indicated by the given class.
-	 * This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
-	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
+	 *
+	 * This constructor is expected to be used only with {@link GenericRecord}.
+	 * For {@link SpecificRecord} or reflection serializer use {@link AvroSerializer#forNonGeneric(Class)}.
+	 *
+	 * @param schema the explicit schema to use for generic records.
	 */
-	public AvroSerializer(Class<T> type) {
-		checkArgument(!isGenericRecord(type),
-			"For GenericData.Record use constructor with explicit schema.");
-		this.type = checkNotNull(type);
-		this.schemaString = null;
+	public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+		return new AvroSerializer<>(GenericRecord.class, schema);
--- End diff --
Should we check the schema to make sure it is not null here?
> Confusing construction of AvroSerializers for generic records
> - > > Key: FLINK-9569 > URL: https://issues.apache.org/jira/browse/FLINK-9569 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Major > >
> The {{AvroSerializer}} currently has a {{AvroSerializer(Class type, Schema schema)}} public constructor when used for generic records.
> This is a bit confusing, because when using the {{AvroSerializer}}, the type to be serialized should always be a {{GenericData.Record}} type.
> We should either:
> - have a separate subclass of {{AvroSerializer}}, say {{GenericRecordAvroSerializer}}, that is a {{AvroSerializer}},
> or
> - follow a similar approach to the instantiation methods in the {{AvroDeserializationSchema}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6151: [FLINK-9569] [avro] Fix confusing construction of ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6151#discussion_r196366033
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -105,41 +108,54 @@
	/** The currently accessing thread, set and checked on debug level only. */
	private transient volatile Thread currentThread;

-	//
+	// --- instantiation methods --

	/**
	 * Creates a new AvroSerializer for the type indicated by the given class.
-	 * This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
-	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
+	 *
+	 * This constructor is expected to be used only with {@link GenericRecord}.
+	 * For {@link SpecificRecord} or reflection serializer use {@link AvroSerializer#forNonGeneric(Class)}.
+	 *
+	 * @param schema the explicit schema to use for generic records.
	 */
-	public AvroSerializer(Class<T> type) {
-		checkArgument(!isGenericRecord(type),
-			"For GenericData.Record use constructor with explicit schema.");
-		this.type = checkNotNull(type);
-		this.schemaString = null;
+	public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+		return new AvroSerializer<>(GenericRecord.class, schema);
--- End diff --
Should we check the schema to make sure it is not null here?
---
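The null guard asked for in the review is a one-line fail-fast precondition at the top of the factory method. A simplified stand-in (using the JDK's `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`; the class below is illustrative, not the real serializer):

```java
import java.util.Objects;

public class GenericSerializerFactory {

    private final String schemaString;

    private GenericSerializerFactory(String schemaString) {
        this.schemaString = schemaString;
    }

    // Fails immediately with a clear message rather than with a puzzling
    // NullPointerException later, deep inside serialization.
    static GenericSerializerFactory forGeneric(String schema) {
        Objects.requireNonNull(schema, "Schema must not be null for generic records.");
        return new GenericSerializerFactory(schema);
    }

    String schemaString() {
        return schemaString;
    }
}
```

The benefit of checking in the factory rather than at first use is that the stack trace points at the misconfigured call site.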
[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5857 I see the value of a `cluster_id`, but you're mixing concerns here. It is not the responsibility of a `reporter` to introduce a `cluster_id` tag. Reporters are to faithfully report the metrics and their associated variables, not add more. Instead we may want to think about adding a configurable `cluster_id` value. With that out of the way we have arrived at my previous question, random ID vs actual ID. That the `job_name` will be equal to `tm/jm_id` is precisely why I prefer this approach; it doesn't introduce additional noise in the tags. I've already started working on `FLINK-9543` so we can proceed with this PR; the issue must be extended to also expose an ID for Dispatchers and register the JVM metrics for each JM. As to your question, `ResourceID`s are exactly what we're looking for; this is also what we're using for `TaskExecutor`. ---
[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/5857 @zentol, thanks for the review. First, there is a small point: the IDs may be duplicated when using the JM/TM's actual unique ID to compose the pushgateway job name, like the picture below. ![image](https://user-images.githubusercontent.com/20113411/41587417-c7367d86-73e1-11e8-95fc-ae54dbd63809.png) Second, when configuring the Grafana dashboard, the prefix of the pushgateway job name is useful, but the unique ID may not be, because our metric data already contains (`jm_id`, `tm_id`). So, for the pushgateway, the jobName is used to distinguish different services; for Grafana, the prefix of the jobName can be used to distinguish different clusters. We could also just use the JM/TM ID and ignore the jobName. `FLINK-9543` is useful and important; I tried to finish it. Is it OK to use `resourceId` to represent the JM's ID?
```java
public abstract class ResourceManager ... {

	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";

	/** Unique id of the resource manager. */
	private final ResourceID resourceId;
```
---
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516863#comment-16516863 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/5857 @zentol, thanks for the review. First, there is a small point: the IDs may be duplicated when using the JM/TM's actual unique ID to compose the pushgateway job name, like the picture below. ![image](https://user-images.githubusercontent.com/20113411/41587417-c7367d86-73e1-11e8-95fc-ae54dbd63809.png) Second, when configuring the Grafana dashboard, the prefix of the pushgateway job name is useful, but the unique ID may not be, because our metric data already contains (`jm_id`, `tm_id`). So, for the pushgateway, the jobName is used to distinguish different services; for Grafana, the prefix of the jobName can be used to distinguish different clusters. We could also just use the JM/TM ID and ignore the jobName. `FLINK-9543` is useful and important; I tried to finish it. Is it OK to use `resourceId` to represent the JM's ID?
```java
public abstract class ResourceManager ... {

	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";

	/** Unique id of the resource manager. */
	private final ResourceID resourceId;
```
> add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics > Affects Versions: 1.4.2 > Reporter: lamber-ken > Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Make the Flink system able to send metrics to Prometheus via a pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
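The trade-off being debated in this thread (a cluster prefix that groups dashboards vs. a per-instance ID that keeps job names unique so pushes don't overwrite each other) can be sketched as a tiny naming helper. The `<clusterId>-<role>-<resourceId>` format below is hypothetical, not the reporter's actual scheme:

```java
public class PushGatewayJobNames {

    // Composes a pushgateway job name: the cluster prefix lets Grafana group
    // all metrics of one cluster, while the per-instance resource ID keeps the
    // job name unique across JMs/TMs so their pushes don't collide.
    static String jobName(String clusterId, String role, String resourceId) {
        return clusterId + "-" + role + "-" + resourceId;
    }
}
```

Since the metrics already carry `jm_id`/`tm_id` labels, the resource-ID suffix adds no new query dimension; it only prevents two instances from pushing under the same job name.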