[GitHub] flink issue #3641: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...
Github user narendraj9 commented on the issue: https://github.com/apache/flink/pull/3641

@fhueske Hi, I am trying to understand how objects of the RichProcessFunction class are managed. For example, you mentioned that onTimer() and processElement() won't be called at the same time. Could you share a document, link, or code that I can read?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1614#comment-1614 ]

ASF GitHub Bot commented on FLINK-5654:
---------------------------------------

Github user narendraj9 commented on the issue: https://github.com/apache/flink/pull/3641

@fhueske Hi, I am trying to understand how objects of the RichProcessFunction class are managed. For example, you mentioned that onTimer() and processElement() won't be called at the same time. Could you share a document, link, or code that I can read?

> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5654
>                 URL: https://issues.apache.org/jira/browse/FLINK-5654
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: radu
>             Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657).
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
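The RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW semantics discussed above can be modeled with a time-bounded buffer that evicts rows older than the range before emitting the aggregate. The following is a self-contained plain-Java sketch of that idea (it is not the actual Flink DataStream operator, and the class and method names are invented for illustration):

```java
import java.util.ArrayDeque;

/** Minimal model of a per-partition OVER RANGE (time-bounded) SUM aggregate. */
public class RangeSumModel {
    private final long rangeMillis;
    // Each entry is {timestamp, value}; entries arrive in timestamp order.
    private final ArrayDeque<long[]> buffer = new ArrayDeque<>();
    private long sum = 0;

    public RangeSumModel(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Processes one element and returns SUM(b) over [timestamp - range, timestamp]. */
    public long processElement(long timestamp, long value) {
        // Evict rows strictly older than the preceding range (boundary row is included).
        while (!buffer.isEmpty() && buffer.peekFirst()[0] < timestamp - rangeMillis) {
            sum -= buffer.pollFirst()[1];
        }
        buffer.addLast(new long[] {timestamp, value});
        sum += value;
        return sum;
    }

    public static void main(String[] args) {
        long hour = 3600_000L;
        RangeSumModel sumB = new RangeSumModel(hour);
        System.out.println(sumB.processElement(0L, 3));        // 3
        System.out.println(sumB.processElement(hour / 2, 4));  // 7
        System.out.println(sumB.processElement(2 * hour, 5));  // 5 (older rows evicted)
    }
}
```

In the real operator the same bookkeeping would live in keyed state and eviction would be driven by processing-time timers, which is why the onTimer()/processElement() mutual-exclusion question above matters.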
[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146639#comment-16146639 ]

ASF GitHub Bot commented on FLINK-7366:
---------------------------------------

Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4522

Great! I can start FLINK-7508 then

> Upgrade kinesis producer library in flink-connector-kinesis
> -----------------------------------------------------------
>
>                 Key: FLINK-7366
>                 URL: https://issues.apache.org/jira/browse/FLINK-7366
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.2
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0, 1.3.3
>
>
> We need to upgrade KPL and KCL to pick up the enhanced performance and stability for Flink to work better with Kinesis. Upgrading KPL is especially necessary, because the KPL version Flink uses is old and doesn't have good retry and error handling logic.
> *KPL:*
> flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which was released in Nov 2015 by AWS. It's old (only the fourth release) and thus problematic. It doesn't even have good retry logic, so Flink fails really frequently (about every 10 mins, as we observed) when Flink writes too fast to Kinesis and receives RateLimitExceededException.
> Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): "*With the newer version of the KPL it uses the AWS C++ SDK which should offer additional retries.*" 0.12.5, the version we are upgrading to, was released in May 2017 and should have the enhanced retry logic.
> *KCL:*
> Upgrade KCL from 1.6.2 to 1.8.1
> *AWS SDK:*
> from 1.10.71 to 1.11.171
[GitHub] flink issue #4522: [FLINK-7366][kinesis connector] Upgrade kinesis producer ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4522

Great! I can start FLINK-7508 then
[jira] [Created] (FLINK-7558) Improve SQL ValidationException message.
sunjincheng created FLINK-7558:
----------------------------------

             Summary: Improve SQL ValidationException message.
                 Key: FLINK-7558
                 URL: https://issues.apache.org/jira/browse/FLINK-7558
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: sunjincheng
            Assignee: sunjincheng

org.apache.flink.table.api.ValidationException: SQL validation failed. Operand types of could not be inferred.
	at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:91)
	at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:513)
	at com.alibaba.blink.scala.tool.util.SqlJobAdapter.dealInserts(SqlJobAdapter.java:292)
	at com.alibaba.blink.scala.tool.util.JobBuildHelper.buildSqlJob(JobBuildHelper.java:80)
	at com.alibaba.blink.scala.tool.JobLauncher.main(JobLauncher.java:138)
Caused by: org.apache.flink.table.api.ValidationException: Operand types of could not be inferred.
	at org.apache.flink.table.functions.utils.ScalarSqlFunction$$anon$2$$anonfun$2.apply(ScalarSqlFunction.scala:110)
	at org.apache.flink.table.functions.utils.ScalarSqlFunction$$anon$2$$anonfun$2.apply(ScalarSqlFunction.scala:110)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.flink.table.functions.utils.ScalarSqlFunction$$anon$2.inferOperandTypes(ScalarSqlFunction.scala:110)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1769)
	at
[jira] [Commented] (FLINK-7557) Fix typo for s3a config in AWS deployment documentation
[ https://issues.apache.org/jira/browse/FLINK-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146553#comment-16146553 ]

ASF GitHub Bot commented on FLINK-7557:
---------------------------------------

GitHub user tony810430 opened a pull request: https://github.com/apache/flink/pull/4618

[FLINK-7557] Fix typo for s3a config in AWS deployment documentation

Should use `fs.s3a.buffer.dir` in `core-site.xml` to set s3a directories for buffering files.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tony810430/flink FLINK-7557

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4618.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4618

commit d48931888574d7d6407b11cbbd597c83fb6b9d94
Author: Tony Wei
Date:   2017-08-30T03:44:27Z

    [FLINK-7557] Fix typo for s3a config in AWS deployment documentation

> Fix typo for s3a config in AWS deployment documentation
> -------------------------------------------------------
>
>                 Key: FLINK-7557
>                 URL: https://issues.apache.org/jira/browse/FLINK-7557
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>            Reporter: Wei-Che Wei
>            Assignee: Wei-Che Wei
>
> The property name {{fs.s3.buffer.dir}} for s3a in {{core-site.xml}} should be {{fs.s3a.buffer.dir}}
[GitHub] flink pull request #4618: [FLINK-7557] Fix typo for s3a config in AWS deploy...
GitHub user tony810430 opened a pull request: https://github.com/apache/flink/pull/4618

[FLINK-7557] Fix typo for s3a config in AWS deployment documentation

Should use `fs.s3a.buffer.dir` in `core-site.xml` to set s3a directories for buffering files.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tony810430/flink FLINK-7557

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4618.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4618

commit d48931888574d7d6407b11cbbd597c83fb6b9d94
Author: Tony Wei
Date:   2017-08-30T03:44:27Z

    [FLINK-7557] Fix typo for s3a config in AWS deployment documentation
[jira] [Created] (FLINK-7557) Fix typo for s3a config in AWS deployment documentation
Wei-Che Wei created FLINK-7557:
----------------------------------

             Summary: Fix typo for s3a config in AWS deployment documentation
                 Key: FLINK-7557
                 URL: https://issues.apache.org/jira/browse/FLINK-7557
             Project: Flink
          Issue Type: Bug
          Components: Documentation
            Reporter: Wei-Che Wei
            Assignee: Wei-Che Wei

The property name {{fs.s3.buffer.dir}} for s3a in {{core-site.xml}} should be {{fs.s3a.buffer.dir}}
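For reference, the corrected property as it would appear in `core-site.xml` (the value shown is only an example path, not something mandated by the fix):

```xml
<!-- core-site.xml: local buffer directory for the s3a filesystem.
     Note the "s3a" segment: fs.s3.buffer.dir is the typo being fixed. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp/s3a</value> <!-- example: any local scratch directory -->
</property>
```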
[jira] [Comment Edited] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144282#comment-16144282 ]

Shuyi Chen edited comment on FLINK-7491 at 8/29/17 11:16 PM:
------------------------------------------------------------

Hi [~jark], thanks for the response. However, I am worried that with Array as the runtime type, multiset-specific operations will be slow. For example: MEMBER OF is *O(1)* for a multiset data structure and *O(n)* for an array; SUBMULTISET OF is *O(m+n)* for an array and *O(m)* for a multiset when testing M < N. Also, the actual type I am using is HashMultiset, which is backed by a Java HashMap and should perform reasonably well.

was (Author: suez1224):
Hi [~jark], thanks for the response. However, I am worried with Array as the runtime type, multiset specific operation will be slow, for example. MEMBER OF operator is *O(1)* for multiset data structure and {code:java} *O(n)* {code} SUBMULTISET OF operator is *O(m+n)* for array, and *O(m)* for multiset if to test M < N. Also the actual type I am using is HashMultiset, which is backed by a java HashMap , which I think should perform reasonably well.

> Support COLLECT Aggregate function in Flink SQL
> -----------------------------------------------
>
>                 Key: FLINK-7491
>                 URL: https://issues.apache.org/jira/browse/FLINK-7491
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
[jira] [Comment Edited] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144282#comment-16144282 ]

Shuyi Chen edited comment on FLINK-7491 at 8/29/17 11:15 PM:
------------------------------------------------------------

Hi [~jark], thanks for the response. However, I am worried with Array as the runtime type, multiset specific operation will be slow, for example. MEMBER OF operator is *O(1)* for multiset data structure and *O(n)* SUBMULTISET OF operator is *O(m+n)* for array, and *O(m)* for multiset if to test M < N. Also the actual type I am using is HashMultiset, which is backed by a java HashMap, which I think should perform reasonably well.

was (Author: suez1224):
Hi [~jark], thanks for the response. However, I am worried with Array as the runtime type, multiset specific operation will be slow, for example. MEMBER OF operator is O(1) for multiset data structure and O(n) for array. SUBMULTISET OF operator is O(m+n) for array, and O(m) for multiset if to test M < N. Also the actual type I am using is HashMultiset, which is backed by a java HashMap , which I think should perform reasonably well.

> Support COLLECT Aggregate function in Flink SQL
> -----------------------------------------------
>
>                 Key: FLINK-7491
>                 URL: https://issues.apache.org/jira/browse/FLINK-7491
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
[jira] [Comment Edited] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144282#comment-16144282 ]

Shuyi Chen edited comment on FLINK-7491 at 8/29/17 11:15 PM:
------------------------------------------------------------

Hi [~jark], thanks for the response. However, I am worried with Array as the runtime type, multiset specific operation will be slow, for example. MEMBER OF operator is *O(1)* for multiset data structure and {code:java} *O(n)* {code} SUBMULTISET OF operator is *O(m+n)* for array, and *O(m)* for multiset if to test M < N. Also the actual type I am using is HashMultiset, which is backed by a java HashMap, which I think should perform reasonably well.

was (Author: suez1224):
Hi [~jark], thanks for the response. However, I am worried with Array as the runtime type, multiset specific operation will be slow, for example. MEMBER OF operator is *O(1)* for multiset data structure and *O(n)* SUBMULTISET OF operator is *O(m+n)* for array, and *O(m)* for multiset if to test M < N. Also the actual type I am using is HashMultiset, which is backed by a java HashMap , which I think should perform reasonably well.

> Support COLLECT Aggregate function in Flink SQL
> -----------------------------------------------
>
>                 Key: FLINK-7491
>                 URL: https://issues.apache.org/jira/browse/FLINK-7491
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
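The complexity argument in the comments above can be illustrated with a plain HashMap-backed multiset. This is a stand-in for the HashMultiset the comment refers to (the class below is invented for illustration, not Flink or Guava code): MEMBER OF is a single hash lookup, O(1) expected, while an array-backed representation would need a linear scan.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal HashMap-backed multiset, mirroring how a HashMultiset gets O(1) MEMBER OF. */
public class TinyMultiset<T> {
    private final Map<T, Integer> counts = new HashMap<>();

    public void add(T element) {
        counts.merge(element, 1, Integer::sum);
    }

    /** MEMBER OF: O(1) expected, one hash lookup. */
    public boolean memberOf(T element) {
        return counts.containsKey(element);
    }

    /** SUBMULTISET OF: O(m) over this multiset's distinct elements. */
    public boolean submultisetOf(TinyMultiset<T> other) {
        return counts.entrySet().stream()
            .allMatch(e -> other.counts.getOrDefault(e.getKey(), 0) >= e.getValue());
    }

    public static void main(String[] args) {
        TinyMultiset<String> small = new TinyMultiset<>();
        TinyMultiset<String> big = new TinyMultiset<>();
        for (String s : List.of("a", "b")) small.add(s);
        for (String s : List.of("a", "a", "b", "c")) big.add(s);
        System.out.println(small.memberOf("a"));      // true
        System.out.println(small.submultisetOf(big)); // true
        System.out.println(big.submultisetOf(small)); // false: small has only one "a"
    }
}
```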
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user heytitle commented on the issue: https://github.com/apache/flink/pull/3511

Hi @ggevay, thanks for the commits. Do you have any plan for the `FixedLengthRecordSorter` implementation? I'm not sure how much work needs to be done there.
[jira] [Updated] (FLINK-7503) Config options taskmanager.log.path and jobmanager.web.log.path are misleading, if not broken
[ https://issues.apache.org/jira/browse/FLINK-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-7503:
----------------------------------
    Component/s: Webfrontend

> Config options taskmanager.log.path and jobmanager.web.log.path are misleading, if not broken
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7503
>                 URL: https://issues.apache.org/jira/browse/FLINK-7503
>             Project: Flink
>          Issue Type: Bug
>          Components: Webfrontend
>    Affects Versions: 1.3.2
>            Reporter: Felix Dreissig
>            Priority: Minor
>
> Setting config option {{taskmanager.log.path}} to the absolute directory of the logs caused the TaskManager logs to be unavailable through the JobManager web UI in our setup: The web UI said "Fetching TaskManager log failed."; JobManager and TaskManager logs see below.
> By grepping through the source code, I found that {{taskmanager.log.path}} (resp. {{TASK_MANAGER_LOG_PATH_KEY}}) outside of tests only ever gets used by {{TaskManager.handleRequestTaskManagerLog()}}, but not for writing the logs. Which makes sense, because writing them is (solely, as far as I can tell) handled by Log4j.
> Documentation on {{taskmanager.log.path}} is rather sparse and just says "The config parameter defining the taskmanager log file location". Apart from not telling what the value is supposed to look like (absolute/relative path), this also doesn't say how the option is supposed to be used.
> A similar case is {{jobmanager.web.log.path}}, which isn't even documented (no idea how it ended up in our config) and apart from tests is only used in {{WebMonitorUtils.LogFileLocation.find()}}. For production deployments, it normally won't even be picked up there since {{log.file}} is always set when Flink is launched through the start script. However, this option caused no issues for us so far.
>
> JobManager log:
> {code}
> ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler - Fetching TaskManager log failed.
> java.util.NoSuchElementException: None.get
> 	at scala.None$.get(Option.scala:347)
> 	at scala.None$.get(Option.scala:345)
> 	at org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler$2.apply(TaskManagerLogHandler.java:200)
> 	at org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler$2.apply(TaskManagerLogHandler.java:197)
> {code}
> TaskManager log:
> {code}
> ERROR akka.actor.OneForOneStrategy - /var/log/flink (Is a directory)
> java.io.FileNotFoundException: /var/log/flink (Is a directory)
> 	at java.io.FileInputStream.open0(Native Method)
> 	at java.io.FileInputStream.open(FileInputStream.java:195)
> 	at java.io.FileInputStream.<init>(FileInputStream.java:138)
> 	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:840)
> 	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:337)
> {code}
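As the report notes, the log location is determined by the logging framework, not by {{taskmanager.log.path}}. In Flink's shipped log4j.properties the file appender is wired to the {{log.file}} system property that the start scripts set; a sketch of that configuration (close to, but not verbatim from, the shipped defaults):

```properties
# conf/log4j.properties (sketch): the actual log location comes from ${log.file},
# which Flink's start scripts pass as -Dlog.file=...
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

This is why pointing {{taskmanager.log.path}} at a directory breaks log fetching: the option only tells the TaskManager where to read a log file from, and a directory cannot be opened as a file.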
[jira] [Commented] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method
[ https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146098#comment-16146098 ]

Bowen Li commented on FLINK-7524:
---------------------------------

This seems to be due to the Flink TM running out of memory. After switching from the filesystem state backend to the RocksDB state backend, memory usage has gone down a lot and this issue has not happened again.

> Task "xxx" did not react to cancelling signal, but is stuck in method
> ---------------------------------------------------------------------
>
>                 Key: FLINK-7524
>                 URL: https://issues.apache.org/jira/browse/FLINK-7524
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log
> {code:java}
> 2017-08-25 17:03:40,141 WARN org.apache.flink.runtime.taskmanager.Task - Task 'TriggerWindow(SlidingEventTimeWindows(25920, 360), AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in method:
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'TriggerWindow(SlidingEventTimeWindows(25920, 360), AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
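The workaround described in the comment, switching from the heap-based filesystem state backend to RocksDB (which keeps state off-heap), can be configured in flink-conf.yaml. These are real Flink 1.3 option keys; the checkpoint path is only an example:

```yaml
# flink-conf.yaml: use RocksDB instead of the heap-based filesystem backend
state.backend: rocksdb
# Checkpoints still go to a durable filesystem; the path below is an example.
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
```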
[jira] [Updated] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method
[ https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-7524:
----------------------------------
    Component/s: State Backends, Checkpointing

> Task "xxx" did not react to cancelling signal, but is stuck in method
> ---------------------------------------------------------------------
>
>                 Key: FLINK-7524
>                 URL: https://issues.apache.org/jira/browse/FLINK-7524
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0
[jira] [Updated] (FLINK-7546) Support SUBMULTISET_OF Operator for Multiset SQL type
[ https://issues.apache.org/jira/browse/FLINK-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-7546:
----------------------------------
    Component/s: Table API & SQL

> Support SUBMULTISET_OF Operator for Multiset SQL type
> -----------------------------------------------------
>
>                 Key: FLINK-7546
>                 URL: https://issues.apache.org/jira/browse/FLINK-7546
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
[jira] [Updated] (FLINK-7539) Make AvroOutputFormat default codec configurable
[ https://issues.apache.org/jira/browse/FLINK-7539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-7539:
----------------------------------
    Component/s: Batch Connectors and Input/Output Formats

> Make AvroOutputFormat default codec configurable
> ------------------------------------------------
>
>                 Key: FLINK-7539
>                 URL: https://issues.apache.org/jira/browse/FLINK-7539
>             Project: Flink
>          Issue Type: Improvement
>          Components: Batch Connectors and Input/Output Formats
>            Reporter: Sebastian Klemke
>
> In my organization there is a requirement that all Avro datasets stored on HDFS should be compressed. Currently, this requires invoking setCodec() manually on all AvroOutputFormat instances. To ease setting up AvroOutputFormat instances, we'd like to be able to configure the default codec site-wide, ideally via flink-conf.yaml
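A site-wide default of the kind requested might look like the fragment below. The key name is purely hypothetical; no such option exists yet, which is exactly what this issue asks for:

```yaml
# Hypothetical flink-conf.yaml entry (key name invented for illustration);
# today the codec must be set per AvroOutputFormat instance via setCodec().
avro.output.codec: snappy
```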
[jira] [Updated] (FLINK-7545) Support MEMBER OF Operator for Multiset SQL type
[ https://issues.apache.org/jira/browse/FLINK-7545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-7545:
----------------------------------
    Component/s: Table API & SQL

> Support MEMBER OF Operator for Multiset SQL type
> ------------------------------------------------
>
>                 Key: FLINK-7545
>                 URL: https://issues.apache.org/jira/browse/FLINK-7545
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146046#comment-16146046 ]

ASF GitHub Bot commented on FLINK-7366:
---------------------------------------

Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/4522

I've pushed the PR to CI again. If it passes, I'll merge it.

> Upgrade kinesis producer library in flink-connector-kinesis
> -----------------------------------------------------------
>
>                 Key: FLINK-7366
>                 URL: https://issues.apache.org/jira/browse/FLINK-7366
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.2
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0, 1.3.3
[GitHub] flink issue #4522: [FLINK-7366][kinesis connector] Upgrade kinesis producer ...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/4522

I've pushed the PR to CI again. If it passes, I'll merge it.
[jira] [Closed] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7309.
Resolution: Fixed
Fix Version/s: 1.3.3, 1.4.0
Fixed for 1.3.3 with eec261e9287b7272f21106df931686a9b4ca828a
Fixed for 1.4.0 with 29e849b1bf9180a5aa5f2d500efb283a39839caa
> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -----------------------------------------------------------------------------
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime, Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Liangliang Chen
> Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
> The code generated by CodeGenUtils.timePointToInternalCode() causes a
> NullPointerException when a SQL table field has type `TIMESTAMP` and the
> field value is `null`.
> Example to reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     // null field value
>     val orderA: DataStream[Order] = env.fromCollection(Seq(Order(null, "beer", 3)))
>     tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
>     val result = tEnv.sql("SELECT * FROM OrderA")
>     result.toAppendStream[Order].print()
>     env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() generates statements like:
> {code}
> ...
> long result$1 = org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
> boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> so the NPE happens when in1.ts() is null.
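The bug above is a statement-ordering problem: the conversion runs before the null flag is ever checked. A minimal sketch of the null-safe pattern in plain Java (the class, method, and sentinel value are illustrative, not Flink's actual generated code):

```java
import java.sql.Timestamp;

// Illustrative sketch of null-safe time-point conversion: evaluate the
// null flag first and only convert when the value is known non-null.
class NullSafeTimePoint {
    static final long NULL_SENTINEL = Long.MIN_VALUE; // assumed sentinel for "null"

    static long toInternal(Timestamp ts) {
        boolean isNull = (ts == null);
        // The buggy generated code called the conversion unconditionally,
        // so a null Timestamp threw before isNull was ever consulted.
        return isNull ? NULL_SENTINEL : ts.getTime();
    }
}
```

Calling `toInternal(null)` returns the sentinel instead of throwing, which is the behavior the fix restores.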
[jira] [Closed] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7398.
Resolution: Fixed
Fixed for 1.3.3 with 3167f72d95719ee551e798196fd26b9503e2dfd9
Fixed for 1.4.0 with df7452d9811b0aa88919d7e3c1f6c34b36ac9b38
> Table API operators/UDFs must not store Logger
> ----------------------------------------------
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Aljoscha Krettek
> Assignee: Haohui Mai
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
> Attachments: Example.png
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in
> an instance field (cf.
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
> This means that the {{Logger}} is serialised with the UDF and sent to the
> cluster. This in itself does not sound right, and it leads to problems when
> the slf4j configuration on the client differs from the cluster environment.
> This is an example of a user running into that problem:
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E
> Here, they have Logback on the client but the Logback classes are not
> available on the cluster, so deserialisation of the UDFs fails with a
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55: val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48: val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
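The reason moving the Logger out of instance state fixes the problem is that Java serialization only writes non-static, non-transient instance fields. A minimal, self-contained sketch (the plain `Object` stands in for an slf4j `Logger`; all names here are illustrative, not Flink's code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A static field belongs to the class, not the instance, so it is never
// written by Java serialization and need not be Serializable itself.
class StaticLoggerDemo implements Serializable {
    static Object LOG = new Object(); // stand-in for a Logger; not serialized
    int payload = 42;                 // instance field: serialized as usual

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }
}
```

Serializing a `StaticLoggerDemo` succeeds even though `Object` is not `Serializable`; the same reference held in an instance field would throw `NotSerializableException`, which is the failure mode this ticket describes.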
[jira] [Closed] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7245.
Resolution: Implemented
Fix Version/s: 1.4.0
Implemented for 1.4.0 with 68fdaa57e35b8ee30a262aad4d26926b18054c57
> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
> Fix For: 1.4.0
>
> Currently, watermarks are applied and emitted by the {{AbstractStreamOperator}} instantly:
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>     timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these watermarks
> (e.g., join or aggregate results) may be regarded as late by downstream
> operators, since their timestamps must be less than or equal to the
> corresponding triggering watermarks.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. Such watermarks should be blocked and
> stored by the operator until all the newly generated results they triggered
> have been emitted.
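The proposed "working mode" can be sketched without any Flink dependencies. All names below are illustrative, not Flink's API: the operator parks an incoming watermark instead of forwarding it, and releases it only after the results it triggered have been emitted, so those results can no longer be late relative to the watermark:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal sketch of "holding back" watermarks (illustrative names).
class WatermarkHolder {
    final List<String> out = new ArrayList<>();  // what downstream sees, in order
    final Deque<Long> held = new ArrayDeque<>(); // parked watermarks

    void processWatermark(long mark) {
        held.add(mark); // hold: do NOT forward to output immediately
    }

    void fireTimers(long mark) {
        out.add("result@" + mark);            // result timestamp <= mark
        while (!held.isEmpty() && held.peek() <= mark) {
            out.add("wm@" + held.poll());     // release only after the results
        }
    }
}
```

With this ordering, every `result@t` precedes `wm@t` in the output, which is exactly the guarantee the ticket asks for.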
[jira] [Closed] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7206.
Resolution: Implemented
Fix Version/s: 1.4.0
Implemented for 1.4.0 with 1fc0b6413c74eff0ace25f4329451e35e84849b5
> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
> Fix For: 1.4.0
>
> Implementation of MapView and ListView to support state access for UDAGG.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146022#comment-16146022 ] ASF GitHub Bot commented on FLINK-7206:
---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4355
[jira] [Commented] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146025#comment-16146025 ] ASF GitHub Bot commented on FLINK-7309:
---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4479
[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146024#comment-16146024 ] ASF GitHub Bot commented on FLINK-7398:
---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4576
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146023#comment-16146023 ] ASF GitHub Bot commented on FLINK-7245:
---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4530
[GitHub] flink pull request #4479: [FLINK-7309][hotfix] fix NullPointerException when...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4479
[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4530
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4355
[GitHub] flink pull request #4576: [FLINK-7398] Table API operators/UDFs must not sto...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4576
[GitHub] flink issue #1305: Out-of-core state backend for JDBC databases
Github user coveralls commented on the issue: https://github.com/apache/flink/pull/1305 [![Coverage Status](https://coveralls.io/builds/13040362/badge)](https://coveralls.io/builds/13040362) Changes Unknown when pulling **db2a964a450c05cb2aad3843999d994e4b8e5ef5 on gyfora:master** into ** on apache:master**.
[GitHub] flink pull request #4617: [FLINK-7556] Fix fetch size configurable in JDBCIn...
GitHub user nycholas opened a pull request: https://github.com/apache/flink/pull/4617
[FLINK-7556] Fix fetch size configurable in JDBCInputFormat for MySQL Driver
According to the MySQL documentation [1]:
* ResultSet
> By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate and, due to the design of the MySQL network protocol, is easier to implement. If you are working with ResultSets that have a large number of rows or large values and cannot allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time.
> To enable this functionality, create a Statement instance in the following manner:
```
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
                            java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);
```
> The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row. After this, any result sets created with the statement will be retrieved row-by-row.
Allow Integer.MIN_VALUE to be accepted as a parameter for setFetchSize.
[1] - https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/cenobites/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4617.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4617
commit 82535fd09cf52cb3e1c3da162dc807d7d889ac19
Author: Nycholas de Oliveira e Oliveira
Date: 2017-08-29T17:21:03Z
[FLINK-7556] Fix fetch size configurable in JDBCInputFormat for MySQL Driver
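The change the PR describes amounts to relaxing the fetch-size validation so that `Integer.MIN_VALUE` is accepted alongside positive values, since the MySQL driver treats it as the "stream row-by-row" signal. A hedged sketch (the class and method names are hypothetical, not the actual JDBCInputFormat code):

```java
// Illustrative validation: positive fetch sizes are normal JDBC hints;
// Integer.MIN_VALUE is additionally allowed as the MySQL streaming signal.
class FetchSizeCheck {
    static int checkFetchSize(int fetchSize) {
        if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
            return fetchSize;
        }
        throw new IllegalArgumentException(
            "Fetch size must be positive, or Integer.MIN_VALUE for MySQL row-by-row streaming");
    }
}
```

A check that only accepted `fetchSize > 0` would reject the documented streaming configuration, which is the bug being fixed.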
[jira] [Commented] (FLINK-7556) Fix fetch size configurable in JDBCInputFormat for MySQL Driver
[ https://issues.apache.org/jira/browse/FLINK-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145732#comment-16145732 ] ASF GitHub Bot commented on FLINK-7556:
---
GitHub user nycholas opened a pull request: https://github.com/apache/flink/pull/4617
[FLINK-7556] Fix fetch size configurable in JDBCInputFormat for MySQL Driver
[jira] [Commented] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable
[ https://issues.apache.org/jira/browse/FLINK-7547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145697#comment-16145697 ] ASF GitHub Bot commented on FLINK-7547:
---
Github user eliaslevy commented on the issue: https://github.com/apache/flink/pull/4614
> o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable
> ------------------------------------------------------------------
>
> Key: FLINK-7547
> URL: https://issues.apache.org/jira/browse/FLINK-7547
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Reporter: Elias Levy
> Priority: Minor
>
> {{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared
> {{Serializable}}, whereas
> {{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is. This
> leads to the job not starting, as the async function can't be serialized
> during initialization.
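The failure mode is easy to reproduce in plain Java. The two interfaces below are illustrative stand-ins for the non-serializable and serializable function types (not Flink's actual `AsyncFunction` traits): an instance of an interface that does not extend `Serializable` cannot be serialized for shipping to the cluster, while the same lambda against a `Serializable` interface works.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableFnDemo {
    interface PlainFn { int apply(int x); }                     // like the broken trait
    interface SerFn extends Serializable { int apply(int x); }  // like the fixed trait

    // Returns true when the object survives Java serialization,
    // false when writeObject throws (e.g. NotSerializableException).
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

A lambda targeting `SerFn` serializes; the identical lambda targeting `PlainFn` fails, which mirrors why declaring the Scala trait `Serializable` fixes the job-startup error.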
[GitHub] flink issue #4614: [FLINK-7547] AsyncFunction.scala extends Function, serial...
Github user eliaslevy commented on the issue: https://github.com/apache/flink/pull/4614 👍
[jira] [Updated] (FLINK-7556) Fix fetch size configurable in JDBCInputFormat for MySQL Driver
[ https://issues.apache.org/jira/browse/FLINK-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nycholas de Oliveira e Oliveira updated FLINK-7556:
---
Description: updated (same content as the pull request description for #4617 above, with the quoted MySQL documentation reformatted using {quote} and {code:java} markup)
[jira] [Created] (FLINK-7556) Fix fetch size configurable in JDBCInputFormat for MySQL Driver
Nycholas de Oliveira e Oliveira created FLINK-7556: -- Summary: Fix fetch size configurable in JDBCInputFormat for MySQL Driver Key: FLINK-7556 URL: https://issues.apache.org/jira/browse/FLINK-7556 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Affects Versions: 1.3.2 Reporter: Nycholas de Oliveira e Oliveira Priority: Trivial According to the MySQL documentation[1]: * ResultSet ??By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate and, due to the design of the MySQL network protocol, is easier to implement. If you are working with ResultSets that have a large number of rows or large values and cannot allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time. To enable this functionality, create a Statement instance in the following manner:?? {code:java} stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); stmt.setFetchSize(Integer.MIN_VALUE); {code} ??The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row. After this, any result sets created with the statement will be retrieved row-by-row.?? Allow *Integer.MIN_VALUE* to be accepted as a parameter for _setFetchSize_. [1] - https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
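For illustration, the relaxed check the reporter is asking for could look like the sketch below: accept positive fetch sizes as before, and additionally let Integer.MIN_VALUE through as MySQL's row-by-row streaming signal. The method name and error message are hypothetical, not Flink's actual JDBCInputFormat code.

```java
public class FetchSizeCheck {
    // Accept positive fetch sizes, plus Integer.MIN_VALUE as the
    // MySQL Connector/J "stream results row-by-row" signal.
    static int validateFetchSize(int fetchSize) {
        if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
            return fetchSize;
        }
        throw new IllegalArgumentException(
            "Fetch size must be positive, or Integer.MIN_VALUE to enable MySQL streaming");
    }
}
```

With such a check, `setFetchSize(Integer.MIN_VALUE)` would pass through to the driver, while zero and other negative values are still rejected.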
[jira] [Closed] (FLINK-7555) Use flink-shaded-guava-18 to replace guava dependencies
[ https://issues.apache.org/jira/browse/FLINK-7555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7555. --- Resolution: Not A Problem Certain libraries (cassandra, kinesis, table API) must use vanilla guava since they pull it in through a dependency that exposes guava in its API. We can't replace those. Libraries other than those 3 are not allowed to import guava, which is enforced by checkstyle. Furthermore, we also do not allow unshaded guava dependencies in the flink-dist jar, which is enforced by a check in the Travis scripts. > Use flink-shaded-guava-18 to replace guava dependencies > > > Key: FLINK-7555 > URL: https://issues.apache.org/jira/browse/FLINK-7555 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou > Fix For: 1.4.0 > > > After [#issue FLINK-6982|https://issues.apache.org/jira/browse/FLINK-6982], > I still find 40 occurrences of 'import com.google.common' in the project. > Should we replace all 'import com.google.common.*' with 'import > org.apache.flink.shaded.guava18.com.google.common.*'? > If so, I will open a PR to fix it.
[jira] [Created] (FLINK-7555) Use flink-shaded-guava-18 to replace guava dependencies
Hai Zhou created FLINK-7555: --- Summary: Use flink-shaded-guava-18 to replace guava dependencies Key: FLINK-7555 URL: https://issues.apache.org/jira/browse/FLINK-7555 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.4.0 Environment: After [#issue FLINK-6982|https://issues.apache.org/jira/browse/FLINK-6982], I still find 40 occurrences of 'import com.google.common' in the project. Should we replace all 'import com.google.common.*' with 'import org.apache.flink.shaded.guava18.com.google.common.*'? If so, I will open a PR to fix it. Reporter: Hai Zhou Fix For: 1.4.0 !attachment-name.jpg|thumbnail!
[jira] [Updated] (FLINK-7555) Use flink-shaded-guava-18 to replace guava dependencies
[ https://issues.apache.org/jira/browse/FLINK-7555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou updated FLINK-7555: Description: After [#issue FLINK-6982|https://issues.apache.org/jira/browse/FLINK-6982], I still find 40 occurrences of 'import com.google.common' in the project. Should we replace all 'import com.google.common.*' with 'import org.apache.flink.shaded.guava18.com.google.common.*'? If so, I will open a PR to fix it. was:!attachment-name.jpg|thumbnail! > Use flink-shaded-guava-18 to replace guava dependencies > > > Key: FLINK-7555 > URL: https://issues.apache.org/jira/browse/FLINK-7555 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou > Fix For: 1.4.0 > > > After [#issue FLINK-6982|https://issues.apache.org/jira/browse/FLINK-6982], > I still find 40 occurrences of 'import com.google.common' in the project. > Should we replace all 'import com.google.common.*' with 'import > org.apache.flink.shaded.guava18.com.google.common.*'? > If so, I will open a PR to fix it.
[jira] [Updated] (FLINK-7555) Use flink-shaded-guava-18 to replace guava dependencies
[ https://issues.apache.org/jira/browse/FLINK-7555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou updated FLINK-7555: Environment: (was: After [#issue FLINK-6982|https://issues.apache.org/jira/browse/FLINK-6982], I still find 40 occurrences of 'import com.google.common' in project . we should replace all 'import com.google.common.*' to 'import org.apache.flink.shaded.guava18.com.google.common.*' ? if so, I will give a PR to fix it.) > Use flink-shaded-guava-18 to replace guava dependencies > > > Key: FLINK-7555 > URL: https://issues.apache.org/jira/browse/FLINK-7555 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou > Fix For: 1.4.0 > > > !attachment-name.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
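The mechanical rewrite the reporter proposes can be expressed as a one-line rule: prefix unshaded guava imports with the flink-shaded-guava-18 package. A minimal sketch of that rule (in practice this is applied with an editor or script across the source tree):

```java
public class ShadeImports {
    // Rewrite an unshaded guava import line to the flink-shaded-guava-18
    // package; lines that don't import guava are returned unchanged.
    static String shade(String importLine) {
        return importLine.replaceFirst(
            "^import com\\.google\\.common\\.",
            "import org.apache.flink.shaded.guava18.com.google.common.");
    }
}
```

For example, `shade("import com.google.common.collect.ImmutableList;")` yields the shaded import; note that per the resolution above, the cassandra, kinesis, and table API modules are exempt because a dependency exposes guava in its API.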
[jira] [Assigned] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-7551: --- Assignee: Chesnay Schepler > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case
[ https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou closed FLINK-7297. --- Resolution: Fixed > Instable Kafka09ProducerITCase.testCustomPartitioning test case > --- > > Key: FLINK-7297 > URL: https://issues.apache.org/jira/browse/FLINK-7297 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > There seems to be a test instability of > {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on > Travis. > https://travis-ci.org/apache/flink/jobs/258538636 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4454 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-7376) Cleanup options class and test classes in flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou closed FLINK-7376. --- Resolution: Won't Fix > Cleanup options class and test classes in flink-clients > > > Key: FLINK-7376 > URL: https://issues.apache.org/jira/browse/FLINK-7376 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Critical > Labels: cleanup, test
[jira] [Assigned] (FLINK-7376) Cleanup options class and test classes in flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou reassigned FLINK-7376: --- Assignee: Hai Zhou > Cleanup options class and test classes in flink-clients > > > Key: FLINK-7376 > URL: https://issues.apache.org/jira/browse/FLINK-7376 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Critical > Labels: cleanup, test
[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145393#comment-16145393 ] ASF GitHub Bot commented on FLINK-7398: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4576 Thanks for the PR @haohui! Will add `Logging` to `CRowOutputProcessRunner` as well and merge it. Cheers, Fabian > Table API operators/UDFs must not store Logger > -- > > Key: FLINK-7398 > URL: https://issues.apache.org/jira/browse/FLINK-7398 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Haohui Mai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: Example.png > > > Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in > an instance field (c.f. > https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). > This means that the {{Logger}} will be serialised with the UDF and sent to > the cluster. This in itself does not sound right and leads to problems when > the slf4j configuration on the Client is different from the cluster > environment. > This is an example of a user running into that problem: > https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. > Here, they have Logback on the client but the Logback classes are not > available on the cluster and so deserialisation of the UDFs fails with a > {{ClassNotFoundException}}. 
> This is a rough list of the involved classes: > {code} > src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: > val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: > val LOGICAL: Convention = new Convention.Impl("LOGICAL", > classOf[FlinkLogicalRel]) > src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val > LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList( > src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > 
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48: > val LOG: Logger =
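The failure mode behind FLINK-7398 can be demonstrated with plain JDK serialization: a UDF that holds a non-serializable logger in a regular instance field cannot be shipped to the cluster, while a transient field re-created after deserialization works. This is a sketch of the general pattern only; `FakeLogger` is a stand-in for an slf4j Logger backed by classes (e.g. Logback) that may not exist on the cluster, not Flink's actual `Logging` trait.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class LoggerSerialization {
    // Stand-in for a logger whose backing classes may be missing remotely.
    static class FakeLogger { }  // deliberately not Serializable

    static class BadUdf implements Serializable {
        final FakeLogger log = new FakeLogger();  // shipped with the UDF -> fails
    }

    static class GoodUdf implements Serializable {
        transient FakeLogger log = new FakeLogger();  // skipped during serialization

        // Re-create the logger on the worker after deserialization.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            log = new FakeLogger();
        }
    }

    // True if the object survives Java serialization, false if it throws
    // NotSerializableException (as a UDF with an instance-field logger does).
    static boolean roundTrips(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This is why every `private val LOG = LoggerFactory.getLogger(...)` instance field in the list above is a problem: the logger gets serialized with the operator, and deserialization on the cluster fails when the client's logging backend is absent.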
[jira] [Commented] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145394#comment-16145394 ] ASF GitHub Bot commented on FLINK-7309: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4479 Thanks for the PR @yestinchen. Will merge this. > NullPointerException in CodeGenUtils.timePointToInternalCode() generated code > - > > Key: FLINK-7309 > URL: https://issues.apache.org/jira/browse/FLINK-7309 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Table API & SQL >Affects Versions: 1.3.1 >Reporter: Liangliang Chen >Priority: Critical > > The code generated by CodeGenUtils.timePointToInternalCode() causes a > NullPointerException when the SQL table field type is `TIMESTAMP` and the field > value is `null`. > Example to reproduce: > {code} > object StreamSQLExample { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > // null field value > val orderA: DataStream[Order] = env.fromCollection(Seq( > Order(null, "beer", 3))) > > tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount) > val result = tEnv.sql("SELECT * FROM OrderA") > result.toAppendStream[Order].print() > > env.execute() > } > case class Order(ts: Timestamp, product: String, amount: Int) > } > {code} > In the above example, timePointToInternalCode() generates statements like these: > {code} > ... > long result$1 = > org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts()); > boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null; > ... > {code} > so the NPE happens when in1.ts() is null.
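The bug in the generated statements is one of ordering: the conversion runs before the null test. A null-safe version performs the null test first and only converts when the value is present. The sketch below mirrors that shape in plain Java; the conversion call is a stand-in (`getTime()`), not Flink's actual codegen output.

```java
public class TimePointConversion {
    // Null-safe variant of the generated conversion: test for null first,
    // then either convert or fall back to a default value (0L here).
    static long toInternal(java.sql.Timestamp ts) {
        boolean isNull = ts == null;
        return isNull ? 0L : ts.getTime();
    }
}
```

Applied to the example above, `toInternal(in1.ts())` returns the default for the null `ts` field instead of throwing an NPE inside `SqlFunctions.toLong`.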
[GitHub] flink issue #4479: [FLINK-7309][hotfix] fix NullPointerException when select...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4479 Thanks for the PR @yestinchen. Will merge this.
[GitHub] flink issue #4576: [FLINK-7398] Table API operators/UDFs must not store Logg...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4576 Thanks for the PR @haohui! Will add `Logging` to `CRowOutputProcessRunner` as well and merge it. Cheers, Fabian
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145363#comment-16145363 ] ASF GitHub Bot commented on FLINK-7169: --- Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 Hi @dawidwys , sorry for the late response. Thanks for your reviews, I have updated the test and the document. Please take a look if you have time. Thanks. > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Yueting Chen >Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support AFTER MATCH SKIP function in CEP API. > There're four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function to the `CEP` class, which takes a new > parameter as AfterMatchSkipStrategy. > The new API may look like this > {code} > public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` the default option, because that's > how the CEP library behaves currently.
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 Hi @dawidwys , sorry for the late response. Thanks for your reviews, I have updated the test and the document. Please take a look if you have time. Thanks.
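The four AFTER MATCH SKIP options in the issue reduce to a simple rule for where matching resumes relative to the current match. A minimal sketch of that rule, with illustrative names (not the actual Flink CEP `AfterMatchSkipStrategy` implementation); `rpvFirst`/`rpvLast` are the indices of the first/last rows mapped to the referenced row pattern variable:

```java
public class AfterMatchSkip {
    enum Strategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    // Given the row indices of the current match, compute where pattern
    // matching resumes for each of the four skip options.
    static int resumeAt(Strategy s, int matchStart, int matchEnd, int rpvFirst, int rpvLast) {
        switch (s) {
            case SKIP_TO_NEXT_ROW:   return matchStart + 1;  // row after the first matched row
            case SKIP_PAST_LAST_ROW: return matchEnd + 1;    // row after the last matched row
            case SKIP_TO_FIRST:      return rpvFirst;        // first row mapped to the RPV
            case SKIP_TO_LAST:       return rpvLast;         // last row mapped to the RPV
            default: throw new AssertionError();
        }
    }
}
```

For a match spanning rows 3..7 whose RPV maps rows 4..6, the four strategies resume at rows 4, 8, 4, and 6 respectively.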
[jira] [Updated] (FLINK-7554) Add a testing RuntimeContext to test utilities
[ https://issues.apache.org/jira/browse/FLINK-7554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-7554: Labels: starter (was: ) > Add a testing RuntimeContext to test utilities > -- > > Key: FLINK-7554 > URL: https://issues.apache.org/jira/browse/FLINK-7554 > Project: Flink > Issue Type: New Feature > Components: Tests >Reporter: Timo Walther > Labels: starter > > When unit testing user-defined functions it would be useful to have an > official testing {{RuntimeContext}} that uses Java collections for storing > state, metrics, etc. > After executing the business logic, the user could then verify how the state > of the UDF changed or which metrics have been collected. > This issue includes documentation for the "Testing" section. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7554) Add a testing RuntimeContext to test utilities
Timo Walther created FLINK-7554: --- Summary: Add a testing RuntimeContext to test utilities Key: FLINK-7554 URL: https://issues.apache.org/jira/browse/FLINK-7554 Project: Flink Issue Type: New Feature Components: Tests Reporter: Timo Walther When unit testing user-defined functions it would be useful to have an official testing {{RuntimeContext}} that uses Java collections for storing state, metrics, etc. After executing the business logic, the user could then verify how the state of the UDF changed or which metrics have been collected. This issue includes documentation for the "Testing" section.
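The core idea of the feature request can be sketched with plain collections: back a value-state handle with a `HashMap` so a test can assert on the map after running the UDF's logic. The interface and class names below are illustrative, not Flink's actual test utilities.

```java
import java.util.HashMap;
import java.util.Map;

public class TestingState {
    // Minimal stand-in for Flink's ValueState handle.
    interface ValueState<T> { T value(); void update(T v); }

    // Map-backed implementation a testing RuntimeContext could hand out.
    static class MapBackedValueState<T> implements ValueState<T> {
        private final Map<String, T> store;
        private final String key;
        MapBackedValueState(Map<String, T> store, String key) {
            this.store = store;
            this.key = key;
        }
        public T value() { return store.get(key); }
        public void update(T v) { store.put(key, v); }
    }
}
```

A test would run the UDF's update logic against `MapBackedValueState` and then assert directly on the backing map, exactly the verification step the issue describes.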
[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext
[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145357#comment-16145357 ] ASF GitHub Bot commented on FLINK-7552: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4616 R: @rmetzger @pnowojski Could you please review this? > Extend SinkFunction interface with SinkContext > -- > > Key: FLINK-7552 > URL: https://issues.apache.org/jira/browse/FLINK-7552 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Now that we require Java 8 we can extend the {{SinkFunction}} interface > without breaking backwards compatibility. I'm proposing this: > {code} > /** > * Interface for implementing user defined sink functionality. > * > * @param Input type parameter. > */ > @Public > public interface SinkFunction extends Function, Serializable { > /** >* Function for standard sink behaviour. This function is called for > every record. >* >* @param value The input record. >* @throws Exception >* @deprecated Use {@link #invoke(SinkContext, Object)}. >*/ > @Deprecated > default void invoke(IN value) throws Exception { > } > /** >* Writes the given value to the sink. This function is called for > every record. >* >* @param context Additional context about the input record. >* @param value The input record. >* @throws Exception >*/ > default void invoke(SinkContext context, IN value) throws Exception { > invoke(value); > } > /** >* Context that {@link SinkFunction SinkFunctions } can use for getting > additional data about >* an input record. >* >* @param The type of elements accepted by the sink. >*/ > @Public // Interface might be extended in the future with additional > methods. > interface SinkContext { > /** >* Returns the timestamp of the current input record. >*/ > long timestamp(); > } > } > {code} > For now, this only allows access to the element timestamp. 
This would allow > us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a > hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to > timestamps.
[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4616 R: @rmetzger @pnowojski Could you please review this?
[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext
[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145339#comment-16145339 ] ASF GitHub Bot commented on FLINK-7552: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4616 [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in FlinkKafkaProducer ## What is the purpose of the change Enhance `SinkFunction` with a way of retrieving the element timestamp. This allows us to get rid of the hybrid nature of `FlinkKafkaProducer010`. This is keeping the legacy static "convenience" methods à la `FlinkKafkaProducer010.writeToKafkaWithTimestamps` for backwards compatibility. ## Brief change log - Enhance Sink interface - Use new interface in Kafka Producer ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes, call stack of KafkaProducer with writing timestamps is changed slightly, also, `StreamSink` operator now has a context object. - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-7553-fix-kafka010-producer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4616.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4616 commit 0b5bea36247736a0160ce584b94050d7b676d091 Author: Aljoscha KrettekDate: 2017-08-29T13:50:56Z [FLINK-7552] Extend SinkFunction interface with SinkContext commit d3a7b294542ea40287290ff4970715ead621d398 Author: Aljoscha Krettek Date: 2017-08-29T13:53:16Z [FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010 > Extend SinkFunction interface with SinkContext > -- > > Key: FLINK-7552 > URL: https://issues.apache.org/jira/browse/FLINK-7552 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Now that we require Java 8 we can extend the {{SinkFunction}} interface > without breaking backwards compatibility. I'm proposing this: > {code} > /** > * Interface for implementing user defined sink functionality. > * > * @param Input type parameter. > */ > @Public > public interface SinkFunction extends Function, Serializable { > /** >* Function for standard sink behaviour. This function is called for > every record. >* >* @param value The input record. >* @throws Exception >* @deprecated Use {@link #invoke(SinkContext, Object)}. >*/ > @Deprecated > default void invoke(IN value) throws Exception { > } > /** >* Writes the given value to the sink. This function is called for > every record. >* >* @param context Additional context about the input record. >* @param value The input record. 
>* @throws Exception >*/ > default void invoke(SinkContext context, IN value) throws Exception { > invoke(value); > } > /** >* Context that {@link SinkFunction SinkFunctions } can use for getting > additional data about >* an input record. >* >* @param The type of elements accepted by the sink. >*/ > @Public // Interface might be extended in the future with additional > methods. > interface SinkContext { > /** >* Returns the timestamp of the current input record. >*/ > long timestamp(); > } > } > {code} > For now, this only allows access to the element timestamp. This would allow > us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a > hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to > timestamps. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4616 [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in FlinkKafkaProducer ## What is the purpose of the change Enhance `SinkFunction` with a way of retrieving the element timestamp. This allows us to get rid of the hybrid nature of `FlinkKafkaProducer010`. This is keeping the legacy static "convenience" methods à la `FlinkKafkaProducer010.writeToKafkaWithTimestamps` for backwards compatibility. ## Brief change log - Enhance Sink interface - Use new interface in Kafka Producer ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes, call stack of KafkaProducer with writing timestamps is changed slightly, also, `StreamSink` operator now has a context object. - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-7553-fix-kafka010-producer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4616.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4616 commit 0b5bea36247736a0160ce584b94050d7b676d091 Author: Aljoscha KrettekDate: 2017-08-29T13:50:56Z [FLINK-7552] Extend SinkFunction interface with SinkContext commit d3a7b294542ea40287290ff4970715ead621d398 Author: Aljoscha Krettek Date: 2017-08-29T13:53:16Z [FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4454 Thanks :)
[GitHub] flink issue #4577: Add user context and bind together state fields in TwoPha...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4577 Thanks! :)
[GitHub] flink pull request #4577: Add user context and bind together state fields in...
Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4577 ---
[jira] [Created] (FLINK-7553) Use new SinkFunction interface in FlinkKafkaProducer010
Aljoscha Krettek created FLINK-7553: --- Summary: Use new SinkFunction interface in FlinkKafkaProducer010 Key: FLINK-7553 URL: https://issues.apache.org/jira/browse/FLINK-7553 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 1.4.0 This will allow us to get rid of the hybrid {{SinkFunction}}/{{StreamOperator}} nature of the Kafka 0.10.x sink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7552) Extend SinkFunction interface with SinkContext
Aljoscha Krettek created FLINK-7552: --- Summary: Extend SinkFunction interface with SinkContext Key: FLINK-7552 URL: https://issues.apache.org/jira/browse/FLINK-7552 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 1.4.0 Now that we require Java 8 we can extend the {{SinkFunction}} interface without breaking backwards compatibility. I'm proposing this:
{code}
/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

	/**
	 * Function for standard sink behaviour. This function is called for every record.
	 *
	 * @param value The input record.
	 * @throws Exception
	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
	 */
	@Deprecated
	default void invoke(IN value) throws Exception {
	}

	/**
	 * Writes the given value to the sink. This function is called for every record.
	 *
	 * @param context Additional context about the input record.
	 * @param value The input record.
	 * @throws Exception
	 */
	default void invoke(SinkContext context, IN value) throws Exception {
		invoke(value);
	}

	/**
	 * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
	 * an input record.
	 *
	 * @param <T> The type of elements accepted by the sink.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface SinkContext<T> {

		/**
		 * Returns the timestamp of the current input record.
		 */
		long timestamp();
	}
}
{code}
For now, this only allows access to the element timestamp. This would allow us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to timestamps.
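The backwards-compatible extension described above hinges on Java 8 default methods: a new two-argument `invoke` delegates to the legacy one-argument `invoke`, so old sinks keep working. A minimal self-contained sketch of that pattern (the `Context`, `Sink`, and `LegacySink` names here are hypothetical stand-ins, not Flink's actual classes):

```java
import java.io.Serializable;

public class SinkEvolutionSketch {

    // Hypothetical stand-in for the proposed SinkContext.
    interface Context {
        long timestamp();
    }

    // Interface evolution via Java 8 default methods: existing implementations that
    // only override invoke(value) keep compiling; new ones override invoke(ctx, value).
    interface Sink<IN> extends Serializable {
        @Deprecated
        default void invoke(IN value) {}

        default void invoke(Context context, IN value) {
            invoke(value); // default: fall back to the legacy single-argument method
        }
    }

    // A "legacy" sink written before the two-argument method existed.
    static class LegacySink implements Sink<String> {
        final StringBuilder out = new StringBuilder();

        @Override
        public void invoke(String value) {
            out.append(value);
        }
    }

    public static void main(String[] args) {
        LegacySink sink = new LegacySink();
        // The runtime can always call the new method; legacy sinks still receive the record.
        sink.invoke(() -> 42L, "hello");
        System.out.println(sink.out); // prints "hello"
    }
}
```

The key design point is that the runtime only ever calls the new method, and the default implementation routes records to sinks that predate it.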
[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4454 @pnowojski Thanks for the PR. I will go over the changes again and merge this. I will create a follow-up issue about the mentioned comments. ---
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145264#comment-16145264 ] ASF GitHub Bot commented on FLINK-7452: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4612 @fhueske Feel free to review. > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help creating {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}.
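The proposed `Types` helpers are essentially validating factory methods: they construct a type descriptor and fail fast if the requested class does not meet the requirements. A minimal self-contained sketch of that idea (the `TypeInfo` class and `pojo` helper are hypothetical illustrations, not Flink's actual API; the real POJO rules are more involved than a no-arg-constructor check):

```java
public class TypesSketch {

    // Minimal stand-in for a type descriptor like Flink's TypeInformation.
    static class TypeInfo {
        final Class<?> clazz;
        TypeInfo(Class<?> clazz) { this.clazz = clazz; }
        @Override public String toString() { return clazz.getSimpleName(); }
    }

    // Factory that validates the requested class before returning a descriptor,
    // mirroring "the methods should validate that the returned type is exactly
    // the requested type". Here the check is just a public no-arg constructor.
    static TypeInfo pojo(Class<?> clazz) {
        try {
            clazz.getConstructor(); // must have a public no-arg constructor
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(clazz + " is not a valid POJO type");
        }
        return new TypeInfo(clazz);
    }

    public static class MyPojo {
        public int id;
        public String name;
    }

    public static void main(String[] args) {
        System.out.println(pojo(MyPojo.class)); // prints "MyPojo"
    }
}
```

Compared to parsing a type string (the `TypeInfoParser` being deprecated), such factories fail at compile time for misspelled classes and at construction time for invalid ones.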
[GitHub] flink issue #4612: [FLINK-7452] [types] Add helper methods for all built-in ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4612 @fhueske Feel free to review. ---
[jira] [Updated] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-7410: --- Description: *Motivation* Operator names set in table-api are used by visualization and logging, it is important to make these names simple and readable. Currently, UserDefinedFunction’s name contains class CanonicalName and md5 value making the name too long and unfriendly to users. As shown in the following example, {quote} select: (a, b, c, org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) AS _c3, org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) AS _c4) {quote} *Changes:* Use {{toString}} method to display operator names for UserDefinedFunction. The method will return the class name by default. Users can also override the method to return whatever they want. What do you think [~fhueske] ? was: *Motivation* Operator names set in table-api are used by visualization and logging, it is important to make these names simple and readable. Currently, UserDefinedFunction’s name contains class CanonicalName and md5 value making the name too long and unfriendly to users. As shown in the following example, {quote} select: (a, b, c, org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) AS _c3, org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) AS _c4) {quote} *Changes:* Provide getName method for UserDefinedFunction. The method will return the class name by default. Users can also override the method to return whatever they want. What do you think [~fhueske] ? 
> Use toString method to display operator names for UserDefinedFunction > - > > Key: FLINK-7410 > URL: https://issues.apache.org/jira/browse/FLINK-7410 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > *Motivation* > Operator names set in table-api are used by visualization and logging, it > is important to make these names simple and readable. Currently, > UserDefinedFunction’s name contains class CanonicalName and md5 value making > the name too long and unfriendly to users. > As shown in the following example, > {quote} > select: (a, b, c, > org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) > AS _c3, > org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) > AS _c4) > {quote} > *Changes:* > > Use {{toString}} method to display operator names for UserDefinedFunction. > The method will return the class name by default. Users can also override the > method to return whatever they want. > What do you think [~fhueske] ?
[jira] [Updated] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-7410: --- Summary: Use toString method to display operator names for UserDefinedFunction (was: Add getName method to UserDefinedFunction) > Use toString method to display operator names for UserDefinedFunction > - > > Key: FLINK-7410 > URL: https://issues.apache.org/jira/browse/FLINK-7410 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > *Motivation* > Operator names set in table-api are used by visualization and logging, it > is important to make these names simple and readable. Currently, > UserDefinedFunction’s name contains class CanonicalName and md5 value making > the name too long and unfriendly to users. > As shown in the following example, > {quote} > select: (a, b, c, > org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) > AS _c3, > org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) > AS _c4) > {quote} > *Changes:* > > Provide getName method for UserDefinedFunction. The method will return the class > name by default. Users can also override the method to return whatever they want. > What do you think [~fhueske] ?
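The change discussed above replaces a generated canonical-name-plus-md5 operator name with the function's own `toString`, which defaults to something readable and remains overridable by users. A minimal self-contained sketch of that naming scheme (the `UserFunction`, `RichFunc1`, and `RichFunc2` classes here are hypothetical stand-ins, not Flink's actual classes):

```java
public class UdfNameSketch {

    // Base class: the default display name is the simple class name instead of a
    // long canonical-name-plus-md5 string.
    static abstract class UserFunction {
        @Override
        public String toString() {
            return getClass().getSimpleName();
        }
    }

    // Uses the readable default.
    static class RichFunc1 extends UserFunction {}

    // A user overriding toString to choose a custom operator name.
    static class RichFunc2 extends UserFunction {
        @Override
        public String toString() {
            return "myFriendlyName";
        }
    }

    public static void main(String[] args) {
        // These strings are what visualization and logging would display.
        System.out.println(new RichFunc1()); // prints "RichFunc1"
        System.out.println(new RichFunc2()); // prints "myFriendlyName"
    }
}
```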
[jira] [Closed] (FLINK-7498) Bind together state fields of TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7498. --- Resolution: Fixed Implemented in ac72360cc0e71d6f543d93c9c1f117babaa35799 > Bind together state fields of TwoPhaseCommitSinkFunction > > > Key: FLINK-7498 > URL: https://issues.apache.org/jira/browse/FLINK-7498 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Make sure that state fields are coupled together between checkpoints. This > way all transactions opened by one operator instance will be restored to the same > operator after restoring from checkpoint.
[jira] [Closed] (FLINK-7497) Allow users to add custom user context stored in state in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7497. --- Resolution: Fixed Implemented in 959d54fc828691759f15f2e83c0c123e9da6e782 > Allow users to add custom user context stored in state in > TwoPhaseCommitSinkFunction > > > Key: FLINK-7497 > URL: https://issues.apache.org/jira/browse/FLINK-7497 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Currently when using TwoPhaseCommitSinkFunction there is no way to store in > state additional user information that should be coupled with opened > transactions (like shared state between transactions). > It is required by FlinkKafkaProducer011, because there we need some place to > store a pool of used transactional.ids.
[GitHub] flink issue #4577: Add user context and bind together state fields in TwoPha...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4577 I merged this. Could you please close the PR? ---
[jira] [Commented] (FLINK-7410) Add getName method to UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145213#comment-16145213 ] Hequn Cheng commented on FLINK-7410: OK, I will use {{toString}} instead, thanks all. > Add getName method to UserDefinedFunction > - > > Key: FLINK-7410 > URL: https://issues.apache.org/jira/browse/FLINK-7410 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > *Motivation* > Operator names set in table-api are used by visualization and logging, it > is important to make these names simple and readable. Currently, > UserDefinedFunction’s name contains class CanonicalName and md5 value making > the name too long and unfriendly to users. > As shown in the following example, > {quote} > select: (a, b, c, > org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) > AS _c3, > org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) > AS _c4) > {quote} > *Changes:* > > Provide getName method for UserDefinedFunction. The method will return the class > name by default. Users can also override the method to return whatever they want. > What do you think [~fhueske] ?
[jira] [Commented] (FLINK-7518) pass our own buffer instances to netty
[ https://issues.apache.org/jira/browse/FLINK-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145212#comment-16145212 ] ASF GitHub Bot commented on FLINK-7518: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4615 [FLINK-7518][network] pass our own NetworkBuffer to Netty ## What is the purpose of the change With this PR, based on #4613, we finally pass our own `NetworkBuffer` class to Netty and remove one buffer copy while transferring data. Note that this applies to the sender side only. ## Brief change log - extend `NettyMessage#allocateBuffer()` to allow allocation for the header only - extend `NettyMessage.BufferResponse` to assemble a composite buffer based on a (pooled) header buffer and our `NetworkBuffer` instance. ## Verifying this change This change added tests and can be verified as follows: - existing `NettyMessageSerializationTest` for the immediate encode-decode path of the new buffer - any other (integration) test that uses the network stack for the full stack with something else than an `EmbeddedChannel` - manually verified a streaming program (WordCount) on a 4 node local setup (different processes) with 1 JobManager and 4 TaskManagers ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7518 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4615.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4615 commit 2ae08d79712235a965db45ee739076cd6a3601fa Author: Nico Kruber Date: 2017-07-31T10:06:14Z [hotfix] fix some typos commit cda26a0d8e6d07c48ac03ee4aab74c8699a04428 Author: Nico Kruber Date: 2017-08-02T09:35:16Z [hotfix][tests] add missing test descriptions commit 3b921d60c1ff969874363c75916a1d40fcc99847 Author: Nico Kruber Date: 2017-08-02T09:34:54Z [FLINK-7310][core] always use the HybridMemorySegment Since we'd like to use our own off-heap buffers for network communication, we cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment. We thus drop any code that loads the HeapMemorySegment (it is still available if needed) in favour of the HybridMemorySegment which is able to work on both heap and off-heap memory. For the performance penalty of this change compared to using HeapMemorySegment alone, see this interesting blog article (from 2015): https://flink.apache.org/news/2015/09/16/off-heap-memory.html commit 3bdd01454dae9eafbd220a5d5554d402e12b8d9f Author: Nico Kruber Date: 2017-08-02T09:27:49Z [hotfix][core] add additional final methods in final classes This applies the scheme of HeapMemorySegment to HybridMemorySegment where core methods are also marked "final" to be more future-proof. 
commit 1f33ec0df5b83135256538132b0de58c3bd86402 Author: Nico Kruber Date: 2017-08-04T13:15:32Z [FLINK-7312][checkstyle] remove trailing whitespace commit 679793f478a3f79c61dec9d5c424c748e2a5d6ed Author: Nico Kruber Date: 2017-08-04T13:20:28Z [FLINK-7312][checkstyle] organise imports commit 6fe487a2e929fe3aaf1d6a1d5ef3070d6263caad Author: Nico Kruber Date: 2017-08-04T13:24:16Z [FLINK-7312][checkstyle] add, adapt and improve comments commit d4b77dc006f833b08ebf5e6324cfc53ca754c254 Author: Nico Kruber Date: 2017-08-04T13:26:40Z [FLINK-7312][checkstyle] remove redundant "public" keyword in interfaces commit 2ce3703c41161a00c7e749f45f11f654e3183e52 Author: Nico Kruber Date: 2017-08-04T13:27:36Z [FLINK-7312][checkstyle] ignore some spurious warnings commit 987f8a41c034b39d14b5c00d6ecc91ef3c157c62 Author: Nico Kruber Date: 2017-08-04T13:35:15Z [FLINK-7312][checkstyle] enable checkstyle for `flink/core/memory/*` We deliberately ignore redundant modifiers
[GitHub] flink pull request #4615: [FLINK-7518][network] pass our own NetworkBuffer t...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4615 [FLINK-7518][network] pass our own NetworkBuffer to Netty ## What is the purpose of the change With this PR, based on #4613, we finally pass our own `NetworkBuffer` class to Netty and remove one buffer copy while transferring data. Note that this applies to the sender side only. ## Brief change log - extend `NettyMessage#allocateBuffer()` to allow allocation for the header only - extend `NettyMessage.BufferResponse` to assemble a composite buffer based on a (pooled) header buffer and our `NetworkBuffer` instance. ## Verifying this change This change added tests and can be verified as follows: - existing `NettyMessageSerializationTest` for the immediate encode-decode path of the new buffer - any other (integration) test that uses the network stack for the full stack with something else than an `EmbeddedChannel` - manually verified a streaming program (WordCount) on a 4 node local setup (different processes) with 1 JobManager and 4 TaskManagers ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7518 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4615.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4615 commit 2ae08d79712235a965db45ee739076cd6a3601fa Author: Nico Kruber Date: 2017-07-31T10:06:14Z [hotfix] fix some typos commit cda26a0d8e6d07c48ac03ee4aab74c8699a04428 Author: Nico Kruber Date: 2017-08-02T09:35:16Z [hotfix][tests] add missing test descriptions commit 3b921d60c1ff969874363c75916a1d40fcc99847 Author: Nico Kruber Date: 2017-08-02T09:34:54Z [FLINK-7310][core] always use the HybridMemorySegment Since we'd like to use our own off-heap buffers for network communication, we cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment. We thus drop any code that loads the HeapMemorySegment (it is still available if needed) in favour of the HybridMemorySegment which is able to work on both heap and off-heap memory. For the performance penalty of this change compared to using HeapMemorySegment alone, see this interesting blog article (from 2015): https://flink.apache.org/news/2015/09/16/off-heap-memory.html commit 3bdd01454dae9eafbd220a5d5554d402e12b8d9f Author: Nico Kruber Date: 2017-08-02T09:27:49Z [hotfix][core] add additional final methods in final classes This applies the scheme of HeapMemorySegment to HybridMemorySegment where core methods are also marked "final" to be more future-proof. 
commit 1f33ec0df5b83135256538132b0de58c3bd86402 Author: Nico Kruber Date: 2017-08-04T13:15:32Z [FLINK-7312][checkstyle] remove trailing whitespace commit 679793f478a3f79c61dec9d5c424c748e2a5d6ed Author: Nico Kruber Date: 2017-08-04T13:20:28Z [FLINK-7312][checkstyle] organise imports commit 6fe487a2e929fe3aaf1d6a1d5ef3070d6263caad Author: Nico Kruber Date: 2017-08-04T13:24:16Z [FLINK-7312][checkstyle] add, adapt and improve comments commit d4b77dc006f833b08ebf5e6324cfc53ca754c254 Author: Nico Kruber Date: 2017-08-04T13:26:40Z [FLINK-7312][checkstyle] remove redundant "public" keyword in interfaces commit 2ce3703c41161a00c7e749f45f11f654e3183e52 Author: Nico Kruber Date: 2017-08-04T13:27:36Z [FLINK-7312][checkstyle] ignore some spurious warnings commit 987f8a41c034b39d14b5c00d6ecc91ef3c157c62 Author: Nico Kruber Date: 2017-08-04T13:35:15Z [FLINK-7312][checkstyle] enable checkstyle for `flink/core/memory/*` We deliberately ignore redundant modifiers for now since we want `final` modifiers on `final` classes for increased future-proofness. commit 6ce7b17f6c645a1a1ec136a307ce83f02b21eb7f Author: Nico Kruber Date: 2017-08-04T13:35:15Z
[jira] [Created] (FLINK-7551) Add VERSION to the REST urls.
Kostas Kloudas created FLINK-7551: - Summary: Add VERSION to the REST urls. Key: FLINK-7551 URL: https://issues.apache.org/jira/browse/FLINK-7551 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.4.0 Reporter: Kostas Kloudas Fix For: 1.4.0 This is to guarantee that we can update the REST API without breaking existing third-party clients.
[jira] [Commented] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable
[ https://issues.apache.org/jira/browse/FLINK-7547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145203#comment-16145203 ] ASF GitHub Bot commented on FLINK-7547: --- GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/4614 [FLINK-7547] AsyncFunction.scala extends Function, serialized fix [#issue FLINK-7547](https://issues.apache.org/jira/browse/FLINK-7547) details: org.apache.flink.streaming.api.scala.async.AsyncFunction is not declared Serializable, whereas org.apache.flink.streaming.api.functions.async.AsyncFunction is. This leads to the job not starting as the async function can't be serialized during initialization. ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-7547 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4614.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4614 commit 93edc636d5804e4a50a818cd60199d25be3f073e Author: yew1eb Date: 2017-08-29T12:25:49Z AsyncFunction.scala extends Function, serialized > o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable > -- > > Key: FLINK-7547 > URL: https://issues.apache.org/jira/browse/FLINK-7547 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Elias Levy >Priority: Minor > > {{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared > {{Serializable}}, whereas > {{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is. This > leads to the job not starting as the async function can't be serialized > during initialization.
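The bug above exists because a function interface that does not extend `Serializable` cannot be shipped to the workers when the job graph is serialized at submission time. A minimal self-contained Java sketch of why the marker interface matters (the `AsyncFn` interface and `serialize` helper here are hypothetical, not Flink's actual API; note that a lambda targeting a Serializable functional interface is itself serializable in Java):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableFunctionSketch {

    // Because the interface extends Serializable, lambdas implementing it can be
    // serialized; without the marker, writeObject would throw NotSerializableException.
    interface AsyncFn extends Serializable {
        String apply(String input);
    }

    // Java serialization round-trip, as a framework would do when shipping the function.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        AsyncFn fn = s -> s.toUpperCase();
        byte[] bytes = serialize(fn);
        System.out.println(bytes.length > 0); // prints "true"
        System.out.println(fn.apply("ok"));   // prints "OK"
    }
}
```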
[GitHub] flink pull request #4614: [FLINK-7547] AsyncFunction.scala extends Function,...
GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/4614 [FLINK-7547] AsyncFunction.scala extends Function, serialized fix [#issue FLINK-7547](https://issues.apache.org/jira/browse/FLINK-7547) details: org.apache.flink.streaming.api.scala.async.AsyncFunction is not declared Serializable, whereas org.apache.flink.streaming.api.functions.async.AsyncFunction is. This leads to the job not starting as the async function can't be serialized during initialization. ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-7547 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4614.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4614 commit 93edc636d5804e4a50a818cd60199d25be3f073e Author: yew1eb Date: 2017-08-29T12:25:49Z AsyncFunction.scala extends Function, serialized ---
[jira] [Commented] (FLINK-7520) let our Buffer class extend from netty's buffer class
[ https://issues.apache.org/jira/browse/FLINK-7520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145200#comment-16145200 ] ASF GitHub Bot commented on FLINK-7520: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4613 [FLINK-7520][network] let our Buffer class extend from Netty's buffer class ## What is the purpose of the change With this PR, we extend our own `Buffer` class to extend from Netty's `ByteBuf` class so that we can avoid one buffer copy while transferring data through Netty but keep our `MemorySegment` logic, performance, and configuration. Note that this PR is based on several previous smaller PRs which are all needed: #4445, #4447, #4506, #4481, #4517, #4518, #4528, #4581, #4590, #4591, #4592, #4593, and #4594. ## Brief change log - extract the `Buffer` interface (common functions used by non-Netty code inside Flink) and a `NetworkBuffer` implementation (extending from `ByteBuf`, implementing `Buffer`) - change `Buffer` interface to follow the (separated) reader and writer index logic that Netty has and replace the `#getSize()`/`#setSize()` logic, i.e.
```
+-------------------+----------------+----------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+----------------+----------------+
|                   |                |                |
0      <=      readerIndex  <=  writerIndex    <=   size
```
(currently, only the writer index is used and both reading and writing in Flink code is performed exclusively either on NIO buffer wrappers or the underlying `MemorySegment`s directly) - add `getNioBuffer()` and `getNioBufferReadable()` for properly accessing underlying buffer regions - since we inherit from `AbstractByteBuf`, only one thread should work with the `Buffer`'s (meta)data as modifications to the indices are not thread-safe - this is the usual case though - add `NetworkBuffer#setAllocator()` which is necessary to set before giving a `NetworkBuffer` into Netty code (we do not rely on this allocator in our code!) 
## Verifying this change This change added tests and can be verified as follows: - extended `BufferTest` by inheriting from Netty's `AbstractByteBufTest` (copied into our sources due to it not being available in a separate test jar) to verify our buffer implementation follows Netty's invariants - existing (integration) tests such as `NettyMessageSerializationTest` and `PartitionRequestClientHandlerTest` for the changes in the use of the new APIs - any other (integration) test that uses the network stack ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7520 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4613.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4613 commit 2ae08d79712235a965db45ee739076cd6a3601fa Author: Nico Kruber Date: 2017-07-31T10:06:14Z [hotfix] fix some typos commit cda26a0d8e6d07c48ac03ee4aab74c8699a04428 Author: Nico Kruber Date: 2017-08-02T09:35:16Z [hotfix][tests] add missing test descriptions commit 3b921d60c1ff969874363c75916a1d40fcc99847 Author: Nico Kruber Date: 2017-08-02T09:34:54Z [FLINK-7310][core] always use the HybridMemorySegment Since we'd like to use our own off-heap buffers for network communication, we cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment. We thus drop any code that loads the HeapMemorySegment (it is still available if needed) in favour of the HybridMemorySegment, which is able to work on both heap and off-heap memory. For the performance penalty of this change compared to using HeapMemorySegment alone, see this interesting blog article (from 2015): https://flink.apache.org/news/2015/09/16/off-heap-memory.html commit 3bdd01454dae9eafbd220a5d5554d402e12b8d9f Author: Nico Kruber
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145199#comment-16145199 ] ASF GitHub Bot commented on FLINK-7245: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 I totally understand the choice, @fhueske Thanks for the refactoring. > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently the watermarks are applied and emitted by the > {{AbstractStreamOperator}} instantly. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as delayed by > the downstream operators since their timestamps must be less than or equal to > the corresponding triggers. > This issue aims to add another "working mode", which supports holding back > watermarks, to current operators. These watermarks should be blocked and > stored by the operators until all the corresponding new generated results are > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4613: [FLINK-7520][network] let our Buffer class extend ...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4613 [FLINK-7520][network] let our Buffer class extend from Netty's buffer class ## What is the purpose of the change With this PR, we extend our own `Buffer` class from Netty's `ByteBuf` class so that we can avoid one buffer copy while transferring data through Netty but keep our `MemorySegment` logic, performance, and configuration. Note that this PR is based on several previous smaller PRs which are all needed: #4445, #4447, #4506, #4481, #4517, #4518, #4528, #4581, #4590, #4591, #4592, #4593, and #4594. ## Brief change log - extract the `Buffer` interface (common functions used by non-Netty code inside Flink) and a `NetworkBuffer` implementation (extending from `ByteBuf`, implementing `Buffer`) - change the `Buffer` interface to follow the (separated) reader and writer index logic that Netty has and replace the `#getSize()`/`#setSize()` logic, i.e.
```
+-------------------+----------------+----------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+----------------+----------------+
|                   |                |                |
0 <= readerIndex <= writerIndex <= size
```
(currently, only the writer index is used, and both reading and writing in Flink code are performed exclusively either on NIO buffer wrappers or on the underlying `MemorySegment`s directly) - add `getNioBuffer()` and `getNioBufferReadable()` for properly accessing underlying buffer regions - since we inherit from `AbstractByteBuf`, only one thread should work with the `Buffer`'s (meta)data, as modifications to the indices are not thread-safe - this is the usual case though - add `NetworkBuffer#setAllocator()`, which must be set before handing a `NetworkBuffer` into Netty code (we do not rely on this allocator in our code!) 
## Verifying this change This change added tests and can be verified as follows: - extended `BufferTest` by inheriting from Netty's `AbstractByteBufTest` (copied into our sources due to it not being available in a separate test jar) to verify our buffer implementation follows Netty's invariants - existing (integration) tests such as `NettyMessageSerializationTest` and `PartitionRequestClientHandlerTest` for the changes in the use of the new APIs - any other (integration) test that uses the network stack ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7520 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4613.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4613 commit 2ae08d79712235a965db45ee739076cd6a3601fa Author: Nico Kruber Date: 2017-07-31T10:06:14Z [hotfix] fix some typos commit cda26a0d8e6d07c48ac03ee4aab74c8699a04428 Author: Nico Kruber Date: 2017-08-02T09:35:16Z [hotfix][tests] add missing test descriptions commit 3b921d60c1ff969874363c75916a1d40fcc99847 Author: Nico Kruber Date: 2017-08-02T09:34:54Z [FLINK-7310][core] always use the HybridMemorySegment Since we'd like to use our own off-heap buffers for network communication, we cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment. We thus drop any code that loads the HeapMemorySegment (it is still available if needed) in favour of the HybridMemorySegment, which is able to work on both heap and off-heap memory. For the performance penalty of this change compared to using HeapMemorySegment alone, see this interesting blog article (from 2015): https://flink.apache.org/news/2015/09/16/off-heap-memory.html commit 3bdd01454dae9eafbd220a5d5554d402e12b8d9f Author: Nico Kruber Date: 2017-08-02T09:27:49Z [hotfix][core] add additional final methods in final classes This applies the scheme of HeapMemorySegment to HybridMemorySegment where core methods are also marked "final"
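The reader/writer index invariant from the change log above can be sketched with a minimal stand-alone class (a hypothetical illustration, not Flink's actual `NetworkBuffer` or Netty's `AbstractByteBuf`):

```java
// Minimal sketch of the separated reader/writer index logic described in the
// PR: 0 <= readerIndex <= writerIndex <= size. Hypothetical demo class only.
public class IndexedBuffer {
    private final byte[] data;
    private int readerIndex; // start of readable bytes
    private int writerIndex; // start of writable bytes

    public IndexedBuffer(int size) { this.data = new byte[size]; }

    public void writeByte(byte b) {
        if (writerIndex >= data.length) throw new IndexOutOfBoundsException();
        data[writerIndex++] = b;
    }

    public byte readByte() {
        if (readerIndex >= writerIndex) throw new IndexOutOfBoundsException();
        return data[readerIndex++]; // read bytes become "discardable"
    }

    public int discardableBytes() { return readerIndex; }
    public int readableBytes()    { return writerIndex - readerIndex; }
    public int writableBytes()    { return data.length - writerIndex; }

    public static void main(String[] args) {
        IndexedBuffer buf = new IndexedBuffer(4);
        buf.writeByte((byte) 1);
        buf.writeByte((byte) 2);
        buf.readByte();
        // one discardable, one readable, two writable bytes
        System.out.println(buf.discardableBytes() + " "
                + buf.readableBytes() + " " + buf.writableBytes());
    }
}
```

Note how, unlike the previous `#getSize()`/`#setSize()` scheme with a single index, reading and writing positions advance independently within the same buffer.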
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 I totally understand the choice, @fhueske. Thanks for the refactoring. ---
[jira] [Commented] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable
[ https://issues.apache.org/jira/browse/FLINK-7547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145186#comment-16145186 ] Hai Zhou commented on FLINK-7547: - Hi [~elevy]. I will open a PR to fix it. > o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable > -- > > Key: FLINK-7547 > URL: https://issues.apache.org/jira/browse/FLINK-7547 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Elias Levy >Priority: Minor > > {{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared > {{Serializable}}, whereas > {{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is. This > leads to the job not starting, as the async function can't be serialized > during initialization. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
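The effect described in FLINK-7547 can be reproduced with a self-contained sketch (the two nested interfaces are hypothetical stand-ins for the Java and Scala `AsyncFunction` variants, not the real Flink types): a lambda targeting an interface that extends `java.io.Serializable` can be serialized, while one targeting a plain interface fails with `NotSerializableException`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableFunctionDemo {
    // Stand-in for the Java-side interface, which extends Serializable.
    interface SerializableAsyncFn extends Serializable { void apply(String in); }
    // Stand-in for the (buggy) Scala-side interface, which does not.
    interface PlainAsyncFn { void apply(String in); }

    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o); // this is what fails during job initialization
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SerializableAsyncFn good = in -> { };
        PlainAsyncFn bad = in -> { };
        System.out.println(canSerialize(good) + " " + canSerialize(bad));
    }
}
```

This is why the declared fix is simply to make the Scala interface extend `Serializable`, matching its Java counterpart.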
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145165#comment-16145165 ] ASF GitHub Bot commented on FLINK-7245: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4530 You're right @xccui, this is a trade-off. I thought about this again and agree with @aljoscha that it would be better to avoid the additional method call. The `processWatermark()` method is called many times in every DataStream program, and the duplicated code is less than 10 lines in 4 classes. I will do the refactoring and merge this PR. Thanks, Fabian > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently the watermarks are applied and emitted by the > {{AbstractStreamOperator}} instantly. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as delayed by > the downstream operators since their timestamps must be less than or equal to > the corresponding triggers. > This issue aims to add another "working mode", which supports holding back > watermarks, to current operators. These watermarks should be blocked and > stored by the operators until all the corresponding new generated results are > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4530 You're right @xccui, this is a trade-off. I thought about this again and agree with @aljoscha that it would be better to avoid the additional method call. The `processWatermark()` method is called many times in every DataStream program, and the duplicated code is less than 10 lines in 4 classes. I will do the refactoring and merge this PR. Thanks, Fabian ---
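The "holding back watermarks" mode that FLINK-7245 describes can be sketched in isolation (a hypothetical operator skeleton, not Flink's actual `AbstractStreamOperator`): instead of forwarding every watermark immediately, the operator buffers the highest seen watermark and only emits it once all pending results triggered by it have been flushed downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of holding back watermarks (the idea of FLINK-7245),
// not Flink's real operator code: watermarks are buffered until no results
// with smaller timestamps can still be produced.
public class HoldBackWatermarkOperator {
    private final List<Long> emittedWatermarks = new ArrayList<>();
    private long heldBackWatermark = Long.MIN_VALUE;
    private int pendingResults;

    void processElement(long value) {
        pendingResults++; // this element will eventually produce a result
    }

    void processWatermark(long mark) {
        heldBackWatermark = Math.max(heldBackWatermark, mark);
        tryEmit();
    }

    void resultEmitted() {
        pendingResults--;
        tryEmit();
    }

    private void tryEmit() {
        // Only forward the watermark once all pending results are out.
        if (pendingResults == 0 && heldBackWatermark != Long.MIN_VALUE) {
            emittedWatermarks.add(heldBackWatermark);
        }
    }

    public static void main(String[] args) {
        HoldBackWatermarkOperator op = new HoldBackWatermarkOperator();
        op.processElement(41L);
        op.processWatermark(100L);  // held back: one result still pending
        System.out.println("after watermark: " + op.emittedWatermarks);
        op.resultEmitted();         // pending result flushed, watermark released
        System.out.println("after flush: " + op.emittedWatermarks);
    }
}
```

The review discussion above is about where this logic lives: duplicating the few lines of `processWatermark()` in the operators that need it avoids an extra virtual method call on the hot path.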
[jira] [Commented] (FLINK-7502) PrometheusReporter improvements
[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145155#comment-16145155 ] ASF GitHub Bot commented on FLINK-7502: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r135764923 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java --- @@ -160,6 +151,43 @@ public void invalidCharactersAreReplacedWithUnderscore() { assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c")); } + @Test + public void registeringSameMetricTwiceDoesNotThrowException() { + Counter counter = new SimpleCounter(); + counter.inc(); + String counterName = "testCounter"; + final FrontMetricGroup group = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); + + reporter.notifyOfAddedMetric(counter, counterName, group); + reporter.notifyOfAddedMetric(counter, counterName, group); + } + + @Test + public void cannotStartTwoReportersOnSamePort() { + final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "12345"))); + final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "12345"))); + assertThat(fixedPort1.getReporters(), hasSize(1)); + assertThat(fixedPort2.getReporters(), hasSize(0)); + } + + @Test + public void canStartTwoReportersWhenUsingPortRange() { + final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9249-9252"))); + final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9249-9252"))); + assertThat(portRange1.getReporters(), hasSize(1)); + assertThat(portRange2.getReporters(), 
hasSize(1)); + } + + @Test + public void cannotStartThreeReportersWhenPortRangeIsTooSmall() { + final MetricRegistry smallPortRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9253-9254"))); --- End diff -- You should call `MetricRegistry#shutdown()` when you no longer need it. > PrometheusReporter improvements > --- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > > * do not throw exceptions on metrics being registered for second time > * allow port ranges for setups where multiple reporters are on same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore, there is now a minimal http server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7502) PrometheusReporter improvements
[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145153#comment-16145153 ] ASF GitHub Bot commented on FLINK-7502: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r135764351 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java --- @@ -84,21 +84,30 @@ static String replaceInvalidChars(final String input) { @Override public void open(MetricConfig config) { - int port = config.getInteger(ARG_PORT, DEFAULT_PORT); - LOG.info("Using port {}.", port); - prometheusEndpoint = new PrometheusEndpoint(port); - try { - prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true); - } catch (IOException e) { - final String msg = "Could not start PrometheusEndpoint on port " + port; - LOG.warn(msg, e); - throw new RuntimeException(msg, e); + String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT); + + if (portsConfig != null) { --- End diff -- This check isn't needed. If we keep it we should also throw an exception if portsConfig is null as otherwise we're hitting an NPE later on since httpServer is still null. > PrometheusReporter improvements > --- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > > * do not throw exceptions on metrics being registered for second time > * allow port ranges for setups where multiple reporters are on same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore, there is now a minimal http server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7502) PrometheusReporter improvements
[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145154#comment-16145154 ] ASF GitHub Bot commented on FLINK-7502: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r135764533 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java --- @@ -130,7 +139,11 @@ public void notifyOfAddedMetric(final Metric metric, final String metricName, fi metric.getClass().getName()); return; } - collector.register(); + try { + collector.register(); + } catch (Exception e) { + LOG.warn("There was a problem registering metric {}: {}", metricName, e); --- End diff -- We usually don't include placeholders for exceptions (because they are added implicitly), i.e. it should be ```LOG.warn("There was a problem registering metric {}.", metricName, e);``` > PrometheusReporter improvements > --- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > > * do not throw exceptions on metrics being registered for second time > * allow port ranges for setups where multiple reporters are on same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore, there is now a minimal http server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4586: [FLINK-7502] [metrics] Improve PrometheusReporter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r135764533 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java --- @@ -130,7 +139,11 @@ public void notifyOfAddedMetric(final Metric metric, final String metricName, fi metric.getClass().getName()); return; } - collector.register(); + try { + collector.register(); + } catch (Exception e) { + LOG.warn("There was a problem registering metric {}: {}", metricName, e); --- End diff -- We usually don't include placeholders for exceptions (because they are added implicitly), i.e. it should be ```LOG.warn("There was a problem registering metric {}.", metricName, e);``` ---
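The review comment above relies on the SLF4J convention that a `Throwable` passed as the last argument does not fill a `{}` placeholder but is logged implicitly with its stack trace. A minimal stand-in formatter (a hypothetical sketch, not SLF4J's actual implementation) illustrates the behaviour the reviewer is asking for:

```java
// Hypothetical mini-formatter mimicking the SLF4J convention referenced in
// the review: a Throwable in the last argument position is set aside for the
// stack trace instead of being substituted into a "{}" placeholder.
public class PlaceholderDemo {
    static String format(String msg, Object... args) {
        int n = args.length;
        Throwable t = null;
        if (n > 0 && args[n - 1] instanceof Throwable) {
            t = (Throwable) args[n - 1];
            n--; // the throwable does not consume a placeholder
        }
        StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int i = 0;
        while (i < msg.length()) {
            if (argIdx < n && msg.startsWith("{}", i)) {
                sb.append(args[argIdx++]);
                i += 2;
            } else {
                sb.append(msg.charAt(i++));
            }
        }
        if (t != null) {
            sb.append(" [stack trace of ").append(t.getClass().getSimpleName()).append(']');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Exception e = new IllegalStateException("boom");
        // Preferred form: one placeholder for the metric name, and the
        // exception passed last so its stack trace is logged implicitly.
        System.out.println(format("There was a problem registering metric {}.", "myCounter", e));
    }
}
```

With the two-placeholder form from the diff, the exception would instead be substituted via `toString()` and its stack trace lost.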
[GitHub] flink pull request #4586: [FLINK-7502] [metrics] Improve PrometheusReporter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r135764923 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java --- @@ -160,6 +151,43 @@ public void invalidCharactersAreReplacedWithUnderscore() { assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c")); } + @Test + public void registeringSameMetricTwiceDoesNotThrowException() { + Counter counter = new SimpleCounter(); + counter.inc(); + String counterName = "testCounter"; + final FrontMetricGroup group = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); + + reporter.notifyOfAddedMetric(counter, counterName, group); + reporter.notifyOfAddedMetric(counter, counterName, group); + } + + @Test + public void cannotStartTwoReportersOnSamePort() { + final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "12345"))); + final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "12345"))); + assertThat(fixedPort1.getReporters(), hasSize(1)); + assertThat(fixedPort2.getReporters(), hasSize(0)); + } + + @Test + public void canStartTwoReportersWhenUsingPortRange() { + final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9249-9252"))); + final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9249-9252"))); + assertThat(portRange1.getReporters(), hasSize(1)); + assertThat(portRange2.getReporters(), hasSize(1)); + } + + @Test + public void cannotStartThreeReportersWhenPortRangeIsTooSmall() { + final MetricRegistry smallPortRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9253-9254"))); --- End diff -- You should call `MetricRegistry#shutdown()` when you no longer need it. ---
[GitHub] flink pull request #4586: [FLINK-7502] [metrics] Improve PrometheusReporter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r135764351 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java --- @@ -84,21 +84,30 @@ static String replaceInvalidChars(final String input) { @Override public void open(MetricConfig config) { - int port = config.getInteger(ARG_PORT, DEFAULT_PORT); - LOG.info("Using port {}.", port); - prometheusEndpoint = new PrometheusEndpoint(port); - try { - prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true); - } catch (IOException e) { - final String msg = "Could not start PrometheusEndpoint on port " + port; - LOG.warn(msg, e); - throw new RuntimeException(msg, e); + String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT); + + if (portsConfig != null) { --- End diff -- This check isn't needed. If we keep it we should also throw an exception if portsConfig is null as otherwise we're hitting an NPE later on since httpServer is still null. ---
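The port-range behaviour this PR introduces (two reporters on the same host each binding a different port from a shared range) can be sketched with plain `ServerSocket`s; the method name and range handling here are hypothetical, not the actual `PrometheusReporter` code:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortRangeDemo {
    // Try each port in [from, to] and bind the first free one, the way a
    // port-range reporter configuration such as "9249-9252" would.
    static ServerSocket bindFirstFree(int from, int to) throws IOException {
        for (int port = from; port <= to; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // port already taken, try the next one in the range
            }
        }
        throw new IOException("no free port in range " + from + "-" + to);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket first = bindFirstFree(9249, 9252);
             ServerSocket second = bindFirstFree(9249, 9252)) {
            // Two "reporters" coexist because each grabbed a different port.
            System.out.println(first.getLocalPort() != second.getLocalPort());
        }
    }
}
```

A fixed single port (the `cannotStartTwoReportersOnSamePort` test above) makes the second bind fail, which is exactly why the tests expect the second registry to end up with zero reporters.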
[jira] [Assigned] (FLINK-7438) Some classes are eclipsed by classes in package scala
[ https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou reassigned FLINK-7438: --- Assignee: Hai Zhou > Some classes are eclipsed by classes in package scala > - > > Key: FLINK-7438 > URL: https://issues.apache.org/jira/browse/FLINK-7438 > Project: Flink > Issue Type: Bug > Components: Build System, DataStream API >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Minor > > Noticed the following during compilation: > {code} > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > object OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > [WARNING] ^ > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > class OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > {code} > We should avoid the warning e.r.t. OutputTag. > There may be other occurrences of similar warning. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145095#comment-16145095 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 There is none that I'm aware of. It is also possible for the JM and TM to run in the same JVM, say for tests or in local mode. I can't think of a nice way to solve this, so I suggest we simply disable the check for the PythonEnvironmentConfig class. > Python API for streaming applications > - > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API >Reporter: Zohar Mizrahi >Assignee: Zohar Mizrahi > > A work in progress to provide python interface for Flink streaming APIs. The > core technology is based on jython and thus imposes two limitations: a. user > defined functions cannot use python extensions. b. the python version is 2.x > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was > setup properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in return will execute all the tests under > {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 There is none that I'm aware of. It is also possible for the JM and TM to run in the same JVM, say for tests or in local mode. I can't think of a nice way to solve this, so I suggest we simply disable the check for the PythonEnvironmentConfig class. ---
[jira] [Created] (FLINK-7550) Give names to REST client/server for clearer logging.
Kostas Kloudas created FLINK-7550: - Summary: Give names to REST client/server for clearer logging. Key: FLINK-7550 URL: https://issues.apache.org/jira/browse/FLINK-7550 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.4.0 Reporter: Kostas Kloudas Fix For: 1.4.0 This issue proposes to give names to the entities composing a REST-ful service and use these names when logging messages. This will help debugging. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
[ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paolo Rendano updated FLINK-7549: - Description: Hi all, I'm doing some stress tests on my pattern using JMeter to populate source data on a rabbitmq queue. This queue contains status generated by different devices. In my test case I set to loop on a base of 1000 cycles, each one sending respectively the first and the second status that generate the event using flink CEP (status keyed by device). I expect to get an output of 1000 events. In my early tests I launched that but I noticed that I get only partial results in output (70/80% of the expected ones). Introducing a delay in the JMeter plan between the sending of the two status solved the problem. The minimum delay (of course this is on my local machine, on other machines it may vary) that makes things work is 20/25 ms. My code is structured this way (the following is a simplification): {code:java} final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(100L); // source definition DataStream dataStreamSource = env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig, conf.getSourceExchange(), conf.getSourceRoutingKey(), conf.getSourceQueueName(), true, new MyMessageWrapperSchema())) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) { private static final long serialVersionUID = -1L; @Override public long extractTimestamp(MyMessageWrapper element) { if (element.getData().get("stateTimestamp")==null) { throw new RuntimeException("Status Timestamp is null during time ordering for device [" + element.getData().get("deviceCode") + "]"); } return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime(); } }) .name("MyIncomingStatus"); // PATTERN DEFINITION Pattern myPattern = Pattern .begin("start") .subtype(MyMessageWrapper.class) .where(whereEquals("st", 
"none")) .next("end") .subtype(MyMessageWrapper.class) .where(whereEquals("st","started")) .within(Time.minutes(3)); // CEP DEFINITION PatternStream myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern); DataStream > outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction); // SINK DEFINITION outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent"); {code} digging and logging messages received by flink in "extractTimestamp", what happens is that with that so high rate of messages, source may receive messages with the same timestamp but with different deviceCode. Any idea? Thanks, regards Paolo was: Hi all, I'm doing some stress test on my pattern using JMeter to populate source data on a rabbitmq queue. This queue contains status generated by different devices . In my test case I set to loop on a base of 1000 cycles, each one sending respectively the first and the second status that generate the event using flink CEP (status keyed by device). In my early tests I launched that but I noticed that I get only partial results in output (70/80% of the expected ones). Introducing a delay in jmeter plan between the sending of the two status solved the problem. The minimum delay (of course this is on my local machine, on other machines may vary) that make things work is 20/25 ms. My code is structured this way (the following is a semplification): {code:java} final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(100L); // source definition DataStream dataStreamSource = env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig, conf.getSourceExchange(), conf.getSourceRoutingKey(), conf.getSourceQueueName(), true, new MyMessageWrapperSchema())) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) {
[jira] [Closed] (FLINK-7454) update 'Monitoring Current Event Time' section of Flink doc
[ https://issues.apache.org/jira/browse/FLINK-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7454. --- Resolution: Fixed 1.3: 51253c7f0395645e8ddc70d7ab6970d32a6c5b4f 1.4: 257a5a541fbadb7baaf8be7a45e7187f7cb2fee3 > update 'Monitoring Current Event Time' section of Flink doc > --- > > Key: FLINK-7454 > URL: https://issues.apache.org/jira/browse/FLINK-7454 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > Since FLINK-3427 is done, there's no need to have the following doc in > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html#monitoring-current-event-time > "There are plans (see FLINK-3427) to show the current low watermark for each > operator in the Flink web interface. > Until this feature is implemented the current low watermark for each task can > be accessed through the metrics system." > We can replace it with something like "Low watermarks of each task can be > accessed either from Flink web interface or Flink metric system." -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7544) Make all PathParameters mandatory
[ https://issues.apache.org/jira/browse/FLINK-7544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7544. --- Resolution: Fixed 1.4: dcce0b7631bf65ea66dbe0d64b368c7143815f9e > Make all PathParameters mandatory > - > > Key: FLINK-7544 > URL: https://issues.apache.org/jira/browse/FLINK-7544 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > In the current REST architecture all path parameters are mandatory, so we > should mark them as such in {{MessagePathParameter}}.
[jira] [Closed] (FLINK-7543) Simplify REST parameter access.
[ https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7543. --- Resolution: Fixed 1.4: a6905df09098caf6c2b3c11e164132784801d815 > Simplify REST parameter access. > --- > > Key: FLINK-7543 > URL: https://issues.apache.org/jira/browse/FLINK-7543 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > Currently you have to do:
> {code:java}
> final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class);
> JobID jobID = jobId.getValue();
> {code}
> This issue proposes to remove the second step and return the value directly, > while performing the necessary checks internally (different for query and > path parameters), without exposing them to the user.
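To make the FLINK-7543 proposal concrete, here is a minimal self-contained sketch of resolving a typed path parameter in a single call. All names (PathParam, ParameterSketch, JOB_ID) are hypothetical illustrations, not the actual Flink REST classes; the point is only the shape of the API: one call that validates presence (path parameters being mandatory, cf. FLINK-7544) and returns the converted value, instead of a wrapper that needs a second getValue() step.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed one-step typed parameter access.
public class ParameterSketch {

    // A path parameter descriptor knows its key and how to convert the raw string.
    interface PathParam<T> {
        String key();
        T convert(String raw);
    }

    // Example descriptor; in Flink this would convert to a JobID, here we
    // keep it as a String to stay dependency-free.
    static final PathParam<String> JOB_ID = new PathParam<String>() {
        public String key() { return "jobid"; }
        public String convert(String raw) { return raw; }
    };

    private final Map<String, String> rawPathParams = new HashMap<>();

    public void setRaw(String key, String value) {
        rawPathParams.put(key, value);
    }

    // Proposed style: presence check and conversion happen internally,
    // the caller receives the resolved value directly.
    public <T> T getPathParameter(PathParam<T> param) {
        String raw = rawPathParams.get(param.key());
        if (raw == null) {
            throw new IllegalArgumentException(
                    "Missing mandatory path parameter: " + param.key());
        }
        return param.convert(raw);
    }
}
```

Usage collapses the two lines quoted in the issue into one: `String jobId = request.getPathParameter(ParameterSketch.JOB_ID);`.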