[jira] [Commented] (FLINK-9926) Allow for ShardConsumer override in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556931#comment-16556931 ]

ASF GitHub Bot commented on FLINK-9926:
---------------------------------------

yxu-valleytider removed a comment on issue #6427: [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.
URL: https://github.com/apache/flink/pull/6427#issuecomment-407982416

   I see. Thanks, Thomas. It seems the streamingplatform master POM still refers to the 1.4 release tag (`1.4-lyft20180719`). Will that be changed to release-1.5 soon?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Allow for ShardConsumer override in Kinesis consumer
> ----------------------------------------------------
>
>          Key: FLINK-9926
>          URL: https://issues.apache.org/jira/browse/FLINK-9926
>      Project: Flink
>   Issue Type: Task
>   Components: Kinesis Connector
>     Reporter: Thomas Weise
>     Assignee: Thomas Weise
>     Priority: Minor
>       Labels: pull-request-available
>
> There are various reasons why the user may want to override the consumer. Examples are to optimize the run loop or to add additional metrics or logging. Instead of baking the constructor into runFetcher, create a customizable factory method.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9926) Allow for ShardConsumer override in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556930#comment-16556930 ]

ASF GitHub Bot commented on FLINK-9926:
---------------------------------------

yxu-valleytider commented on issue #6427: [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.
URL: https://github.com/apache/flink/pull/6427#issuecomment-407982416

   I see. Thanks, Thomas. It seems the streamingplatform master POM still refers to the 1.4 release tag (`1.4-lyft20180719`). Will that be changed to release-1.5 soon?
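The refactoring FLINK-9926 proposes can be illustrated with a short sketch. This is a hypothetical simplification, not the actual Flink Kinesis connector classes: the point is only that `runFetcher` obtains its consumer from a protected factory method instead of a hard-coded constructor call, so subclasses can swap in their own implementation.

```java
public class FetcherSketch {

    // Stand-in for the per-shard consumer; the real one polls records in a loop.
    interface ShardConsumer {
        void run();
    }

    static class DefaultShardConsumer implements ShardConsumer {
        public void run() {
            // poll records from the shard (omitted)
        }
    }

    static class Fetcher {
        // Factory method instead of a hard-coded constructor call: subclasses
        // override this to supply a custom consumer (extra metrics, logging,
        // a tuned run loop, ...).
        protected ShardConsumer createShardConsumer(String shardId) {
            return new DefaultShardConsumer();
        }

        // Simplified runFetcher: obtains the consumer via the factory method
        // and returns it so callers can inspect the created type.
        public ShardConsumer runFetcher(String shardId) {
            ShardConsumer consumer = createShardConsumer(shardId);
            consumer.run();
            return consumer;
        }
    }
}
```

A derived fetcher then only overrides `createShardConsumer`, leaving the rest of the run loop untouched.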
[jira] [Commented] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
[ https://issues.apache.org/jira/browse/FLINK-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556924#comment-16556924 ]

Aljoscha Krettek commented on FLINK-9690:
-----------------------------------------

[~uce] did you end up opening tickets for Kafka 1.0.0/1.1.0?

> Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
> --------------------------------------------------------------------
>
>              Key: FLINK-9690
>              URL: https://issues.apache.org/jira/browse/FLINK-9690
>          Project: Flink
>       Issue Type: Improvement
>       Components: Kafka Connector
> Affects Versions: 1.4.2
>         Reporter: Ufuk Celebi
>         Priority: Major
>
> Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} packaged with {{kafka.version}} set to {{1.1.0}} fails in Flink 1.4.2.
>
> {code}
> java.lang.RuntimeException: Incompatible KafkaProducer version
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldException: sequenceNumbers
>     at java.lang.Class.getDeclaredField(Class.java:2070)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297)
>     ... 16 more
> {code}
> [~pnowojski] Any ideas about this issue? Judging from the stack trace it was anticipated that reflective access might break with Kafka versions > 0.11.2.0.
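The `NoSuchFieldException: sequenceNumbers` in the trace above comes from reflective access to a private field of a Kafka-internal object whose layout changed in newer client versions. As a hedged sketch, not Flink's actual `FlinkKafkaProducer.getValue`, the lookup can be made tolerant of a renamed field by trying several candidate names (the `Holder` class and the fallback name below are purely illustrative):

```java
import java.lang.reflect.Field;

public class ReflectiveFieldAccess {

    // Look up a private field under several candidate names and return its value.
    // When no candidate matches, fail with the same "Incompatible KafkaProducer
    // version" style of error seen in the stack trace above.
    static Object getValue(Object object, String... candidateFieldNames) {
        for (String name : candidateFieldNames) {
            try {
                Field field = object.getClass().getDeclaredField(name);
                field.setAccessible(true);
                return field.get(object);
            } catch (NoSuchFieldException e) {
                // field renamed in this client version; try the next candidate
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Incompatible KafkaProducer version", e);
            }
        }
        throw new RuntimeException("Incompatible KafkaProducer version");
    }

    // Minimal stand-in for the Kafka-internal object, for demonstration only.
    static class Holder {
        private final long sequenceNumbers = 42L;
    }
}
```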
[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556325#comment-16556325 ]

Steven Zhen Wu edited comment on FLINK-9693 at 7/26/18 5:15 AM:
---------------------------------------------------------------

We can actually reproduce the issue by killing the jobmanager node for very large jobs, e.g. parallelism over 1,000. The issue starts to appear when the replacement jobmanager node comes up. Another observation is that the ~10 GB memory leak seems to happen very quickly (within a few minutes).

was (Author: stevenz3wu):
We can actually reproduce the issue by killing the jobmanager node for very large jobs, e.g. parallelism over 1,000. The issue starts to appear when the replacement jobmanager node comes up.

> Possible memory leak in jobmanager retaining archived checkpoints
> -----------------------------------------------------------------
>
>              Key: FLINK-9693
>              URL: https://issues.apache.org/jira/browse/FLINK-9693
>          Project: Flink
>       Issue Type: Bug
>       Components: JobManager, State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.6.0
>         Reporter: Steven Zhen Wu
>         Assignee: Till Rohrmann
>         Priority: Major
>           Labels: pull-request-available
>          Fix For: 1.4.3, 1.5.1, 1.6.0
>
>      Attachments: 20180725_jm_mem_leak.png, 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
> First, some context about the job:
> * Flink 1.4.1
> * stand-alone deployment mode
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for Kafka source operators; checkpoint size is 8.4 MB
> * "state.backend.fs.memory-threshold" set so that only the jobmanager writes to S3 for checkpoints
> * internal checkpoints with 10 checkpoints retained in history
>
> Summary of the observations:
> * 41,567 ExecutionVertex objects retained 9+ GB of memory
> * Expanded one ExecutionVertex; it seems to be storing the Kafka offsets for the source operator
[jira] [Commented] (FLINK-9928) Add LOG2 function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556914#comment-16556914 ]

ASF GitHub Bot commented on FLINK-9928:
---------------------------------------

xccui commented on issue #6404: [FLINK-9928] Add LOG2 function for table/sql API
URL: https://github.com/apache/flink/pull/6404#issuecomment-407978401

   Hi @yanghua, thanks for the update. I'll merge this. BTW, there's no need to squash the commits after a review.

> Add LOG2 function for table/sql API
> -----------------------------------
>
>          Key: FLINK-9928
>          URL: https://issues.apache.org/jira/browse/FLINK-9928
>      Project: Flink
>   Issue Type: New Feature
>   Components: Table API & SQL
>     Reporter: vinoyang
>     Assignee: vinoyang
>     Priority: Minor
>       Labels: pull-request-available
>
> Refer to: https://dev.mysql.com/doc/refman/8.0/en/mathematical-functions.html#function_log2
[jira] [Assigned] (FLINK-7525) Add config option to disable Cancel functionality on UI
[ https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-7525:
-------------------------------

    Assignee: vinoyang

> Add config option to disable Cancel functionality on UI
> -------------------------------------------------------
>
>          Key: FLINK-7525
>          URL: https://issues.apache.org/jira/browse/FLINK-7525
>      Project: Flink
>   Issue Type: Improvement
>   Components: Web Client, Webfrontend
>     Reporter: Ted Yu
>     Assignee: vinoyang
>     Priority: Major
>
> In this email thread http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI , Raja was asking for a way to control how users cancel Job(s).
> Robert proposed adding a config option which disables the Cancel functionality.
[jira] [Commented] (FLINK-9928) Add LOG2 function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556904#comment-16556904 ]

ASF GitHub Bot commented on FLINK-9928:
---------------------------------------

yanghua commented on issue #6404: [FLINK-9928] Add LOG2 function for table/sql API
URL: https://github.com/apache/flink/pull/6404#issuecomment-407976155

   @xccui thanks for your suggestion; I have refactored the code, please review again.
[jira] [Updated] (FLINK-9926) Allow for ShardConsumer override in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-9926:
----------------------------------
    Labels: pull-request-available  (was: )
[jira] [Commented] (FLINK-9926) Allow for ShardConsumer override in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556898#comment-16556898 ]

ASF GitHub Bot commented on FLINK-9926:
---------------------------------------

tweise opened a new pull request #6427: [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.
URL: https://github.com/apache/flink/pull/6427

   Ability to override the shard consumer in derived classes, a smaller unit of override for the shard consumer run loop, and consistent use of a custom KinesisProxy for discovery and record fetch through factory use.

   R: @tzulitai @glaksh100 @jgrier @yxu-valleytider
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556893#comment-16556893 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

yanghua commented on issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …
URL: https://github.com/apache/flink/pull/6336#issuecomment-407974746

   To merge the PR, you can ping @tillrohrmann and @zentol

> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>
>              Key: FLINK-9630
>              URL: https://issues.apache.org/jira/browse/FLINK-9630
>          Project: Flink
>       Issue Type: Bug
>       Components: Kafka Connector
> Affects Versions: 1.5.0, 1.4.2
>      Environment: Linux 2.6, Java 8, Kafka broker 0.10.x
>         Reporter: Youjun Yuan
>         Priority: Major
>           Labels: pull-request-available
>          Fix For: 1.5.3
>
> When the Kafka topic gets deleted during the task starting process, Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in getAllPartitionsForTopics() and gets no chance to close the kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>
> *This issue can bring down the whole Flink cluster*, because, in a default setup (fixedDelay with INT.MAX restart attempts), the jobmanager will randomly schedule the job to any TaskManager that has a free slot, and each attempt will cause the TaskManager to leak a TCP connection. Eventually almost every TaskManager will run out of file handles, so no taskmanager can take a snapshot or accept a new job. This effectively stops the whole cluster.
>
> The leak happens when StreamTask.invoke() calls openAllOperators(), then FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), and kafkaConsumer.partitionsFor(topic) in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a *TopicAuthorizationException* that no one catches.
> Though StreamTask.open catches the exception and invokes the dispose() method of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), cancel() does not close the kafkaConsumer in partitionDiscoverer; it does not even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.
>
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> {code}
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to concurrent access;
>             // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
> {code}
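One way to address the leak described above is to guarantee the consumer is closed when discovery throws. The sketch below is a simplified model of that close-on-exception pattern; the `Consumer` interface and `discoverSafely` helper are hypothetical stand-ins, not the actual Flink or Kafka classes:

```java
import java.util.List;

public class DiscovererSketch {

    // Minimal stand-in for KafkaConsumer: only the two calls that matter here.
    interface Consumer {
        List<String> partitionsFor(String topic);
        void close();
    }

    // If partitionsFor() throws (e.g. a TopicAuthorizationException), close the
    // consumer before propagating, so the TCP connection to the broker is
    // released instead of leaking on every restart attempt.
    static List<String> discoverSafely(Consumer consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (RuntimeException e) {
            consumer.close();
            throw e;
        }
    }
}
```

On the success path the consumer stays open for reuse by the discovery loop; only the failure path forces the close.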
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556892#comment-16556892 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

ubyyj commented on issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …
URL: https://github.com/apache/flink/pull/6336#issuecomment-407974446

   @zhangminglei would you please help me get this merged? Thanks
[GitHub] xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
URL: https://github.com/apache/flink/pull/6404#discussion_r205314834

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala

@@ -92,6 +92,18 @@ case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
   }
 }
 
+case class Log2(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+    relBuilder.call(ScalarSqlFunctions.LOG2, children.map(_.toRexNode))

Review comment:
   `relBuilder.call(ScalarSqlFunctions.LOG2, child.toRexNode)`
[GitHub] xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
URL: https://github.com/apache/flink/pull/6404#discussion_r205320912

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala

@@ -553,6 +553,56 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       math.log10(4.6).toString)
   }
 
+  @Test
+  def testLog2(): Unit = {

Review comment:
   Could remove some redundant cases.
[GitHub] xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
URL: https://github.com/apache/flink/pull/6404#discussion_r205319343

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala

@@ -67,6 +67,15 @@ object ScalarSqlFunctions {
       OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
     SqlFunctionCategory.NUMERIC)
 
+  val LOG2 = new SqlFunction(
+    "LOG2",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.DOUBLE_NULLABLE,
+    null,
+    OperandTypes.or(OperandTypes.NUMERIC, OperandTypes.family(SqlTypeFamily.NUMERIC)),

Review comment:
   The `OperandTypes.NUMERIC` and `OperandTypes.family(SqlTypeFamily.NUMERIC)` are equivalent. Just use `OperandTypes.NUMERIC` here.
[jira] [Commented] (FLINK-9928) Add LOG2 function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556832#comment-16556832 ] ASF GitHub Bot commented on FLINK-9928: --- xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API URL: https://github.com/apache/flink/pull/6404#discussion_r205320912

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala

@@ -553,6 +553,56 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
      math.log10(4.6).toString)
  }

+  @Test
+  def testLog2(): Unit = {

Review comment: Could remove some redundant cases.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add LOG2 function for table/sql API
> ---
>
> Key: FLINK-9928
> URL: https://issues.apache.org/jira/browse/FLINK-9928
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
> Labels: pull-request-available
>
> refer to: https://dev.mysql.com/doc/refman/8.0/en/mathematical-functions.html#function_log2

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9928) Add LOG2 function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556836#comment-16556836 ] ASF GitHub Bot commented on FLINK-9928: --- xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API URL: https://github.com/apache/flink/pull/6404#discussion_r205314047

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala

@@ -300,6 +300,11 @@ trait ImplicitExpressionOperations {
    */
  def log10() = Log10(expr)

+  /**
+    * Calculates the base 2 logarithm of the given values.

Review comment: ... of the given **value**.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add LOG2 function for table/sql API
> ---
>
> Key: FLINK-9928
> URL: https://issues.apache.org/jira/browse/FLINK-9928
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
> Labels: pull-request-available
>
> refer to: https://dev.mysql.com/doc/refman/8.0/en/mathematical-functions.html#function_log2

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9928) Add LOG2 function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556833#comment-16556833 ] ASF GitHub Bot commented on FLINK-9928: --- xccui commented on a change in pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API URL: https://github.com/apache/flink/pull/6404#discussion_r205319945

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala

@@ -108,6 +108,17 @@ object ScalarFunctions {
    }
  }

+  /**
+    * Returns the logarithm of "x" with base "2".

Review comment: ... with base **2**.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add LOG2 function for table/sql API
> ---
>
> Key: FLINK-9928
> URL: https://issues.apache.org/jira/browse/FLINK-9928
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
> Labels: pull-request-available
>
> refer to: https://dev.mysql.com/doc/refman/8.0/en/mathematical-functions.html#function_log2

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
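The runtime function under review computes a base-2 logarithm. As a rough standalone sketch (not the actual Flink `ScalarFunctions` code; the class name `Log2Sketch` and its helper are hypothetical), log2 can be derived from the natural logarithm via the change-of-base identity:

```java
public class Log2Sketch {

    // Hypothetical helper mirroring the reviewed runtime function:
    // log2(x) = ln(x) / ln(2). Returns NaN for negative input,
    // following java.lang.Math.log semantics.
    public static double log2(double x) {
        return Math.log(x) / Math.log(2.0);
    }

    public static void main(String[] args) {
        System.out.println(log2(8.0));    // ~3.0, up to floating-point rounding
        System.out.println(log2(1024.0)); // ~10.0
    }
}
```

MySQL's LOG2, which the JIRA issue references, additionally returns NULL for non-positive arguments; a SQL-level implementation would layer that null handling on top of this numeric core.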
[jira] [Commented] (FLINK-9927) Error in Python Stream API example on website
[ https://issues.apache.org/jira/browse/FLINK-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556805#comment-16556805 ] ASF GitHub Bot commented on FLINK-9927: --- TisonKun commented on issue #6424: [FLINK-9927] [Documentation] Change .print() to .output() in Python Streaming example URL: https://github.com/apache/flink/pull/6424#issuecomment-407958532 LGTM cc @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Error in Python Stream API example on website > - > > Key: FLINK-9927 > URL: https://issues.apache.org/jira/browse/FLINK-9927 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Joe Malt >Priority: Minor > Labels: pull-request-available > > The [Python Programming Guide (Streaming) > |https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#program-skeleton]page > contains a WordCount example with the following main method: > {code:java} > def main(factory): > env = factory.get_execution_environment() > env.create_python_source(Generator(num_iters=1000)) \ > .flat_map(Tokenizer()) \ > .key_by(Selector()) \ > .time_window(milliseconds(50)) \ > .reduce(Sum()) \ > .print(){code} > The print() should, [according to the documentation, be > output()|https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.html#output%E2%80%93]. 
> Trying to call print() results in an error: > {code:java} > jmalt-machine:bin jmalt$ ./pyflink-stream.sh > /Users/jmalt/flink-python/WordCount.py > Starting execution of program > Failed to run plan: null > Traceback (most recent call last): > File "", line 1, in > File > "/var/folders/t1/gcltcjcn5zdgqfqrc32xk90x85xkg9/T/flink_streaming_plan_9539e241-ba0a-42bf-9d4c-844dda26b998/WordCount.py", > line 43, in main > AttributeError: 'org.apache.flink.streaming.python.api.datastream.P' object > has no attribute 'print'{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9931) watermark display bug.
[ https://issues.apache.org/jira/browse/FLINK-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556786#comment-16556786 ] ASF GitHub Bot commented on FLINK-9931: --- luojiangyu commented on issue #6400: [FLINK-9931][ui] watermark display bug. URL: https://github.com/apache/flink/pull/6400#issuecomment-407953722 it also covers the new ones now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > watermark display bug. > -- > > Key: FLINK-9931 > URL: https://issues.apache.org/jira/browse/FLINK-9931 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.5.1 >Reporter: luojiangyu >Priority: Major > Labels: pull-request-available > > If the parallelism of the operator is bigger, the request url of watermark > may access the limit of the length of http url, it results in watermark > display bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9945) RocksDB state backend Checkpointing Failed
[ https://issues.apache.org/jira/browse/FLINK-9945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556776#comment-16556776 ] xymaqingxiang commented on FLINK-9945: -- Versions: 1.4.2 > RocksDB state backend Checkpointing Failed > -- > > Key: FLINK-9945 > URL: https://issues.apache.org/jira/browse/FLINK-9945 > Project: Flink > Issue Type: Bug >Reporter: xymaqingxiang >Priority: Major > Attachments: image-2018-07-25-16-57-45-617.png > > > Checkpoint failed. > The log is: > !image-2018-07-25-16-57-45-617.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9958) Fix potential NPE for delta iteration of DataSet
[ https://issues.apache.org/jira/browse/FLINK-9958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9958: -- Labels: pull-request-available (was: ) > Fix potential NPE for delta iteration of DataSet > > > Key: FLINK-9958 > URL: https://issues.apache.org/jira/browse/FLINK-9958 > Project: Flink > Issue Type: Bug > Components: DataSet API >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9958) Fix potential NPE for delta iteration of DataSet
[ https://issues.apache.org/jira/browse/FLINK-9958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556770#comment-16556770 ] ASF GitHub Bot commented on FLINK-9958: --- Xpray opened a new pull request #6426: [FLINK-9958][DataSet] Fix potential NPE for delta iteration of DataSet URL: https://github.com/apache/flink/pull/6426 ## What is the purpose of the change Fix potential NPE for delta iteration of DataSet ## Brief change log - change preVisit logic of JobGenerator ## Verifying this change This change added tests and can be verified as follows: org.apache.flink.api.scala.operators.translation.DeltaIterationTranslationTest#testIterateDelta ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector:no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix potential NPE for delta iteration of DataSet > > > Key: FLINK-9958 > URL: https://issues.apache.org/jira/browse/FLINK-9958 > Project: Flink > Issue Type: Bug > Components: DataSet API >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-7525) Add config option to disable Cancel functionality on UI
[ https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441630#comment-16441630 ] Ted Yu edited comment on FLINK-7525 at 7/26/18 1:21 AM: FLINK-4319 has been resolved. was (Author: yuzhih...@gmail.com): Hopefully FLIP-6 would be released soon . > Add config option to disable Cancel functionality on UI > --- > > Key: FLINK-7525 > URL: https://issues.apache.org/jira/browse/FLINK-7525 > Project: Flink > Issue Type: Improvement > Components: Web Client, Webfrontend >Reporter: Ted Yu >Priority: Major > > In this email thread > http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI > , Raja was asking for a way to control how users cancel Job(s). > Robert proposed adding a config option which disables the Cancel > functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307281#comment-16307281 ] Ted Yu edited comment on FLINK-6105 at 7/26/18 1:20 AM: In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java : {code} try { Thread.sleep(500); } catch (InterruptedException e1) { // ignore it } {code} Interrupt status should be restored, or throw InterruptedIOException . was (Author: yuzhih...@gmail.com): In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java : {code} try { Thread.sleep(500); } catch (InterruptedException e1) { // ignore it } {code} Interrupt status should be restored, or throw InterruptedIOException . > Properly handle InterruptedException in HadoopInputFormatBase > - > > Key: FLINK-6105 > URL: https://issues.apache.org/jira/browse/FLINK-6105 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > When catching InterruptedException, we should throw InterruptedIOException > instead of IOException. > The following example is from HadoopInputFormatBase : > {code} > try { > splits = this.mapreduceInputFormat.getSplits(jobContext); > } catch (InterruptedException e) { > throw new IOException("Could not get Splits.", e); > } > {code} > There may be other places where IOE is thrown. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
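The pattern Ted Yu flags, catching `InterruptedException` and ignoring it, silently clears the thread's interrupt status. A minimal sketch of the suggested fix (a hypothetical `pauseBriefly` helper, not the actual `RollingSink` code) restores the flag so enclosing code can still observe the interruption:

```java
public class InterruptSketch {

    // Hypothetical stand-in for the sleep-and-retry step in RollingSink.
    public static void pauseBriefly() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it,
            // so the enclosing run loop can notice and shut down promptly.
            Thread.currentThread().interrupt();
        }
    }
}
```

For I/O-heavy paths such as `HadoopInputFormatBase`, the comment's alternative is to rethrow as `java.io.InterruptedIOException` rather than wrapping in a plain `IOException`, which preserves the interruption semantics for callers that inspect exception types.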
[jira] [Commented] (FLINK-9664) FlinkML Quickstart Loading Data section example doesn't work as described
[ https://issues.apache.org/jira/browse/FLINK-9664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556434#comment-16556434 ] ASF GitHub Bot commented on FLINK-9664: --- walterddr opened a new pull request #6425: [FLINK-9664][Doc] fixing documentation in ML quick start URL: https://github.com/apache/flink/pull/6425 ## What is the purpose of the change * Fix documentation to explicitly specify that +1 and -1 is required when using SVM library in flink-ml ## Brief change log * Added explicit conversion requirement in document ## Verifying this change * n/a ## Does this pull request potentially affect one of the following parts: * n/a ## Documentation * n/a This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > FlinkML Quickstart Loading Data section example doesn't work as described > - > > Key: FLINK-9664 > URL: https://issues.apache.org/jira/browse/FLINK-9664 > Project: Flink > Issue Type: Bug > Components: Documentation, Machine Learning Library >Affects Versions: 1.5.0 >Reporter: Mano Swerts >Assignee: Rong Rong >Priority: Major > Labels: documentation-update, machine_learning, ml, > pull-request-available > Original Estimate: 1h > Remaining Estimate: 1h > > The ML documentation example isn't complete: > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/ml/quickstart.html#loading-data] > The referred section loads data from an astroparticle binary classification > dataset to showcase SVM. The dataset uses 0 and 1 as labels, which doesn't > produce correct results. The SVM predictor expects -1 and 1 labels to > correctly predict the label. The documentation, however, doesn't mention > that. The example therefore doesn't work without a clue why. 
> The documentation should be updated with an explicit mention to -1 and 1 > labels and a mapping function that shows the conversion of the labels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9664) FlinkML Quickstart Loading Data section example doesn't work as described
[ https://issues.apache.org/jira/browse/FLINK-9664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9664: -- Labels: documentation-update machine_learning ml pull-request-available (was: documentation-update machine_learning ml) > FlinkML Quickstart Loading Data section example doesn't work as described > - > > Key: FLINK-9664 > URL: https://issues.apache.org/jira/browse/FLINK-9664 > Project: Flink > Issue Type: Bug > Components: Documentation, Machine Learning Library >Affects Versions: 1.5.0 >Reporter: Mano Swerts >Assignee: Rong Rong >Priority: Major > Labels: documentation-update, machine_learning, ml, > pull-request-available > Original Estimate: 1h > Remaining Estimate: 1h > > The ML documentation example isn't complete: > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/ml/quickstart.html#loading-data] > The referred section loads data from an astroparticle binary classification > dataset to showcase SVM. The dataset uses 0 and 1 as labels, which doesn't > produce correct results. The SVM predictor expects -1 and 1 labels to > correctly predict the label. The documentation, however, doesn't mention > that. The example therefore doesn't work without a clue why. > The documentation should be updated with an explicit mention to -1 and 1 > labels and a mapping function that shows the conversion of the labels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556325#comment-16556325 ] Steven Zhen Wu edited comment on FLINK-9693 at 7/25/18 11:23 PM: - We can actually reproduce the issue by killing jobmanager node for very large jobs, like parallelism over 1,000. This issue starts to appear when replacement jobmanager node came up. was (Author: stevenz3wu): One more observation. we are seeing this issue right after the jobmanager node got killed and replaced. however, it is not reproducible when I trying to kill the jobmanager when job is healthy. so still don't know what exactly conditions triggered the issue > Possible memory leak in jobmanager retaining archived checkpoints > - > > Key: FLINK-9693 > URL: https://issues.apache.org/jira/browse/FLINK-9693 > Project: Flink > Issue Type: Bug > Components: JobManager, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 > Environment: !image.png!!image (1).png! >Reporter: Steven Zhen Wu >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.1, 1.6.0 > > Attachments: 20180725_jm_mem_leak.png, > 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png > > > First, some context about the job > * Flink 1.4.1 > * stand-alone deployment mode > * embarrassingly parallel: all operators are chained together > * parallelism is over 1,000 > * stateless except for Kafka source operators. checkpoint size is 8.4 MB. > * set "state.backend.fs.memory-threshold" so that only jobmanager writes to > S3 to checkpoint > * internal checkpoint with 10 checkpoints retained in history > > Summary of the observations > * 41,567 ExecutionVertex objects retained 9+ GB of memory > * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for > source operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556325#comment-16556325 ] Steven Zhen Wu edited comment on FLINK-9693 at 7/25/18 9:56 PM: One more observation. we are seeing this issue right after the jobmanager node got killed and replaced. however, it is not reproducible when I trying to kill the jobmanager when job is healthy. so still don't know what exactly conditions triggered the issue was (Author: stevenz3wu): One more observation. we are seeing this issue right after the jobmanager node got killed and replaced. however, it is not reproducible when I trying to kill the jobmanager when job is healthy > Possible memory leak in jobmanager retaining archived checkpoints > - > > Key: FLINK-9693 > URL: https://issues.apache.org/jira/browse/FLINK-9693 > Project: Flink > Issue Type: Bug > Components: JobManager, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 > Environment: !image.png!!image (1).png! >Reporter: Steven Zhen Wu >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.1, 1.6.0 > > Attachments: 20180725_jm_mem_leak.png, > 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png > > > First, some context about the job > * Flink 1.4.1 > * stand-alone deployment mode > * embarrassingly parallel: all operators are chained together > * parallelism is over 1,000 > * stateless except for Kafka source operators. checkpoint size is 8.4 MB. > * set "state.backend.fs.memory-threshold" so that only jobmanager writes to > S3 to checkpoint > * internal checkpoint with 10 checkpoints retained in history > > Summary of the observations > * 41,567 ExecutionVertex objects retained 9+ GB of memory > * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for > source operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556325#comment-16556325 ] Steven Zhen Wu commented on FLINK-9693: --- One more observation. we are seeing this issue right after the jobmanager node got killed and replaced. however, it is not reproducible when I trying to kill the jobmanager when job is healthy > Possible memory leak in jobmanager retaining archived checkpoints > - > > Key: FLINK-9693 > URL: https://issues.apache.org/jira/browse/FLINK-9693 > Project: Flink > Issue Type: Bug > Components: JobManager, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 > Environment: !image.png!!image (1).png! >Reporter: Steven Zhen Wu >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.1, 1.6.0 > > Attachments: 20180725_jm_mem_leak.png, > 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png > > > First, some context about the job > * Flink 1.4.1 > * stand-alone deployment mode > * embarrassingly parallel: all operators are chained together > * parallelism is over 1,000 > * stateless except for Kafka source operators. checkpoint size is 8.4 MB. > * set "state.backend.fs.memory-threshold" so that only jobmanager writes to > S3 to checkpoint > * internal checkpoint with 10 checkpoints retained in history > > Summary of the observations > * 41,567 ExecutionVertex objects retained 9+ GB of memory > * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for > source operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7753) HandlerUtils should close the channel on error responses
[ https://issues.apache.org/jira/browse/FLINK-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-7753: -- Labels: pull-request-available (was: ) > HandlerUtils should close the channel on error responses > > > Key: FLINK-7753 > URL: https://issues.apache.org/jira/browse/FLINK-7753 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Minor > Labels: pull-request-available > > Unexpected errors in the server pipeline correctly cause a 500 error > response. I suggest that such responses also close the channel rather than > allowing keep-alive. This would be a better security posture too since we > don't know if the pipeline is corrupt following an unexpected error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7753) HandlerUtils should close the channel on error responses
[ https://issues.apache.org/jira/browse/FLINK-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556300#comment-16556300 ] ASF GitHub Bot commented on FLINK-7753: --- EronWright closed pull request #4765: [FLINK-7753] [flip-6] close REST channel on server error URL: https://github.com/apache/flink/pull/4765

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index 0d7483aad3c..9de150d9dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -74,7 +74,7 @@
 		sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 		return;
 	}
-	sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+	sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode, false);
 }
 /**
@@ -96,9 +96,9 @@ public static void sendErrorResponse(
 	mapper.writeValue(sw, errorMessage);
 } catch (IOException e) {
 	// this should never happen
-	sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+	sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR, true);
 }
-	sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+	sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode, isServerError(statusCode));
 }
 /**
@@ -108,19 +108,25 @@ public static void sendErrorResponse(
  * @param httpRequest originating http request
  * @param message which should be sent
  * @param statusCode of the message to send
+ * @param forceClose indicates whether to forcibly close the connection after the response is sent
  */
 public static void sendResponse(
 	@Nonnull ChannelHandlerContext channelHandlerContext,
 	@Nonnull HttpRequest httpRequest,
 	@Nonnull String message,
-	@Nonnull HttpResponseStatus statusCode) {
+	@Nonnull HttpResponseStatus statusCode,
+	boolean forceClose) {
 	HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
 	response.headers().set(CONTENT_TYPE, "application/json");
-	if (HttpHeaders.isKeepAlive(httpRequest)) {
+	boolean keepAlive = !forceClose && HttpHeaders.isKeepAlive(httpRequest);
+	if (keepAlive) {
 		response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 	}
+	else {
+		response.headers().set(CONNECTION, HttpHeaders.Values.CLOSE);
+	}
 	byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
 	ByteBuf b = Unpooled.copiedBuffer(buf);
@@ -134,8 +140,12 @@ public static void sendResponse(
 	ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 	// close the connection, if no keep-alive is needed
-	if (!HttpHeaders.isKeepAlive(httpRequest)) {
+	if (!keepAlive) {
 		lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 	}
 }
+
+private static boolean isServerError(@Nonnull HttpResponseStatus statusCode) {
+	return statusCode.code() >= 500 && statusCode.code() < 600;
+}
}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > HandlerUtils should close the channel on error responses > > > Key: FLINK-7753 >
[jira] [Assigned] (FLINK-7753) HandlerUtils should close the channel on error responses
[ https://issues.apache.org/jira/browse/FLINK-7753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright reassigned FLINK-7753: --- Assignee: (was: Eron Wright ) > HandlerUtils should close the channel on error responses > > > Key: FLINK-7753 > URL: https://issues.apache.org/jira/browse/FLINK-7753 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Priority: Minor > Labels: pull-request-available > > Unexpected errors in the server pipeline correctly cause a 500 error > response. I suggest that such responses also close the channel rather than > allowing keep-alive. This would be a better security posture too since we > don't know if the pipeline is corrupt following an unexpected error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] EronWright closed pull request #4765: [FLINK-7753] [flip-6] close REST channel on server error
EronWright closed pull request #4765: [FLINK-7753] [flip-6] close REST channel on server error
URL: https://github.com/apache/flink/pull/4765

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index 0d7483aad3c..9de150d9dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -74,7 +74,7 @@
 			sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 			return;
 		}
-		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode, false);
 	}
 
 	/**
@@ -96,9 +96,9 @@ public static void sendErrorResponse(
 			mapper.writeValue(sw, errorMessage);
 		} catch (IOException e) {
 			// this should never happen
-			sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR, true);
 		}
-		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode, isServerError(statusCode));
 	}
 
 	/**
@@ -108,19 +108,25 @@ public static void sendErrorResponse(
 	 * @param httpRequest originating http request
 	 * @param message which should be sent
 	 * @param statusCode of the message to send
+	 * @param forceClose indicates whether to forcibly close the connection after the response is sent
 	 */
 	public static void sendResponse(
 			@Nonnull ChannelHandlerContext channelHandlerContext,
 			@Nonnull HttpRequest httpRequest,
 			@Nonnull String message,
-			@Nonnull HttpResponseStatus statusCode) {
+			@Nonnull HttpResponseStatus statusCode,
+			boolean forceClose) {
 		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
 
 		response.headers().set(CONTENT_TYPE, "application/json");
 
-		if (HttpHeaders.isKeepAlive(httpRequest)) {
+		boolean keepAlive = !forceClose && HttpHeaders.isKeepAlive(httpRequest);
+		if (keepAlive) {
 			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 		}
+		else {
+			response.headers().set(CONNECTION, HttpHeaders.Values.CLOSE);
+		}
 
 		byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
 		ByteBuf b = Unpooled.copiedBuffer(buf);
@@ -134,8 +140,12 @@ public static void sendResponse(
 		ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 
 		// close the connection, if no keep-alive is needed
-		if (!HttpHeaders.isKeepAlive(httpRequest)) {
+		if (!keepAlive) {
 			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 		}
 	}
+
+	private static boolean isServerError(@Nonnull HttpResponseStatus statusCode) {
+		return statusCode.code() >= 500 && statusCode.code() < 600;
+	}
 }

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
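The behavioral core of the patch above (force-close the connection on 5xx responses, otherwise honor the client's keep-alive preference) can be sketched without any Netty dependency. The class and method names below are illustrative, not Flink's:

```java
// Sketch of the connection-handling policy from the patch above
// (hypothetical class; the real logic lives in Flink's HandlerUtils).
public class ConnectionPolicy {

    // 5xx responses are server errors; mirrors isServerError() in the patch.
    public static boolean isServerError(int statusCode) {
        return statusCode >= 500 && statusCode < 600;
    }

    // The connection stays alive only if the client asked for keep-alive
    // AND the server is not forcing a close (e.g. after a 500).
    public static boolean keepAlive(boolean clientRequestedKeepAlive, int statusCode) {
        boolean forceClose = isServerError(statusCode);
        return !forceClose && clientRequestedKeepAlive;
    }

    public static void main(String[] args) {
        System.out.println(keepAlive(true, 200));  // true: normal response
        System.out.println(keepAlive(true, 500));  // false: server error closes
        System.out.println(keepAlive(false, 200)); // false: client opted out
    }
}
```

This matches the security argument in the ticket: after an unexpected error the pipeline may be corrupt, so the response carries `Connection: close` and the channel is closed once the last content is flushed.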
[jira] [Assigned] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright reassigned FLINK-7738: --- Assignee: (was: Eron Wright ) > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Priority: Major > Labels: pull-request-available > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556256#comment-16556256 ] ASF GitHub Bot commented on FLINK-7738: --- EronWright commented on issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, client) URL: https://github.com/apache/flink/pull/4767#issuecomment-407890658 @tillrohrmann closing this due to inactivity. Ping me if you want me to take another crack at it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > Labels: pull-request-available > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-7738: -- Labels: pull-request-available (was: ) > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > Labels: pull-request-available > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] EronWright closed pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, client)
EronWright closed pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, client)
URL: https://github.com/apache/flink/pull/4767

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index 4808781c7b8..cf465294f5a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -29,17 +29,24 @@
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
+import java.util.HashMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -137,6 +144,35 @@ public void testRedirectHandler() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests the approach of using the redirect handler as a standalone handler.
+	 */
+	@Test
+	public void testUserEvent() {
+		final String correctAddress = "foobar:21345";
+		final CompletableFuture localAddressFuture = CompletableFuture.completedFuture(correctAddress);
+		final Time timeout = Time.seconds(10L);
+
+		final RestfulGateway localGateway = mock(RestfulGateway.class);
+		when(localGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(correctAddress));
+
+		final GatewayRetriever gatewayRetriever = mock(GatewayRetriever.class);
+		when(gatewayRetriever.getNow()).thenReturn(Optional.of(localGateway));
+
+		final RedirectHandler redirectHandler = new RedirectHandler<>(
+			localAddressFuture,
+			gatewayRetriever,
+			timeout);
+		final UserEventHandler eventHandler = new UserEventHandler();
+		EmbeddedChannel channel = new EmbeddedChannel(redirectHandler, eventHandler);
+
+		// write a (routed) HTTP request, then validate that a user event was propagated
+		DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
+		Routed routed = new Routed(null, false, request, "/", new HashMap<>(), new HashMap<>());
+		channel.writeInbound(routed);
+		Assert.assertNotNull(eventHandler.gateway);
+		Assert.assertNotNull(eventHandler.routed);
+	}
+
 	private static class TestingHandler extends RedirectHandler {
 
 		protected TestingHandler(
@@ -154,4 +190,25 @@ protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Rout
 		}
 	}
 
+	private static class UserEventHandler extends ChannelInboundHandlerAdapter {
+
+		public volatile T gateway;
+
+		public volatile Routed routed;
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+			if (evt instanceof RedirectHandler.GatewayRetrieved) {
+				gateway = ((RedirectHandler.GatewayRetrieved) evt).getGateway();
+			}
+			super.userEventTriggered(ctx, evt);
+		}
+
+		@Override
+		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+			routed = (Routed) msg;
+
[jira] [Resolved] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext
[ https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eron Wright resolved FLINK-4849.
-
	Resolution: Invalid

> trustStorePassword should be checked against null in
> SSLUtils#createSSLClientContext
>
> Key: FLINK-4849
> URL: https://issues.apache.org/jira/browse/FLINK-4849
> Project: Flink
> Issue Type: Bug
> Components: Security
> Reporter: Ted Yu
> Priority: Minor
>
> {code}
> String trustStorePassword = sslConfig.getString(
> 	ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
> 	null);
> ...
> try {
> 	trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
> 	trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
> {code}
> If trustStorePassword is null, the load() call would throw NPE.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
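The NPE described above is avoidable because `KeyStore.load()` itself accepts a null password. A minimal null-safe sketch (hypothetical helper, not the actual SSLUtils code) could look like:

```java
import java.security.KeyStore;

// Hypothetical sketch of the null-safe handling FLINK-4849 asks for:
// KeyStore.load() accepts a null password, so the fix is simply to avoid
// calling toCharArray() on a null config value.
public class TrustStorePassword {

    public static char[] toCharArrayOrNull(String password) {
        return password == null ? null : password.toCharArray();
    }

    public static void main(String[] args) throws Exception {
        // load(null, password) initializes an empty keystore; a real caller
        // would pass a FileInputStream of the truststore instead of null.
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        trustStore.load(null, toCharArrayOrNull(null)); // no NPE
        System.out.println(trustStore.size()); // 0
    }
}
```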
[jira] [Commented] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
[ https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556223#comment-16556223 ] Jonathan Miles commented on FLINK-9700: --- Duplicating the comment I added to FLINK-9690, since this is worth documenting as well... Although the protocol is backwards compatible, there is a performance penalty when using the 0.11 consumer and producer with later Kafka brokers. The protocol was changed at some point and using older connectors requires the brokers to do conversions between the on-disk and wire protocols, so we lose the zero-copy functionality and add extra GC burden among other issues. There's a note about this in the Kafka documentation. Although that's discussing conversions between different version numbers than we are, the effects are the same: https://kafka.apache.org/0102/documentation.html#upgrade_10_performance_impact > Document FlinkKafkaProducer behaviour for Kafka versions > 0.11 > --- > > Key: FLINK-9700 > URL: https://issues.apache.org/jira/browse/FLINK-9700 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.0 >Reporter: Ufuk Celebi >Assignee: vinoyang >Priority: Minor > > FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API > limitations of the Kafka client. Using reflection breaks with newer versions > of the Kafka client (due to internal changes of the client). > The documentation does not mention newer Kafka versions. We should add the > following notes: > - Only package Kafka connector with kafka.version property set to 0.11.*.* > - Mention that it is possible to use the 0.11 connector with newer versions > of Kafka as the protocol seems to be backwards compatible (double check that > this is correct) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
[ https://issues.apache.org/jira/browse/FLINK-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556222#comment-16556222 ] Jonathan Miles commented on FLINK-9690: --- Although the protocol is backwards compatible, there is a performance penalty when using the 0.11 consumer and producer with later Kafka brokers. The protocol was changed at some point and using older connectors requires the brokers to do conversions between the on-disk and wire protocols, so we lose the zero-copy functionality and add extra GC burden among other issues. There's a note about this in the Kafka documentation. Although that's discussing conversions between different version numbers than we are, the effects are the same: https://kafka.apache.org/0102/documentation.html#upgrade_10_performance_impact > Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails > > > Key: FLINK-9690 > URL: https://issues.apache.org/jira/browse/FLINK-9690 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Ufuk Celebi >Priority: Major > > Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} > packaged with {{kafka.version}} set to {{1.1.0}} in Flink 1.4.2. 
> {code}
> java.lang.RuntimeException: Incompatible KafkaProducer version
> 	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301)
> 	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292)
> 	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldException: sequenceNumbers
> 	at java.lang.Class.getDeclaredField(Class.java:2070)
> 	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297)
> 	... 16 more
> {code}
> [~pnowojski] Any ideas about this issue? Judging from the stack trace it was anticipated that reflective access might break with Kafka versions > 0.11.2.0.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
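The root cause in the trace (`Class.getDeclaredField` failing once the Kafka client renamed its internals) is easy to reproduce in isolation. The nested classes below are stand-ins for two versions of a library class, not the real Kafka code:

```java
import java.lang.reflect.Field;

// Minimal illustration (not Flink code) of why reflective access breaks:
// Class.getDeclaredField throws NoSuchFieldException as soon as the target
// class renames the field the caller hard-coded.
public class ReflectionBreakage {

    // Stand-ins for a library class in two releases; names are invented.
    public static class V011 { private int sequenceNumbers = 7; }
    public static class V110 { private int nextSequence = 7; } // field renamed

    public static int readField(Object target, String name) throws Exception {
        Field f = target.getClass().getDeclaredField(name); // throws if renamed
        f.setAccessible(true);
        return f.getInt(target);
    }

    public static boolean fieldExists(Object target, String name) {
        try {
            target.getClass().getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readField(new V011(), "sequenceNumbers")); // 7
        try {
            readField(new V110(), "sequenceNumbers"); // field no longer exists
        } catch (NoSuchFieldException e) {
            System.out.println("NoSuchFieldException: " + e.getMessage());
        }
    }
}
```

This is why the ticket concludes that the reflection-based workaround in FlinkKafkaProducer is inherently tied to one Kafka client version.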
[jira] [Closed] (FLINK-9643) Flink allowing TLS 1.1 in spite of configuring TLS 1.2
[ https://issues.apache.org/jira/browse/FLINK-9643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Vlasov closed FLINK-9643. Resolution: Fixed Fix Version/s: 1.4.2 [~till.rohrmann] Excuse me for the late response. Yes, as I figured out, 1.4 and 1.5 are not affected. So if the 1.3.2 version is not under support, then I suppose this issue can be closed. > Flink allowing TLS 1.1 in spite of configuring TLS 1.2 > -- > > Key: FLINK-9643 > URL: https://issues.apache.org/jira/browse/FLINK-9643 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.3.2 >Reporter: Vinay >Assignee: Viktor Vlasov >Priority: Major > Fix For: 1.4.2 > > Attachments: result.csv, result2_rpc.csv, result_2.csv, test.png > > > I have deployed Flink 1.3.2 and enabled SSL settings. From the ssl debug > logs it shows that Flink is using TLSv1.2. However based on the security > scans we have observed that it also allows TLSv1.0 and TLSv1.1. > > In order to strictly use TLSv1.2 we have updated the following property of > java.security file: > jdk.tls.disabledAlgorithms=MD5, SSLv3, DSA, RSA keySize < 2048, TLSv1, > TLSv1.1 > But still it allows TLSv1.1 , verified this by hitting the following command > from master node: > openssl s_client -connect taskmanager1: -tls1 > (here listening_address_port is part of > akka.ssl.tcp://flink@taskmanager1:port/user/taskmanager) > Now, when I hit the above command for the data port, it does not allow > TLSv1.1 and only allows TLSv1.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
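Scans like the one described above can be cross-checked from the JVM side by asking a default SSLEngine which protocol versions it would offer. This sketch is a generic JSSE check, not Flink or Akka code; note it only reflects JSSE defaults (including `jdk.tls.disabledAlgorithms`), so an endpoint that sets its own enabled protocols explicitly can still differ:

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.util.Arrays;

// Generic JSSE sketch: list the TLS versions the default engine will offer.
public class TlsProtocolCheck {

    public static String[] enabledProtocols() {
        try {
            SSLEngine engine = SSLContext.getDefault().createSSLEngine();
            return engine.getEnabledProtocols();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Output depends on the JDK and its java.security configuration,
        // e.g. [TLSv1.2, TLSv1.3] on current JDKs.
        System.out.println(Arrays.toString(enabledProtocols()));
    }
}
```

Running this on each node, alongside the `openssl s_client -tls1` probe from the comment above, helps distinguish a JVM-level configuration gap from a per-endpoint (e.g. Akka remoting) one.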
[jira] [Issue Comment Deleted] (FLINK-9927) Error in Python Stream API example on website
[ https://issues.apache.org/jira/browse/FLINK-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Malt updated FLINK-9927: Comment: was deleted (was: I've made the change, the pull request is here: https://github.com/apache/flink/pull/6424) > Error in Python Stream API example on website > - > > Key: FLINK-9927 > URL: https://issues.apache.org/jira/browse/FLINK-9927 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Joe Malt >Priority: Minor > Labels: pull-request-available > > The [Python Programming Guide (Streaming) > |https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#program-skeleton]page > contains a WordCount example with the following main method: > {code:java} > def main(factory): > env = factory.get_execution_environment() > env.create_python_source(Generator(num_iters=1000)) \ > .flat_map(Tokenizer()) \ > .key_by(Selector()) \ > .time_window(milliseconds(50)) \ > .reduce(Sum()) \ > .print(){code} > The print() should, [according to the documentation, be > output()|https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.html#output%E2%80%93]. > Trying to call print() results in an error: > {code:java} > jmalt-machine:bin jmalt$ ./pyflink-stream.sh > /Users/jmalt/flink-python/WordCount.py > Starting execution of program > Failed to run plan: null > Traceback (most recent call last): > File "", line 1, in > File > "/var/folders/t1/gcltcjcn5zdgqfqrc32xk90x85xkg9/T/flink_streaming_plan_9539e241-ba0a-42bf-9d4c-844dda26b998/WordCount.py", > line 43, in main > AttributeError: 'org.apache.flink.streaming.python.api.datastream.P' object > has no attribute 'print'{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9927) Error in Python Stream API example on website
[ https://issues.apache.org/jira/browse/FLINK-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556208#comment-16556208 ] Joe Malt commented on FLINK-9927: - I've made the change, the pull request is here: https://github.com/apache/flink/pull/6424 > Error in Python Stream API example on website > - > > Key: FLINK-9927 > URL: https://issues.apache.org/jira/browse/FLINK-9927 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Joe Malt >Priority: Minor > Labels: pull-request-available > > The [Python Programming Guide (Streaming) > |https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#program-skeleton]page > contains a WordCount example with the following main method: > {code:java} > def main(factory): > env = factory.get_execution_environment() > env.create_python_source(Generator(num_iters=1000)) \ > .flat_map(Tokenizer()) \ > .key_by(Selector()) \ > .time_window(milliseconds(50)) \ > .reduce(Sum()) \ > .print(){code} > The print() should, [according to the documentation, be > output()|https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.html#output%E2%80%93]. > Trying to call print() results in an error: > {code:java} > jmalt-machine:bin jmalt$ ./pyflink-stream.sh > /Users/jmalt/flink-python/WordCount.py > Starting execution of program > Failed to run plan: null > Traceback (most recent call last): > File "", line 1, in > File > "/var/folders/t1/gcltcjcn5zdgqfqrc32xk90x85xkg9/T/flink_streaming_plan_9539e241-ba0a-42bf-9d4c-844dda26b998/WordCount.py", > line 43, in main > AttributeError: 'org.apache.flink.streaming.python.api.datastream.P' object > has no attribute 'print'{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9927) Error in Python Stream API example on website
[ https://issues.apache.org/jira/browse/FLINK-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556198#comment-16556198 ]

ASF GitHub Bot commented on FLINK-9927:
---

JoeMalt opened a new pull request #6424: [FLINK-9927] [Documentation] Change .print() to .output() in Python Streaming example
URL: https://github.com/apache/flink/pull/6424

The method .print() doesn't exist; it should be .output().

## What is the purpose of the change

Correct the final line of the pipeline example in the Python streaming example. There is no such method as .print(); it should say .output(). This is a documentation change only; no code is touched.

## Brief change log

- Change one line of the [streaming program example](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#streaming-program-example) code.

## Verifying this change

This change is a trivial documentation change.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Error in Python Stream API example on website > - > > Key: FLINK-9927 > URL: https://issues.apache.org/jira/browse/FLINK-9927 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Joe Malt >Priority: Minor > Labels: pull-request-available > > The [Python Programming Guide (Streaming) > |https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#program-skeleton]page > contains a WordCount example with the following main method: >
[jira] [Updated] (FLINK-9927) Error in Python Stream API example on website
[ https://issues.apache.org/jira/browse/FLINK-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9927: -- Labels: pull-request-available (was: ) > Error in Python Stream API example on website > - > > Key: FLINK-9927 > URL: https://issues.apache.org/jira/browse/FLINK-9927 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Joe Malt >Priority: Minor > Labels: pull-request-available > > The [Python Programming Guide (Streaming) > |https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#program-skeleton]page > contains a WordCount example with the following main method: > {code:java} > def main(factory): > env = factory.get_execution_environment() > env.create_python_source(Generator(num_iters=1000)) \ > .flat_map(Tokenizer()) \ > .key_by(Selector()) \ > .time_window(milliseconds(50)) \ > .reduce(Sum()) \ > .print(){code} > The print() should, [according to the documentation, be > output()|https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.html#output%E2%80%93]. > Trying to call print() results in an error: > {code:java} > jmalt-machine:bin jmalt$ ./pyflink-stream.sh > /Users/jmalt/flink-python/WordCount.py > Starting execution of program > Failed to run plan: null > Traceback (most recent call last): > File "", line 1, in > File > "/var/folders/t1/gcltcjcn5zdgqfqrc32xk90x85xkg9/T/flink_streaming_plan_9539e241-ba0a-42bf-9d4c-844dda26b998/WordCount.py", > line 43, in main > AttributeError: 'org.apache.flink.streaming.python.api.datastream.P' object > has no attribute 'print'{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] JoeMalt opened a new pull request #6424: [FLINK-9927] [Documentation] Change .print() to .output() in Python Streaming example
JoeMalt opened a new pull request #6424: [FLINK-9927] [Documentation] Change .print() to .output() in Python Streaming example URL: https://github.com/apache/flink/pull/6424 The method .print() doesn't exist, it should be .output() *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. 
You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change Correct the final line of the pipeline example in the python streaming example. There is no such method as .print(), it should say .output(). This is a documentation change only, no code is touched. ## Brief change log - change one line of the [streaming program example](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#streaming-program-example) code. ## Verifying this change This change is a trivial documentation change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8291) For security, Job Manager web UI should be accessed with username/password
[ https://issues.apache.org/jira/browse/FLINK-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556192#comment-16556192 ] Eron Wright commented on FLINK-8291: - Note that in FLIP-26 we discuss a few options for user authentication in the web UI. [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80453255] > For security, Job Manager web UI should be accessed with username/password > --- > > Key: FLINK-8291 > URL: https://issues.apache.org/jira/browse/FLINK-8291 > Project: Flink > Issue Type: Improvement > Components: Security, Webfrontend >Affects Versions: 1.3.2 >Reporter: Lynch Lee >Priority: Major > > Nowadays, we submit jobs from the JobManager web UI without any login credentials. > For security, the Job Manager web UI should be accessed with a username/password. > Should we ??? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-9693: -- Attachment: 20180725_jm_mem_leak.png > Possible memory leak in jobmanager retaining archived checkpoints > - > > Key: FLINK-9693 > URL: https://issues.apache.org/jira/browse/FLINK-9693 > Project: Flink > Issue Type: Bug > Components: JobManager, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 > Environment: !image.png!!image (1).png! >Reporter: Steven Zhen Wu >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.1, 1.6.0 > > Attachments: 20180725_jm_mem_leak.png, > 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png > > > First, some context about the job > * Flink 1.4.1 > * stand-alone deployment mode > * embarrassingly parallel: all operators are chained together > * parallelism is over 1,000 > * stateless except for Kafka source operators. checkpoint size is 8.4 MB. > * set "state.backend.fs.memory-threshold" so that only the jobmanager writes to > S3 for checkpoints > * internal checkpoint with 10 checkpoints retained in history > > Summary of the observations > * 41,567 ExecutionVertex objects retained 9+ GB of memory > * Expanded one ExecutionVertex; it seems to be storing the Kafka offsets for the > source operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu reopened FLINK-9693: --- Till, I cherry-picked your fix for the 1.4 branch. We are still seeing the memory leak issue; will attach another screenshot. > Possible memory leak in jobmanager retaining archived checkpoints > - > > Key: FLINK-9693 > URL: https://issues.apache.org/jira/browse/FLINK-9693 > Project: Flink > Issue Type: Bug > Components: JobManager, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 > Environment: !image.png!!image (1).png! >Reporter: Steven Zhen Wu >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.1, 1.6.0 > > Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, > ExecutionVertexZoomIn.png > > > First, some context about the job > * Flink 1.4.1 > * stand-alone deployment mode > * embarrassingly parallel: all operators are chained together > * parallelism is over 1,000 > * stateless except for Kafka source operators. checkpoint size is 8.4 MB. > * set "state.backend.fs.memory-threshold" so that only the jobmanager writes to > S3 for checkpoints > * internal checkpoint with 10 checkpoints retained in history > > Summary of the observations > * 41,567 ExecutionVertex objects retained 9+ GB of memory > * Expanded one ExecutionVertex; it seems to be storing the Kafka offsets for the > source operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9960) JobManager can't failover to hdfs with multiple namenodes
[ https://issues.apache.org/jira/browse/FLINK-9960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556059#comment-16556059 ] Julio Biason commented on FLINK-9960: - Just to add: I tried to configure high-availability.storageDir to have the list of servers in the format "hdfs://namenode1:8020;namenode2:8020/flink/jobmanager", following the example used for journal configuration in Hadoop itself and it didn't work. > JobManager can't failover to hdfs with multiple namenodes > - > > Key: FLINK-9960 > URL: https://issues.apache.org/jira/browse/FLINK-9960 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.0 >Reporter: Julio Biason >Priority: Major > > When Hadoop/HDFS is running in HA mode, there may be more than one namenode > running: One active and another standby. > If, for whatever reasons, the server configured in > high-availability.storageDir is currently in standby mode (e.g., Hadoop > failed over the other namenode), the JobManager will refuse to start because > it can't contact hdfs – in this case, it should have a list of servers (up to > 2) and, in case of a failure to connect, go to the next one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
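For context, Hadoop's documented way to address an HA cluster is a logical nameservice URI rather than a semicolon-separated host list, with the individual namenodes declared in `hdfs-site.xml`. A sketch using the standard Hadoop HDFS HA property names (the nameservice name `mycluster` and the hostnames are hypothetical):

```xml
<!-- hdfs-site.xml sketch: logical nameservice "mycluster" (name hypothetical)
     backed by two namenodes; clients address hdfs://mycluster/... and the
     failover proxy provider resolves whichever namenode is currently active. -->
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>namenode1:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>namenode2:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

With such a configuration on the client classpath, a value like `high-availability.storageDir: hdfs://mycluster/flink/jobmanager` would not pin Flink to a single namenode.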
[jira] [Created] (FLINK-9960) JobManager can't failover to hdfs with multiple namenodes
Julio Biason created FLINK-9960: --- Summary: JobManager can't failover to hdfs with multiple namenodes Key: FLINK-9960 URL: https://issues.apache.org/jira/browse/FLINK-9960 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.0 Reporter: Julio Biason When Hadoop/HDFS is running in HA mode, there may be more than one namenode running: One active and another standby. If, for whatever reasons, the server configured in high-availability.storageDir is currently in standby mode (e.g., Hadoop failed over the other namenode), the JobManager will refuse to start because it can't contact hdfs – in this case, it should have a list of servers (up to 2) and, in case of a failure to connect, go to the next one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9959) JoinFunction should be able to access its Window
[ https://issues.apache.org/jira/browse/FLINK-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-9959: Summary: JoinFunction should be able to access its Window (was: JoinFunction should be able to access Window) > JoinFunction should be able to access its Window > > > Key: FLINK-9959 > URL: https://issues.apache.org/jira/browse/FLINK-9959 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.5.1, 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.7.0 > > > e.g. currently, a windowed join looks like this, and the JoinFunction doesn't > have access to the Window it runs against. > {code:java} > A.join(B) > .where(...) > .equalTo(...) > .window(...) > .apply(new JoinFunction() {}); > {code} > We can give JoinFunction access to its Window as {{JoinFunction<IN1, IN2, OUT, Window>}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9959) JoinFunction should be able to access Window
Bowen Li created FLINK-9959: --- Summary: JoinFunction should be able to access Window Key: FLINK-9959 URL: https://issues.apache.org/jira/browse/FLINK-9959 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.5.1, 1.6.0 Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.7.0 e.g. currently, a windowed join looks like this, and the JoinFunction doesn't have access to the Window it runs against. {code:java} A.join(B) .where(...) .equalTo(...) .window(...) .apply(new JoinFunction() {}); {code} We can give JoinFunction access to its Window as {{JoinFunction<IN1, IN2, OUT, Window>}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
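For illustration only, a minimal plain-Java sketch of what the window-aware signature proposed above could look like. The names `WindowJoinFunction` and the simplified `TimeWindow` are assumptions for this sketch, not Flink API:

```java
// Hypothetical sketch -- NOT the existing Flink API. It shows the shape of a
// join function that receives the Window alongside the two joined elements.
public class WindowAwareJoinSketch {
    // The window type is simplified to a [start, end) pair for illustration.
    record TimeWindow(long start, long end) {}

    // Proposed shape: the join call also gets the window it was evaluated in.
    interface WindowJoinFunction<IN1, IN2, OUT, W> {
        OUT join(IN1 first, IN2 second, W window);
    }

    public static void main(String[] args) {
        WindowJoinFunction<Integer, Integer, String, TimeWindow> f =
                (a, b, w) -> a + "," + b + " in [" + w.start() + "," + w.end() + ")";
        System.out.println(f.join(1, 2, new TimeWindow(0, 2)));
    }
}
```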
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556033#comment-16556033 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205187204 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(<WindowAssigner>) +.apply(<JoinFunction>) +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example, a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from the other stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. Review comment: As illustrated in Figure.X, we define a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The figure shows the pairwise combinations of all elements in each window that will be passed to the join function. Note that in the tumbling window `[6,7]` no result will be passed since no element exists in the green stream that can be joined with the orange elements ⑥ and ⑦. Besides, I can understand why you use such a tiny window size, but maybe we could give it a more reasonable value. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add separate docs page for different join types in DataStream API > - > > Key: FLINK-9877 > URL: https://issues.apache.org/jira/browse/FLINK-9877 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > https://github.com/apache/flink/pull/6407 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
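The tumbling-window inner-join semantics discussed in the review above can be sketched in plain Java without Flink. All names here are illustrative, and keying is omitted for brevity (the sketch joins two unkeyed integer streams):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of tumbling-window inner-join semantics:
// elements are assigned to fixed-size windows by timestamp and joined
// pairwise per window.
public class TumblingJoinSketch {
    record Event(long timestamp, int value) {}

    // Start of the tumbling window of the given size that contains ts.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    static List<String> join(List<Event> orange, List<Event> green, long size) {
        Map<Long, List<Integer>> greenByWindow = new HashMap<>();
        for (Event e : green) {
            greenByWindow
                .computeIfAbsent(windowStart(e.timestamp(), size), k -> new ArrayList<>())
                .add(e.value());
        }
        List<String> joined = new ArrayList<>();
        for (Event o : orange) {
            // Inner-join behavior: an orange element whose window holds no
            // green elements produces nothing.
            for (int g : greenByWindow.getOrDefault(windowStart(o.timestamp(), size), List.of())) {
                joined.add(o.value() + "," + g);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Timestamps double as values, like the circled numbers in the figure:
        // window [0,1] joins both streams' elements, while [6,7] emits nothing
        // because the green stream has no elements there.
        List<Event> orange = List.of(new Event(0, 0), new Event(1, 1), new Event(6, 6), new Event(7, 7));
        List<Event> green = List.of(new Event(0, 0), new Event(1, 1));
        System.out.println(join(orange, green, 2));
    }
}
```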
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556023#comment-16556023 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205169631 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(<WindowAssigner>) +.apply(<JoinFunction>) +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example, a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from the other stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream<Integer> orangeStream = ... +DataStream<Integer> greenStream = ... + +orangeStream.join(greenStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply (new JoinFunction<Integer, Integer, String> () { +@Override +public String join(Integer first, Integer second) { +return first + "," + second; +} +}); +``` + + + +```scala +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +val orangeStream: DataStream[Integer] = ... +val greenStream: DataStream[Integer] = ... 
+ +orangeStream.join(greenStream) +.where(elem => /* select key */) +.equalTo(elem => /* select key */) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply { (e1, e2) => e1 + "," + e2 } +``` + + + + +## Sliding Window Join +When performing a sliding window join, all elements with a common key and a common sliding window are joined as pairwise combinations and passed on to the user-defined function. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! + + + +In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556034#comment-16556034 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205180168 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(<WindowAssigner>) +.apply(<JoinFunction>) +``` + +Some notes on semantics: Review comment: Replace these notes with an explanation of the code above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add separate docs page for different join types in DataStream API > - > > Key: FLINK-9877 > URL: https://issues.apache.org/jira/browse/FLINK-9877 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > https://github.com/apache/flink/pull/6407 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556027#comment-16556027 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205181959 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(<WindowAssigner>) +.apply(<JoinFunction>) +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example, a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios. Review comment: There's no need to introduce the different kinds of windows since they have already been documented in other pages. Just give the links and show your brilliant examples. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add separate docs page for different join types in DataStream API > - > > Key: FLINK-9877 > URL: https://issues.apache.org/jira/browse/FLINK-9877 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > https://github.com/apache/flink/pull/6407 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556025#comment-16556025 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205175476 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. Review comment: A window join **joins** elements of two streams that share a common key and lie in the same window. These windows can be defined with a window assigner and are evaluated on **elements from both of the streams**. ~~This is especially important for session window joins, which will be demonstrated below.~~ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add separate docs page for different join types in DataStream API > - > > Key: FLINK-9877 > URL: https://issues.apache.org/jira/browse/FLINK-9877 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > https://github.com/apache/flink/pull/6407 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556028#comment-16556028 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205191172 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(<WindowAssigner>) +.apply(<JoinFunction>) +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example, a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from the other stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream<Integer> orangeStream = ... +DataStream<Integer> greenStream = ... + +orangeStream.join(greenStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply (new JoinFunction<Integer, Integer, String> () { +@Override +public String join(Integer first, Integer second) { +return first + "," + second; +} +}); +``` + + + +```scala
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +val orangeStream: DataStream[Integer] = ... +val greenStream: DataStream[Integer] = ... 
+ +orangeStream.join(greenStream) +.where(elem => /* select key */) +.equalTo(elem => /* select key */) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply { (e1, e2) => e1 + "," + e2 } +``` + + + + +## Sliding Window Join +When performing a sliding window join, all elements with a common key and a common sliding window are joined as pairwise combinations and passed on to the user-defined function. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! + + + +In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556032#comment-16556032 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205177501 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the followning: Review comment: The general usage can be summarized as follows: This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add separate docs page for different join types in DataStream API > - > > Key: FLINK-9877 > URL: https://issues.apache.org/jira/browse/FLINK-9877 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > https://github.com/apache/flink/pull/6407 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
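The semantics notes in the hunk above (inner-join behavior; pairwise combinations of elements per window) can be checked with a small simulation outside Flink. This is a minimal pure-JDK sketch — the class and method names are mine, not Flink API — assuming a single key and integer elements whose values double as event timestamps:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingJoinSketch {
    // Emits the pairwise combinations per tumbling window of the given size (ms).
    // Windows where either side is empty emit nothing (inner-join behavior).
    static List<String> tumblingJoin(int[] orange, int[] green, int size) {
        Map<Integer, List<Integer>> greenByWindow = bucket(green, size);
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> window : bucket(orange, size).entrySet()) {
            List<Integer> partners = greenByWindow.get(window.getKey());
            if (partners == null) continue; // no green element in this window: emit nothing
            for (int a : window.getValue())
                for (int b : partners)
                    joined.add(a + "," + b);
        }
        return joined;
    }

    // Groups elements by tumbling-window index (timestamp / size).
    static Map<Integer, List<Integer>> bucket(int[] elems, int size) {
        Map<Integer, List<Integer>> byWindow = new TreeMap<>();
        for (int e : elems)
            byWindow.computeIfAbsent(e / size, k -> new ArrayList<>()).add(e);
        return byWindow;
    }

    public static void main(String[] args) {
        // Orange 6 and 7 have no green partner in window [6,7], so they are not emitted.
        System.out.println(tumblingJoin(new int[]{0, 1, 6, 7}, new int[]{0, 1}, 2));
        // prints [0,0, 0,1, 1,0, 1,1]
    }
}
```

Within each window the output is the full cross product of the two sides, which is exactly the "pairwise combinations" the quoted text passes to the `JoinFunction`.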
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556024#comment-16556024 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205169911 ## File path: docs/fig/interval-join.svg ## @@ -0,0 +1,128 @@ + Review comment: Don't forget to add copyright to these files. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add separate docs page for different join types in DataStream API > - > > Key: FLINK-9877 > URL: https://issues.apache.org/jira/browse/FLINK-9877 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > https://github.com/apache/flink/pull/6407 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556031#comment-16556031 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205195837 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the followning: + +```java +stream.join(otherStream) +.where() +.equalTo() +.window() +.apply() +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview over how different kinds of windows can be used for a window join and what the results of those joins would look like using examplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream orangeStream = ... +DataStream greenStream = ... + +orangeStream.join(greenStream) +.where() +.equalTo() +.window(TumblingEventTimeWindows.of(Time.seconds(2))) +.apply (new JoinFunction () { +@Override +public String join(Integer first, Integer second) { +return first + "," + second; +} +}); +``` + + + +```scala +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +val orangeStream: DataStream[Integer] = ... +val greenStream: DataStream[Integer] = ... 
+ +orangeStream.join(greenStream) +.where(elem => /* select key */) +.equalTo(elem => /* select key */) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply { (e1, e2) => e1 + "," + e2 } +``` + + + + +## Sliding Window Join +When performing a sliding window join, all elements with a common key and common sliding window are joined are pairwise combinations and passed on to the user-defined function. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! + + + +In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556026#comment-16556026 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205190250 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the followning: + +```java +stream.join(otherStream) +.where() +.equalTo() +.window() +.apply() +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview over how different kinds of windows can be used for a window join and what the results of those joins would look like using examplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream orangeStream = ... +DataStream greenStream = ... + +orangeStream.join(greenStream) +.where() +.equalTo() +.window(TumblingEventTimeWindows.of(Time.seconds(2))) +.apply (new JoinFunction () { +@Override +public String join(Integer first, Integer second) { +return first + "," + second; +} +}); +``` + + + +```scala +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +val orangeStream: DataStream[Integer] = ... +val greenStream: DataStream[Integer] = ... 
+ +orangeStream.join(greenStream) +.where(elem => /* select key */) +.equalTo(elem => /* select key */) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply { (e1, e2) => e1 + "," + e2 } +``` + + + + +## Sliding Window Join +When performing a sliding window join, all elements with a common key and common sliding window are joined are pairwise combinations and passed on to the user-defined function. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! + + + +In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`. Review comment: 1. Replace the word "duration" with "size" 2. Use `JoinFunction` instead of user-defined
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556029#comment-16556029 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205176853 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. Review comment: The elements from both sides are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where users can **emit results that meet the join criteria**. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9877) Add separate docs page for different join types in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556030#comment-16556030 ] ASF GitHub Bot commented on FLINK-9877: --- xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205194408 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the followning: + +```java +stream.join(otherStream) +.where() +.equalTo() +.window() +.apply() +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. 
+ +In the following section we are going to give an overview over how different kinds of windows can be used for a window join and what the results of those joins would look like using examplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream orangeStream = ... +DataStream greenStream = ... + +orangeStream.join(greenStream) +.where() +.equalTo() +.window(TumblingEventTimeWindows.of(Time.seconds(2))) +.apply (new JoinFunction () { +@Override +public String join(Integer first, Integer second) { +return first + "," + second; +} +}); +``` + + + +```scala +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +val orangeStream: DataStream[Integer] = ... +val greenStream: DataStream[Integer] = ... 
+ +orangeStream.join(greenStream) +.where(elem => /* select key */) +.equalTo(elem => /* select key */) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply { (e1, e2) => e1 + "," + e2 } +``` + + + + +## Sliding Window Join +When performing a sliding window join, all elements with a common key and common sliding window are joined are pairwise combinations and passed on to the user-defined function. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! + + + +In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205187204 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the followning: + +```java +stream.join(otherStream) +.where() +.equalTo() +.window() +.apply() +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. + +In the following section we are going to give an overview over how different kinds of windows can be used for a window join and what the results of those joins would look like using examplary scenarios. 
+ +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. Review comment: As illustrated in Figure.X, we define a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The figure shows the pairwise combinations of all elements in each window that will be passed to the join function. Note that in the tumbling window `[6,7]` no result will be passed since no element exists in the green stream that can be joined with the orange elements ⑥ and ⑦. Besides, I can understand why you use such a tiny window size, but maybe we could give it a more reasonable value. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
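The 2 ms windows written `[0,1], [2,3], ...` here are half-open `[start, start + 2)` intervals internally, and the semantics notes earlier state that joined elements carry the window's largest timestamp. A small arithmetic sketch of both rules (pure JDK; the names are mine, not Flink's):

```java
public class WindowTimestamps {
    // Start of the size-ms tumbling window containing timestamp t
    // (non-negative timestamps assumed).
    static long windowStart(long t, long size) {
        return t - (t % size);
    }

    // Joined elements carry the window's largest timestamp, i.e. end - 1
    // for the half-open window [start, start + size).
    static long joinedTimestamp(long t, long size) {
        return windowStart(t, size) + size - 1;
    }

    public static void main(String[] args) {
        // 2 ms windows: t = 6 lies in [6, 8), written `[6,7]` in the text.
        System.out.println(windowStart(6, 2) + " " + joinedTimestamp(6, 2)); // 6 7
        // The [5, 10) example from the semantics notes: joins are stamped 9.
        System.out.println(joinedTimestamp(7, 5)); // 9
    }
}
```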
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins URL: https://github.com/apache/flink/pull/6407#discussion_r205169631 ## File path: docs/dev/stream/operators/joining.md ## @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the followning: + +```java +stream.join(otherStream) +.where() +.equalTo() +.window() +.apply() +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. + +In the following section we are going to give an overview over how different kinds of windows can be used for a window join and what the results of those joins would look like using examplary scenarios. 
+ +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream orangeStream = ... +DataStream greenStream = ... + +orangeStream.join(greenStream) +.where() +.equalTo() +.window(TumblingEventTimeWindows.of(Time.seconds(2))) +.apply (new JoinFunction () { +@Override +public String join(Integer first, Integer second) { +return first + "," + second; +} +}); +``` + + + +```scala +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +val orangeStream: DataStream[Integer] = ... +val greenStream: DataStream[Integer] = ... 
+ +orangeStream.join(greenStream) +.where(elem => /* select key */) +.equalTo(elem => /* select key */) +.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) +.apply { (e1, e2) => e1 + "," + e2 } +``` + + + + +## Sliding Window Join +When performing a sliding window join, all elements with a common key and common sliding window are joined are pairwise combinations and passed on to the user-defined function. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! + + + +In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`. + + + + +```java +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +... + +DataStream orangeStream = ... +DataStream greenStream = ... + +orangeStream.join(greenStream) +.where() +.equalTo() +
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205176853

File path: docs/dev/stream/operators/joining.md

Quoted text:

+---
+title: "Joining"
+nav-id: streaming_joins
+nav-show_overview: true
+nav-parent_id: streaming
+nav-pos: 10
+---
+
+* toc
+{:toc}
+
+# Window Join
+A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl }}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below.
+
+The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements.

Review comment: The elements from both sides are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where users can **emit results that meet the join criteria**.

This is an automated message from the Apache Git Service. To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205181959

File path: docs/dev/stream/operators/joining.md

Quoted text:

+In the following section we are going to give an overview over how different kinds of windows can be used for a window join and what the results of those joins would look like using examplary scenarios.

Review comment: There's no need to introduce the different kinds of windows since they have already been documented in other pages. Just give the links and show your brilliant examples.
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205190250

File path: docs/dev/stream/operators/joining.md

Quoted text:

+In this example we are using sliding windows with a duration of two milliseconds and slide them by one millisecond, resulting in the sliding windows `[-1, 0],[0,1],[1,2],[2,3], …`. The joined elements below the x-axis are the ones that are passed to the user-defined function for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window `[2,3]`, but is not joined with anything in the window `[1,2]`.

Review comment:
1. Replace the word "duration" with "size".
2. Use `JoinFunction` instead of "user-defined function".
3. Extend the window size a little bit.
4. Would be better if you could make the elements from different sides distinguishable.
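The claim that the orange ② lands in windows `[1,2]` and `[2,3]` follows from how sliding windows are assigned. The sketch below mirrors the assignment rule used by Flink's `SlidingEventTimeWindows` in simplified form (no window offset, window starts only); it is an illustration under those assumptions, not Flink's actual class:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // All window starts whose window [start, start + size) contains the
    // timestamp: begin at the latest slide-aligned start and step back by
    // the slide while the window still covers the timestamp.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Orange element with timestamp 2, window size 2 ms, slide 1 ms:
        // it belongs to the windows starting at 2 ([2,3]) and 1 ([1,2]).
        System.out.println(windowStarts(2, 2, 1)); // prints [2, 1]
    }
}
```

With size 2 and slide 1, every element belongs to exactly two overlapping windows, which is why an element can be joined in one sliding window but not in another.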
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205169911

File path: docs/fig/interval-join.svg

Review comment: Don't forget to add copyright to these files.
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205195837

File path: docs/dev/stream/operators/joining.md

[The message quotes the Tumbling Window and Sliding Window sections of joining.md; the quoted diff and the review comment are truncated in this archive.]
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205180168

File path: docs/dev/stream/operators/joining.md

Quoted text:

+The general usage always looks like the followning:
+
+```java
+stream.join(otherStream)
+    .where(<KeySelector>)
+    .equalTo(<KeySelector>)
+    .window(<WindowAssigner>)
+    .apply(<JoinFunction>)
+```
+
+Some notes on semantics:

Review comment: Replace these notes with an explanation of the codes above.
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205175476

File path: docs/dev/stream/operators/joining.md

Quoted text:

+A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl }}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below.

Review comment: A window join **joins** elements of two streams that share a common key and lie in the same window. These windows can be defined with a window assigner and are evaluated on **elements from both of the streams**. ~~This is especially important for session window joins, which will be demonstrated below.~~
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205191172

File path: docs/dev/stream/operators/joining.md

[The message quotes the Tumbling Window and Sliding Window sections of joining.md; the quoted diff and the review comment are truncated in this archive.]
[GitHub] xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins
xccui commented on a change in pull request #6407: [FLINK-9877][docs] Add documentation page for different datastream joins. URL: https://github.com/apache/flink/pull/6407#discussion_r205194408

File path: docs/dev/stream/operators/joining.md

[The message quotes the Tumbling Window and Sliding Window sections of joining.md; the quoted diff and the review comment are truncated in this archive.]
[jira] [Commented] (FLINK-9679) Implement AvroSerializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556014#comment-16556014 ] ASF GitHub Bot commented on FLINK-9679: --- medcv commented on issue #6259: [FLINK-9679] Implement AvroSerializationSchema URL: https://github.com/apache/flink/pull/6259#issuecomment-407833734

@dawidwys I would appreciate it if you could take a look at the latest changes on this PR. The new commit extends `SchemaCoder` with a `writeSchema` method, which helps move the schema-writing logic out of `AvroSerializationSchema` as you suggested. It still uses a static `subject` on `ConfluentRegistryAvroSerializationSchema`.

> Implement AvroSerializationSchema
> Key: FLINK-9679
> URL: https://issues.apache.org/jira/browse/FLINK-9679
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.6.0
> Reporter: Yazdan Shirvany
> Assignee: Yazdan Shirvany
> Priority: Major
> Labels: pull-request-available
>
> Implement AvroSerializationSchema using Confluent Schema Registry

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] medcv commented on issue #6259: [FLINK-9679] Implement AvroSerializationSchema
medcv commented on issue #6259: [FLINK-9679] Implement AvroSerializationSchema URL: https://github.com/apache/flink/pull/6259#issuecomment-407833734

@dawidwys I would appreciate it if you could take a look at the latest changes on this PR. The new commit extends `SchemaCoder` with a `writeSchema` method, which helps move the schema-writing logic out of `AvroSerializationSchema` as you suggested. It still uses a static `subject` on `ConfluentRegistryAvroSerializationSchema`.
[jira] [Commented] (FLINK-9944) Cleanup end-to-end test poms
[ https://issues.apache.org/jira/browse/FLINK-9944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555994#comment-16555994 ] ASF GitHub Bot commented on FLINK-9944: --- zentol commented on a change in pull request #6421: [FLINK-9944][tests] Cleanup end-to-end test poms URL: https://github.com/apache/flink/pull/6421#discussion_r205191554

File path: flink-end-to-end-tests/flink-queryable-state-test/pom.xml

[The quoted diff removes the explicit flink-core dependency, marks the flink-streaming-java and flink-statebackend-rocksdb dependencies as provided, and replaces the maven-jar-plugin `QsStateProducer` execution with a maven-shade-plugin execution using main class `org.apache.flink.streaming.tests.queryablestate.QsStateProducer`.]

Review comment: this jar must be a fat jar containing flink-core and possibly other dependencies, as it is not run as a Flink application.

> Cleanup end-to-end test poms
> Key: FLINK-9944
> URL: https://issues.apache.org/jira/browse/FLINK-9944
> Project: Flink
> Issue Type: Improvement
> Components: Build System, Tests
> Affects Versions: 1.6.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
> The poms for the various end-to-end modules require some attention.
> We can streamline the shade-plugin configuration by moving common elements to the parent pom.
> Most modules explicitly depend on flink-core; in the spirit of being similar to quickstart projects (i.e. the template for user jars) we should remove this dependency. It is transitively pulled in by the API modules flink-java and flink-streaming-java.
> Other problems:
> * several flink dependencies are not set to provided
> * the datastream-allround-test has a hard-coded scala version in its rocksdb dependency
> * the ttl test depends on rocksdb but doesn't actually use it

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9944) Cleanup end-to-end test poms
[ https://issues.apache.org/jira/browse/FLINK-9944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555984#comment-16555984 ]

ASF GitHub Bot commented on FLINK-9944:
---
dawidwys commented on a change in pull request #6421: [FLINK-9944][tests] Cleanup end-to-end test poms
URL: https://github.com/apache/flink/pull/6421#discussion_r205187594

## File path: flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml
## @@ -72,30 +66,15 @@

(The XML markup of the diff was stripped by the archive. In summary, the change drops the explicit maven-shade-plugin version 3.0.0 from the BucketingSinkTestProgram configuration.)

Review comment: Ok, point taken, thanks for explaining it to me.
[jira] [Commented] (FLINK-9953) Active Kubernetes integration
[ https://issues.apache.org/jira/browse/FLINK-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555933#comment-16555933 ]

Elias Levy commented on FLINK-9953:
---
This is a duplicate of FLINK-9495.

> Active Kubernetes integration
> -
>
> Key: FLINK-9953
> URL: https://issues.apache.org/jira/browse/FLINK-9953
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination, ResourceManager
> Reporter: Till Rohrmann
> Priority: Major
> Fix For: 1.7.0
>
> This is the umbrella issue tracking Flink's active Kubernetes integration. Active means in this context that the {{ResourceManager}} can talk to Kubernetes to launch new pods, similar to Flink's Yarn and Mesos integration.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)