[GitHub] [flink] link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem
link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem URL: https://github.com/apache/flink/pull/8068#issuecomment-485058426 @tillrohrmann @dawidwys PTAL This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-12269) Support Temporal Table Join in blink planner
[ https://issues.apache.org/jira/browse/FLINK-12269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-12269: --- Assignee: Jark Wu > Support Temporal Table Join in blink planner > > > Key: FLINK-12269 > URL: https://issues.apache.org/jira/browse/FLINK-12269 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > Support translating the following "FOR SYSTEM_TIME AS OF" query into > {{StreamExecTemporalTableJoin}}. > {code:sql} > SELECT > o.amount, o.currency, r.rate, o.amount * r.rate > FROM > Orders AS o > JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r > ON r.currency = o.currency > {code} > This is an extension to the current temporal join (FLINK-9738) using a standard > syntax introduced in Calcite 1.19. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12269) Support Temporal Table Join in blink planner
Jark Wu created FLINK-12269: --- Summary: Support Temporal Table Join in blink planner Key: FLINK-12269 URL: https://issues.apache.org/jira/browse/FLINK-12269 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: Jark Wu Support translating the following "FOR SYSTEM_TIME AS OF" query into {{StreamExecTemporalTableJoin}}. {code:sql} SELECT o.amount, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency {code} This is an extension to the current temporal join (FLINK-9738) using a standard syntax introduced in Calcite 1.19.
[jira] [Created] (FLINK-12268) Port SharedSlotsTest to new code base
leesf created FLINK-12268: - Summary: Port SharedSlotsTest to new code base Key: FLINK-12268 URL: https://issues.apache.org/jira/browse/FLINK-12268 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: leesf Assignee: leesf Fix For: 1.9.0 Get rid of Instance. Port SchedulerTestUtils#getRandomInstance to new code.
[jira] [Created] (FLINK-12267) Port SimpleSlotTest to new code base
leesf created FLINK-12267: - Summary: Port SimpleSlotTest to new code base Key: FLINK-12267 URL: https://issues.apache.org/jira/browse/FLINK-12267 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Reporter: leesf Assignee: leesf Fix For: 1.9.0 Mainly get rid of `Instance`
[jira] [Updated] (FLINK-12264) Port ExecutionGraphTestUtils to new code base
[ https://issues.apache.org/jira/browse/FLINK-12264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf updated FLINK-12264: -- Component/s: Runtime / Coordination > Port ExecutionGraphTestUtils to new code base > - > > Key: FLINK-12264 > URL: https://issues.apache.org/jira/browse/FLINK-12264 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Tests >Reporter: leesf >Assignee: leesf >Priority: Major > Fix For: 1.9.0 > > > Mainly get rid of Instance.
[jira] [Updated] (FLINK-12180) Port ExecutionVertexCancelTest to new codebase
[ https://issues.apache.org/jira/browse/FLINK-12180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12180: --- Labels: pull-request-available (was: ) > Port ExecutionVertexCancelTest to new codebase > -- > > Key: FLINK-12180 > URL: https://issues.apache.org/jira/browse/FLINK-12180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: TisonKun >Assignee: leesf >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > Port {{ExecutionVertexCancelTest}} to new codebase. > Mainly get rid of the usage of {{Instance}}
[GitHub] [flink] flinkbot commented on issue #8227: [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase
flinkbot commented on issue #8227: [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase URL: https://github.com/apache/flink/pull/8227#issuecomment-485046244 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] leesf opened a new pull request #8227: [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase
leesf opened a new pull request #8227: [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase URL: https://github.com/apache/flink/pull/8227 ## What is the purpose of the change Port ExecutionVertexCancelTest to new codebase ## Brief change log - *Remove the use of Instance.* - *Use TestingLogicalSlot in place of Instance#allocateSimpleSlot* ## Verifying this change This change is a trivial rework. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter
[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1684#comment-1684 ] Brandon commented on FLINK-7009: Any chance of this getting merged in? Still would be extremely useful. > dogstatsd mode in statsd reporter > - > > Key: FLINK-7009 > URL: https://issues.apache.org/jira/browse/FLINK-7009 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.4.0 > Environment: org.apache.flink.metrics.statsd.StatsDReporter >Reporter: David Brinegar >Priority: Major > > The current statsd reporter can only report a subset of Flink metrics owing > to the manner in which Flink variables are handled, mainly around invalid > characters and overly long metric names. As an option, it would be quite useful to > have a stricter dogstatsd-compliant output. Dogstatsd metrics are tagged, > should be less than 200 characters including tag names and values, be > alphanumeric + underbar, delimited by periods. As a further pragmatic > restriction, negative and other invalid values should be ignored rather than > sent to the backend. These restrictions play well with a broad set of > collectors and time series databases. > This mode would: > * convert output to ascii alphanumeric characters with underbar, delimited by > periods. Runs of invalid characters within a metric segment would be > collapsed to a single underbar.
> * report all Flink variables as tags > * compress overly long segments, say over 50 chars, to a symbolic > representation of the metric name, to preserve the unique metric time series > but avoid downstream truncation > * compress 32 character Flink IDs like tm_id, task_id, job_id, > task_attempt_id, to the first 8 characters, again to preserve enough > distinction amongst metrics while trimming up to 96 characters from the metric > * remove object references from names, such as the instance hash id of the > serializer > * drop negative or invalid numeric values such as "n/a", "-1" which is used > for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is > used for unknowns like currentLowWaterMark > With these in place, it becomes quite reasonable to support LatencyGauge > metrics as well. > One idea for symbolic compression is to take the first 10 valid characters > plus a hash of the long name. For example, a value like this operator_name: > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > would first drop the instance references. 
The stable version would be: > > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > and then the compressed name would be the first ten valid characters plus the > hash of the stable string: > {code} > TriggerWin_d8c007da > {code} > This is just one way of dealing with unruly default names; the main point > is to preserve the metrics so they are valid, avoid truncation, and allow > aggregation along other dimensions even if this particular dimension is > hard to parse after the compression.
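The compression rule proposed above (first ten valid characters plus a hash of the stable name) could be sketched roughly as follows. The sanitization regex and the CRC32 hash are assumptions for illustration; the ticket does not prescribe a concrete hash function, so the suffix here will not match the `d8c007da` example.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Illustrative sketch of the symbolic compression idea from FLINK-7009:
 *  sanitize a metric segment to ascii alphanumerics plus underbar, and if
 *  it is still too long, keep the first 10 characters plus a hash of the
 *  full stable name. Hash choice (CRC32) is an assumption. */
public class MetricNameCompressor {

    /** Collapse runs of invalid characters to a single underbar. */
    public static String sanitize(String segment) {
        return segment.replaceAll("[^A-Za-z0-9_.]+", "_");
    }

    /** Compress segments longer than maxLen to a 10-char prefix + "_" + hash. */
    public static String compress(String segment, int maxLen) {
        String clean = sanitize(segment);
        if (clean.length() <= maxLen) {
            return clean;
        }
        CRC32 crc = new CRC32();
        crc.update(clean.getBytes(StandardCharsets.UTF_8));
        return clean.substring(0, 10) + "_" + Long.toHexString(crc.getValue());
    }

    public static void main(String[] args) {
        // A default operator name like the TriggerWindow example above
        String longName = "TriggerWindow(TumblingProcessingTimeWindows(5000))";
        System.out.println(compress(longName, 20)); // TriggerWin_<hash>
    }
}
```

The key property is that the hash suffix keeps distinct long names from colliding after truncation, so the time series remain distinguishable downstream.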
[GitHub] [flink] bowenli86 commented on issue #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf…
bowenli86 commented on issue #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf… URL: https://github.com/apache/flink/pull/8200#issuecomment-485025137 This is some really good stuff! Exciting to see some official users' contribution on doc translation. Can you retitle the PR to be complete? BTW, we may also need to wait for https://github.com/apache/flink-web/pull/190 on finalizing the review process.
[GitHub] [flink] bowenli86 commented on issue #7754: [FLINK-11530] [docs] Support multiple languages for the docs framework
bowenli86 commented on issue #7754: [FLINK-11530] [docs] Support multiple languages for the docs framework URL: https://github.com/apache/flink/pull/7754#issuecomment-485024126 @kkrugler FYI
[GitHub] [flink] bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#issuecomment-485022646 > I think we should also add a NOTICE file to `flink-connector-hive` module, because it bundles hive dependency. You can take this as an example https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE. > > See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing Thanks for reminding me of that, @wuchong ! I propose to address it separately as I need a bit of time to learn how to do it, and I've created [FLINK-12266](https://issues.apache.org/jira/browse/FLINK-12266) as an immediate followup. What do you think? I also remember you concluded a dev thread discussion a while ago to use flinkbot to remind devs of NOTICE file issues. I didn't see a flinkbot reminder yet. How is that going? BTW, let's all sync offline to get consensus on what component tags we should use for PRs.
[jira] [Updated] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-12266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12266: - Description: Add a NOTICE file to `flink-connector-hive` module, because it bundles hive dependency. An example https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE. See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing > add NOTICE file for dependencies that are newly introduced in > flink-connector-hive > -- > > Key: FLINK-12266 > URL: https://issues.apache.org/jira/browse/FLINK-12266 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Add a NOTICE file to `flink-connector-hive` module, because it bundles hive > dependency. An example > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE. > See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing
[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu
bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r277098145 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; + +import org.apache.hadoop.hive.metastore.api.Database; + +import java.util.Map; + + +/** + * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. + */ +public class GenericHiveMetastoreCatalogUtil { + + private GenericHiveMetastoreCatalogUtil() { + } + + // -- Utils -- + + /** +* Creates a Hive database from CatalogDatabase. +*/ + public static Database createHiveDatabase(String dbName, CatalogDatabase db) { + Map props = db.getProperties(); + return new Database( + dbName, + db.getDescription().get(), Review comment: That would be a misuse IMHO. 
I will add an ifPresent() check though.
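The concern in the review above is the bare `Optional#get()` on `db.getDescription()`, which throws `NoSuchElementException` when the description is absent. A minimal sketch of the defensive pattern being discussed, with illustrative names rather than the actual Flink catalog types:

```java
import java.util.Optional;

/** Sketch of safe Optional handling for an optional database description.
 *  The getDescription() stub and the empty-string fallback are assumptions
 *  for illustration, not Flink's actual API. */
public class DescriptionExample {

    static Optional<String> getDescription(boolean present) {
        return present ? Optional.of("a generic database") : Optional.empty();
    }

    static String describe(boolean present) {
        // orElse supplies a default instead of risking get() on an empty Optional
        return getDescription(present).orElse("");
    }

    public static void main(String[] args) {
        System.out.println(describe(true));  // prints the description
        System.out.println(describe(false)); // prints the fallback, no exception
    }
}
```

Alternatively, `ifPresent(consumer)` fits when the absent case should simply do nothing rather than substitute a default.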
[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu
bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r277096977 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTestBase; +import org.apache.flink.table.catalog.GenericCatalogDatabase; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; + +/** + * Test for GenericHiveMetastoreCatalog. 
+ */ +public class GenericHiveMetastoreCatalogTest extends CatalogTestBase { + + @BeforeClass + public static void init() throws IOException { + catalog = HiveTestUtils.createGenericHiveMetastoreCatalog(); + catalog.open(); + } + + // = + // GenericHiveMetastoreCatalog doesn't support table operation yet + // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog + // = + + // TODO: re-enable this test once GenericHiveMetastoreCatalog support table operations + @Test Review comment: It may not matter that much as it's empty now and support of table operations is coming very soon.
[jira] [Commented] (FLINK-12256) Implement KafkaReadableCatalog
[ https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822186#comment-16822186 ] Bowen Li commented on FLINK-12256: -- Hi, Thanks for drafting the doc. I think, however, it's too preliminary and needs more detailed information. Please refer to other FLIPs as examples. Just a few examples of what needs to be considered, as comments I've left in the doc: * more background on schema registry * are there counterparts in schema registry corresponding to Flink's meta objects? * how do you handle the interoperability between different versions of kafka and schema registry? * how are implemented catalog APIs translated into schema registry client REST calls? [~becket_qin] [~walterddr] Would be great to have you guys' opinions from the perspective of heavy Kafka users on the design along the way > Implement KafkaReadableCatalog > -- > > Key: FLINK-12256 > URL: https://issues.apache.org/jira/browse/FLINK-12256 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka, Table SQL / Client >Affects Versions: 1.9.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > > KafkaReadableCatalog is a special implementation of the ReadableCatalog > interface (introduced in > [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs] > ) to retrieve meta information such as topic name/schema of the topic from > Apache Kafka and Confluent Schema Registry. > The new ReadableCatalog allows a user to run SQL queries like: > {code:java} > Select * from kafka.topic_name > {code} > without the need for manual definition of the table schema.
[jira] [Comment Edited] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821807#comment-16821807 ] lamber-ken edited comment on FLINK-12247 at 4/19/19 6:31 PM: - [~till.rohrmann], I updated the issue and uploaded a patch. was (Author: lamber-ken): [~till.rohrmann], I updated the issue and uploaded a patch. :) > fix NPE when writing an archive file to a FileSystem > > > Key: FLINK-12247 > URL: https://issues.apache.org/jira/browse/FLINK-12247 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.6.3, 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Fix For: 1.9.0 > > Attachments: fix-nep.patch > > > h3. *Issue detail info* > In our hadoop production env, we use the fixed-delay restart-strategy. > {code:java} > restart-strategy: fixed-delay > restart-strategy.fixed-delay.attempts: 20 > restart-strategy.fixed-delay.delay: 2 s > {code} > if a flink job reaches the max attempt count, the flink job will write an > archive file to +FileSystem+ and shut down. > but when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt > info of a subtask, it hits an NPE. > h3. *Detailed reasons are as follows:* > 0. Assume a scenario: a flink job {color:#ff}reaches the max attempt > count ( 20 ){color} > 1. +ExecutionVertex+ is a parallel subtask of the execution. Each > +ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE( > default value: 16 ).{color} > 2. +SubtaskExecutionAttemptDetailsHandler+ will get the attempt > from +priorExecutions+, > but priorExecutions only retains > {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements > were dropped from the head of the list (FIFO), and the lookup may return null. > h3.
*Detailed StackTrace* > {code:java} > java.lang.NullPointerException >at > org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) >at > org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) >at > org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) >at > org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) >at > org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) >at > org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) >at > org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730) >at > org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138) >at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341) >at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) >at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) >at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) >at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) >at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) >at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) >at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) >at 
akka.actor.Actor$class.aroundReceive(Actor.scala:502) >at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) >at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >at akka.actor.ActorCell.invoke(ActorCell.scala:495) >at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >at akka.dispatch.Mailbox.run(Mailbox.scala:224) >at akka.dispatch.Mailbox.exec(Mailbox.scala:234) >at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} > h3. *Minimal reproducible example* > {code:java} > public static void main(String[] args) throws Exception { >
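The eviction behaviour the report describes can be modelled in a few lines: prior attempts sit in a bounded FIFO history (default size 16 per the description), so once a job restarts more times than the history holds, a lookup for an early attempt comes back null. This is an illustrative sketch, not Flink's actual ExecutionVertex code; class and method names are invented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Minimal model of the FLINK-12247 failure mode: a bounded FIFO history
 *  of attempt numbers silently drops the oldest entries, so lookups for
 *  evicted attempts return null. Names are illustrative, not Flink's. */
public class BoundedAttemptHistory {
    private final int maxSize;
    private final Deque<Integer> attempts = new ArrayDeque<>();

    public BoundedAttemptHistory(int maxSize) {
        this.maxSize = maxSize;
    }

    public void record(int attemptNumber) {
        if (attempts.size() >= maxSize) {
            attempts.removeFirst(); // oldest attempt silently dropped (FIFO)
        }
        attempts.addLast(attemptNumber);
    }

    /** Returns null when the attempt has been evicted; dereferencing this
     *  result without a null check is what triggers the NPE. */
    public Integer find(int attemptNumber) {
        return attempts.contains(attemptNumber) ? attemptNumber : null;
    }

    public static void main(String[] args) {
        BoundedAttemptHistory history = new BoundedAttemptHistory(16);
        for (int i = 0; i < 20; i++) { // 20 restart attempts, history of 16
            history.record(i);
        }
        System.out.println(history.find(0));  // null: attempt 0 was evicted
        System.out.println(history.find(19)); // 19: still retained
    }
}
```

With 20 attempts and a history of 16, attempts 0 through 3 are gone, which matches the report: the handler archives details for an attempt that priorExecutions no longer retains.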
[jira] [Closed] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-10974. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: 58e69a0f6951d99d38ba5462afbb94d5bef478fc > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code}
[GitHub] [flink] asfgit closed pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
asfgit closed pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196
[GitHub] [flink] sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#issuecomment-484973155 Merging...
[jira] [Updated] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-12266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12266: - Component/s: (was: Table SQL / API) Connectors / Hive > add NOTICE file for dependencies that are newly introduced in > flink-connector-hive > -- > > Key: FLINK-12266 > URL: https://issues.apache.org/jira/browse/FLINK-12266 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > >
[GitHub] [flink] bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#issuecomment-484972133 > I wonder whether we should differentiate Flink DB/table from Hive's own DB/table, especially if we allow Flink and Hive users to share an HMS instance? Do you mean why we have, for example, GenericCatalogDatabase v.s. the upcoming HiveCatalogDatabase? There are quite a few differences between Flink and Hive w.r.t. db and table. E.g. for db, hive always requires a valid, meaningful location uri, but flink doesn't really care about it; for column types of table, right now several types cannot be mapped 1:1 between flink and hive, and we need to wait for the rework of the Flink type system to finish. I doubt, at least for now, that we will unify them, but we can re-evaluate later on. I think what we currently should aim for is to share code as much as possible between GenericHiveMetastoreCatalog and the upcoming HiveCatalog when implementing catalog APIs.
[GitHub] [flink] asfgit closed pull request #8087: [FLINK-12029][table] Add column operations for TableApi
asfgit closed pull request #8087: [FLINK-12029][table] Add column operations for TableApi URL: https://github.com/apache/flink/pull/8087
[jira] [Closed] (FLINK-12029) Add Column selections
[ https://issues.apache.org/jira/browse/FLINK-12029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12029. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: e41b6d4784801677c60b75e353fcf7866106c287 > Add Column selections > - > > Key: FLINK-12029 > URL: https://issues.apache.org/jira/browse/FLINK-12029 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > In this Jira we will add column operators/operations as follows: > Fine-grained column operations > * Column selection > See the [google > doc|https://docs.google.com/document/d/1tryl6swt1K1pw7yvv5pdvFXSxfrBZ3_OkOObymis2ck/edit], > and I have also done a > [prototype|https://github.com/sunjincheng121/flink/pull/94/files]
[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu
bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r277038764 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +/** + * Test utils for Hive connector. + */ +public class HiveTestUtils { + private static final String HIVE_SITE_XML = "hive-site.xml"; + private static final String HIVE_WAREHOUSE_URI_FORMAT = "jdbc:derby:;databaseName=%s;create=true"; + private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static String warehouseDir; + private static String warehouseUri; + + /** +* Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore. 
+*/ + public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException { + return new GenericHiveMetastoreCatalog("test", getHiveConf()); + } + + private static HiveConf getHiveConf() throws IOException { + ClassLoader classLoader = new HiveTestUtils().getClass().getClassLoader(); + HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML)); + + TEMPORARY_FOLDER.create(); + warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db"; + warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir); + HiveConf hiveConf = new HiveConf(); + hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_SCHEMA_VERIFICATION, false); Review comment: good catch. I will keep those in hive-site.xml This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu
bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r277038223 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.GenericCatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ReadableWritableCatalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage. 
+ */ +public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog { + private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class); + + public static final String DEFAULT_DB = "default"; + + private final String catalogName; + private final HiveConf hiveConf; + + private String currentDatabase = DEFAULT_DB; + private IMetaStoreClient client; + + public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, getHiveConf(hivemetastoreURI)); + } + + public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + this.catalogName = catalogName; + + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); + LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName); + } + + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e);
[jira] [Created] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive
Bowen Li created FLINK-12266: Summary: add NOTICE file for dependencies that are newly introduced in flink-connector-hive Key: FLINK-12266 URL: https://issues.apache.org/jira/browse/FLINK-12266 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0
[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu
bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r277037234 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.GenericCatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ReadableWritableCatalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage. + */ +public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog { Review comment: Can you share where you see it? 
"Hive Metastore" is more popular, for example, please see https://www.cloudera.com/documentation/enterprise/5-8-x/topics/cdh_ig_hive_metastore_configure.html This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12265) link-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy
Bowen Li created FLINK-12265: Summary: link-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy Key: FLINK-12265 URL: https://issues.apache.org/jira/browse/FLINK-12265 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0 Reporter: Bowen Li Assignee: Jingsong Lee Fix For: 1.9.0 Seems to be a memory allocation issue but would be worth checking out if that's caused by the planner itself taking too much memory https://travis-ci.org/apache/flink/jobs/522051049 11:41:37.461 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 2, Time elapsed: 73.859 s <<< FAILURE! - in org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase 11:41:37.461 [ERROR] testSomeColumnsBothInDistinctAggAndGroupBy(org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase) Time elapsed: 6.274 s <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. Caused by: java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 64 pages. Only 0 pages are remaining. Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 64 pages. Only 0 pages are remaining. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12265) flink-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy
[ https://issues.apache.org/jira/browse/FLINK-12265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12265: - Summary: flink-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy (was: link-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy) > flink-table-planner-blink UT Failure: > SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy > > > Key: FLINK-12265 > URL: https://issues.apache.org/jira/browse/FLINK-12265 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: Bowen Li >Assignee: Jingsong Lee >Priority: Major > Fix For: 1.9.0 > > > Seems to be a memory allocation issue but would be worth checking out if > that's caused by the planner itself taking too much memory > https://travis-ci.org/apache/flink/jobs/522051049 > 11:41:37.461 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 2, Time > elapsed: 73.859 s <<< FAILURE! - in > org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase > 11:41:37.461 [ERROR] > testSomeColumnsBothInDistinctAggAndGroupBy(org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase) > Time elapsed: 6.274 s <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > Caused by: java.lang.RuntimeException: > org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate > 64 pages. Only 0 pages are remaining. > Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could > not allocate 64 pages. Only 0 pages are remaining. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r277032972 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -202,6 +212,8 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws Ta if (tableExists(tablePath)) { tables.remove(tablePath); + + partitions.remove(tablePath); Review comment: good catch!
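The fix confirmed in this review thread, clearing a table's partition metadata when the table itself is dropped, can be sketched with plain maps. The class and method names below are illustrative and are not the actual GenericInMemoryCatalog internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of an in-memory catalog: dropping a table must also drop
// any partition metadata registered under the same table path, otherwise
// the partitions map leaks entries for tables that no longer exist.
class InMemoryCatalogSketch {
    private final Map<String, String> tables = new HashMap<>();
    private final Map<String, List<String>> partitions = new HashMap<>();

    void createTable(String tablePath, String schema) {
        tables.put(tablePath, schema);
    }

    void addPartition(String tablePath, String spec) {
        partitions.computeIfAbsent(tablePath, k -> new ArrayList<>()).add(spec);
    }

    void dropTable(String tablePath) {
        if (tables.containsKey(tablePath)) {
            tables.remove(tablePath);
            partitions.remove(tablePath); // the fix: drop partition metadata too
        }
    }

    boolean tableExists(String tablePath) { return tables.containsKey(tablePath); }
    boolean hasPartitions(String tablePath) { return partitions.containsKey(tablePath); }

    public static void main(String[] args) {
        InMemoryCatalogSketch catalog = new InMemoryCatalogSketch();
        catalog.createTable("db1.t1", "a INT");
        catalog.addPartition("db1.t1", "dt=2019-04-19");
        catalog.dropTable("db1.t1");
        System.out.println(catalog.tableExists("db1.t1"));   // false
        System.out.println(catalog.hasPartitions("db1.t1")); // false
    }
}
```

Without the `partitions.remove(tablePath)` call, the second check would report stale partition metadata for a dropped table.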
[GitHub] [flink] flinkbot edited a comment on issue #8087: [FLINK-12029][table] Add column operations for TableApi
flinkbot edited a comment on issue #8087: [FLINK-12029][table] Add column operations for TableApi URL: https://github.com/apache/flink/pull/8087#issuecomment-478332056 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❗ 3. Needs [attention] from. - Needs attention by @twalthr [PMC] * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8087: [FLINK-12029][table] Add column operations for TableApi
sunjincheng121 commented on issue #8087: [FLINK-12029][table] Add column operations for TableApi URL: https://github.com/apache/flink/pull/8087#issuecomment-484947873 @flinkbot approve all
[jira] [Assigned] (FLINK-12257) Convert CatalogBaseTable to org.apache.calcite.schema.Table so that planner can use unified catalog APIs
[ https://issues.apache.org/jira/browse/FLINK-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-12257: Assignee: Dawid Wysakowicz (was: Bowen Li) > Convert CatalogBaseTable to org.apache.calcite.schema.Table so that planner > can use unified catalog APIs > > > Key: FLINK-12257 > URL: https://issues.apache.org/jira/browse/FLINK-12257 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Dawid Wysakowicz >Priority: Major > Fix For: 1.9.0 > > > In FLINK-11476, we created CatalogManager to hook up planner with unified > catalog APIs. What's missing there is, at the very last step, convert > CatalogBaseTable to org.apache.calcite.schema.Table so that planner can use > unified catalog APIs, like how > {{ExternalTableUtil.fromExternalCatalogTable()}} works to convert the old > {{ExternalCatalogTable}} to a Calcite table -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12256) Implement KafkaReadableCatalog
[ https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822004#comment-16822004 ] Artsem Semianenka edited comment on FLINK-12256 at 4/19/19 3:47 PM: I've created the first draft of the [design document|https://docs.google.com/document/d/14thwgV2RY1AA9KgYztv_kLYSz4K1TckJ-YiGfkB5650/edit?usp=sharing]. Please let's continue the discussion in this ticket or in comments in Google Docs was (Author: artsem.semianenka): I've created the first draft of the[ design document|https://docs.google.com/document/d/14thwgV2RY1AA9KgYztv_kLYSz4K1TckJ-YiGfkB5650/edit?usp=sharing]. Please let's continue the discussion in this ticket or in comments in Google Docs > Implement KafkaReadableCatalog > -- > > Key: FLINK-12256 > URL: https://issues.apache.org/jira/browse/FLINK-12256 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka, Table SQL / Client >Affects Versions: 1.9.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > > KafkaReadableCatalog is a special implementation of ReadableCatalog > interface (which introduced in > [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs] > ) to retrieve meta information such topic name/schema of the topic from > Apache Kafka and Confluent Schema Registry. > New ReadableCatalog allows a user to run SQL queries like: > {code:java} > Select * form kafka.topic_name > {code} > without the need for manual definition of the table schema. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12256) Implement KafkaReadableCatalog
[ https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822004#comment-16822004 ] Artsem Semianenka commented on FLINK-12256: --- I've created the first draft of the [design document|https://docs.google.com/document/d/14thwgV2RY1AA9KgYztv_kLYSz4K1TckJ-YiGfkB5650/edit?usp=sharing]. Please let's continue the discussion in this ticket or in comments in Google Docs > Implement KafkaReadableCatalog > -- > > Key: FLINK-12256 > URL: https://issues.apache.org/jira/browse/FLINK-12256 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka, Table SQL / Client >Affects Versions: 1.9.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > > KafkaReadableCatalog is a special implementation of the ReadableCatalog > interface (which was introduced in > [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs] > ) to retrieve meta information such as the topic name/schema of the topic from > Apache Kafka and the Confluent Schema Registry. > The new ReadableCatalog allows a user to run SQL queries like: > {code:java} > Select * from kafka.topic_name > {code} > without the need for manual definition of the table schema.
[GitHub] [flink] datto-aparrill commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem
datto-aparrill commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem URL: https://github.com/apache/flink/pull/8068#issuecomment-484916176 We ran into this bug as well, and can confirm that this commit fixes the issue. Please merge!
[jira] [Closed] (FLINK-12026) Remove the `xxxInternal` method from TableImpl
[ https://issues.apache.org/jira/browse/FLINK-12026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12026. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: c96a4d7afe379a291cc538ca36af896df8dc2127 > Remove the `xxxInternal` method from TableImpl > -- > > Key: FLINK-12026 > URL: https://issues.apache.org/jira/browse/FLINK-12026 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > At present, each operator of TableImpl has an internal `xxxInternal` method, and `xxxInternal` is a temporary method. I think it can be removed > at present to further simplify the code. Such as: > From: > {code:java} > override def select(fields: String): Table = { > select(ExpressionParser.parseExpressionList(fields): _*) > } > override def select(fields: Expression*): Table = { > selectInternal(fields.map(tableImpl.expressionBridge.bridge)) > } > private def selectInternal(fields: Seq[PlannerExpression]): Table = { > ... > // implementation logic > ... > }{code} > To: > {code:java} > override def select(fields: String): Table = { > select(ExpressionParser.parseExpressionList(fields): _*) > } > override def select(fields: Expression*): Table = { > ... > // implementation logic > ... > }{code} > > I think the implementation logic can move into `select(fields: > Expression*)`. What do you think? [~dawidwys] [~hequn8128]
[jira] [Commented] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821933#comment-16821933 ] Gaurav commented on FLINK-12259: Thanks [~fan_li_ya]. Added a comment. > Improve debuggability of method invocation failures in > OptimizerPlanEnvironment > > > Key: FLINK-12259 > URL: https://issues.apache.org/jira/browse/FLINK-12259 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Gaurav >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > In cases where method invocation fails without setting the `optimizerPlan`, > Flink does not always dump the stderr/stdout. Hence, logging from the method > is lost. The stacktrace alone is not always helpful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r276994500 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint. + * This ensures every message will get acknowledged at least once. 
+ */ +public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase + implements ResultTypeQueryable, ParallelSourceFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final PubSubDeserializationSchema deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final String projectSubscriptionName; + protected final int maxMessagesPerPull; + + protected transient boolean deduplicateMessages; + protected transient SubscriberStub subscriber; + protected transient PullRequest pullRequest; + protected transient EventLoopGroup eventLoopGroup; + + protected transient volatile boolean isRunning; + protected transient volatile ApiFuture messagesFuture; + + PubSubSource(PubSubDeserializationSchema deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) { + super(String.class); + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.projectSubscriptionName = projectSubscriptionName; + this.maxMessagesPerPull = maxMessagesPerPull; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + if (hasNoCheckpointingEnabled(getRuntimeContext())) { + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the checkpointing frequency must be MUCH lower than
[GitHub] [flink] gauravtiwari89 commented on a change in pull request #8220: [FLINK-12259][flink-clients]Improve debuggability of method invocatio…
gauravtiwari89 commented on a change in pull request #8220: [FLINK-12259][flink-clients] Improve debuggability of method invocatio… URL: https://github.com/apache/flink/pull/8220#discussion_r276992911

File path: flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java

```diff
@@ -74,9 +76,11 @@ public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocation
 PrintStream originalOut = System.out;
 PrintStream originalErr = System.err;
 ByteArrayOutputStream baos = new ByteArrayOutputStream();
-System.setOut(new PrintStream(baos));
+TeeOutputStream combinedStdOut = new TeeOutputStream(originalOut, baos);
+System.setOut(new PrintStream(combinedStdOut));
 ByteArrayOutputStream baes = new ByteArrayOutputStream();
-System.setErr(new PrintStream(baes));
+TeeOutputStream combinedStdErr = new TeeOutputStream(originalErr, baes);
```

Review comment: Do we need to close these streams? Additionally, it's not clear to me why we need to trap the output. It seems to only be used for the exception at line 109. Couldn't the client just refer to the logs instead?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
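For context on the tee idea in the diff above: the PR uses a library `TeeOutputStream`, but the mechanism can be sketched with a minimal hand-rolled version (this standalone class is illustrative only, not the PR's code). Every byte written to `System.out` is forwarded both to the original stream and to a capture buffer:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

// Minimal sketch of a tee stream: each write goes to both branches, so stdout
// stays visible on the console while also being captured for later inspection.
public class TeeDemo {
    static final class SimpleTee extends OutputStream {
        private final OutputStream a;
        private final OutputStream b;

        SimpleTee(OutputStream a, OutputStream b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void write(int byteValue) throws IOException {
            a.write(byteValue);
            b.write(byteValue);
        }

        @Override
        public void flush() throws IOException {
            a.flush();
            b.flush();
        }
    }

    // Runs an action with System.out tee'd into a buffer, returns the capture.
    public static String captureStdOut(Runnable action) {
        PrintStream originalOut = System.out;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        System.setOut(new PrintStream(new SimpleTee(originalOut, baos), true));
        try {
            action.run();
        } finally {
            System.setOut(originalOut); // always restore the real stdout
        }
        return baos.toString();
    }
}
```

Restoring the original stream in a `finally` block also answers part of the review question: the `ByteArrayOutputStream` branch needs no closing, but the swapped-in `PrintStream` should not outlive the capture.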
[GitHub] [flink] gauravtiwari89 commented on issue #8218: [FLINK-12259] Log stdout, stderr on Program invocation failure
gauravtiwari89 commented on issue #8218: [FLINK-12259] Log stdout,stderr on Program invocation failure URL: https://github.com/apache/flink/pull/8218#issuecomment-484903173 Closed in favor of https://github.com/apache/flink/pull/8220
[GitHub] [flink] gauravtiwari89 closed pull request #8218: [FLINK-12259] Log stdout, stderr on Program invocation failure
gauravtiwari89 closed pull request #8218: [FLINK-12259] Log stdout,stderr on Program invocation failure URL: https://github.com/apache/flink/pull/8218
[jira] [Created] (FLINK-12264) Port ExecutionGraphTestUtils to new code base
leesf created FLINK-12264: - Summary: Port ExecutionGraphTestUtils to new code base Key: FLINK-12264 URL: https://issues.apache.org/jira/browse/FLINK-12264 Project: Flink Issue Type: Sub-task Components: Tests Reporter: leesf Assignee: leesf Fix For: 1.9.0 Mainly get rid of Instance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8070: [FLINK-12026][table]Remove the `xxxInternal` method from TableImpl
asfgit closed pull request #8070: [FLINK-12026][table]Remove the `xxxInternal` method from TableImpl URL: https://github.com/apache/flink/pull/8070
[GitHub] [flink] TisonKun closed pull request #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…
TisonKun closed pull request #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag… URL: https://github.com/apache/flink/pull/7285
[GitHub] [flink] TisonKun commented on issue #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…
TisonKun commented on issue #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag… URL: https://github.com/apache/flink/pull/7285#issuecomment-484897552 closing... may be a subtask of FLINK-10392.
[GitHub] [flink] TisonKun removed a comment on issue #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…
TisonKun removed a comment on issue #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag… URL: https://github.com/apache/flink/pull/7285#issuecomment-452182002 cc @tillrohrmann @GJL
[GitHub] [flink] flinkbot commented on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase
flinkbot commented on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8226#issuecomment-484895638 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12181) Port ExecutionGraphRestartTest to new codebase
[ https://issues.apache.org/jira/browse/FLINK-12181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12181:

Labels: pull-request-available (was: )

> Port ExecutionGraphRestartTest to new codebase
> Key: FLINK-12181
> URL: https://issues.apache.org/jira/browse/FLINK-12181
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination, Tests
> Affects Versions: 1.9.0
> Reporter: TisonKun
> Assignee: leesf
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Port {{ExecutionGraphRestartTest}} to new codebase.
> Mainly get rid of the usages of {{Scheduler}} and {{Instance}}.
[GitHub] [flink] leesf opened a new pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase
leesf opened a new pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8226

## What is the purpose of the change

Port ExecutionGraphRestartTest to new codebase.

## Brief change log

- Use TaskManagerLocation, SimpleAckingTaskManagerGateway and TestingResourceManagerGateway in place of Instance.
- Use SlotPool#releaseTaskManager in place of Instance#markDead.

## Verifying this change

This change is a trivial rework.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-484884659 I'm testing this source on spotty connections (as I do most of my work on the go), and when streaming a lot of messages over 4G, at some point, the source logs this:

```
14:40:33,649 INFO io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder - {} ignoring {} frame for stream {} {}
14:40:39,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 23 for job f2aed8990f452e357b8f15f6452e4244 (12951720 bytes in 18142 ms).
14:40:39,013 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 24 @ 1555677639010 for job f2aed8990f452e357b8f15f6452e4244.
14:41:11,608 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 24 for job f2aed8990f452e357b8f15f6452e4244 (14193479 bytes in 32593 ms).
14:41:11,614 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 25 @ 1555677671608 for job f2aed8990f452e357b8f15f6452e4244.
14:41:22,628 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 25 for job f2aed8990f452e357b8f15f6452e4244 (16366371 bytes in 11017 ms).
14:41:22,638 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 26 @ 1555677682628 for job f2aed8990f452e357b8f15f6452e4244.
14:41:43,287 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 26 for job f2aed8990f452e357b8f15f6452e4244 (17107164 bytes in 20655 ms).
14:41:43,290 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 27 @ 1555677703287 for job f2aed8990f452e357b8f15f6452e4244.
14:42:19,171 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 27 for job f2aed8990f452e357b8f15f6452e4244 (18502218 bytes in 35880 ms).
14:42:19,174 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 28 @ 1555677739171 for job f2aed8990f452e357b8f15f6452e4244.
14:42:55,863 INFO io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder - {} ignoring {} frame for stream {} {}
14:43:00,242 INFO io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder - {} ignoring {} frame for stream {} {}
14:43:00,242 INFO io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder - {} ignoring {} frame for stream {} {}
```

And it would seem no further events are being pushed down the streaming pipeline. Sometimes, I also get:

```
14:43:23,816 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 2d2461182e268f6838e957972a9a0361 with allocation id AllocationID{8f8a93b52291c2fa883df2fa1a7735de}.
14:43:23,816 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{fc0b24a95227bdbfa46a4119fb907d19}.
14:43:23,816 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 2d2461182e268f6838e957972a9a0361 for job leader monitoring.
14:43:23,816 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 692682c0-30b2-4900-ad43-3d94acf9a2c6.
14:43:23,816 WARN org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Error notifying leader listener about new leader
java.lang.IllegalStateException: The RPC connection is already closed
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.runtime.registration.RegisteredRpcConnection.start(RegisteredRpcConnection.java:91)
	at org.apache.flink.runtime.taskexecutor.JobLeaderService$JobManagerLeaderListener.notifyLeaderAddress(JobLeaderService.java:327)
	at org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService$NotifyOfLeaderCall.run(EmbeddedLeaderService.java:430)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

All of this
[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-484879762 Will there be support for replaying messages? https://cloud.google.com/pubsub/docs/replay-qs
[GitHub] [flink] sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#discussion_r276968221

File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java

```java
@@ -0,0 +1,434 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
 * in the case that the operator is InputSelectable.
 *
 * @param <IN1> The type of the records that arrive on the first input
 * @param <IN2> The type of the records that arrive on the second input
 */
@Internal
public class StreamTwoInputSelectableProcessor<IN1, IN2> {

	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);

	private volatile boolean continuousProcessing = true;

	private final NetworkInput input1;
	private final NetworkInput input2;

	private final Object lock;

	private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;

	private final InputSelectable inputSelector;

	private final AuxiliaryHandler auxiliaryHandler;

	private final CompletableFuture<?>[] listenFutures;

	private final boolean[] isFinished;

	private InputSelection inputSelection;

	private AtomicInteger availableInputsMask = new AtomicInteger();
```

Review comment: I got it. Wish you a good holiday.
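The `availableInputsMask` field in the listing above hints at a bitmask pattern for tracking which of the two inputs currently have data. A hypothetical standalone sketch of that pattern (the names and methods here are illustrative, not the PR's actual API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an availability bitmask for a two-input processor:
// bit 0 marks input 1 available, bit 1 marks input 2. Atomic updates keep the
// mask consistent when input gates flip availability from other threads.
public class AvailabilityMask {
    private final AtomicInteger mask = new AtomicInteger();

    private static int bit(int inputIndex) {
        return 1 << (inputIndex - 1); // inputIndex is 1-based, as in the processor
    }

    public void setAvailable(int inputIndex) {
        mask.getAndUpdate(m -> m | bit(inputIndex));
    }

    public void setUnavailable(int inputIndex) {
        mask.getAndUpdate(m -> m & ~bit(inputIndex));
    }

    public boolean isAvailable(int inputIndex) {
        return (mask.get() & bit(inputIndex)) != 0;
    }

    public boolean anyAvailable() {
        return mask.get() != 0;
    }
}
```

The selection loop can then intersect such a mask with the operator's requested `InputSelection` to decide which input to read next without blocking on an empty one.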
[jira] [Commented] (FLINK-10984) Move flink-shaded-hadoop to flink-shaded
[ https://issues.apache.org/jira/browse/FLINK-10984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821833#comment-16821833 ] sunjincheng commented on FLINK-10984:

Hi, [~Zentol] I have opened the PR to remove `flink-shaded-hadoop` from Flink: https://github.com/apache/flink/pull/8225 I'd appreciate it if you could have a look. Thanks! :)

> Move flink-shaded-hadoop to flink-shaded
> Key: FLINK-10984
> URL: https://issues.apache.org/jira/browse/FLINK-10984
> Project: Flink
> Issue Type: Improvement
> Components: Build System, BuildSystem / Shaded
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: shaded-7.0
> Time Spent: 40m
> Remaining Estimate: 0h
>
> To allow reasonable dependency management we should move flink-shaded-hadoop
> to flink-shaded, with each supported version having its own module and
> dependency management.
[GitHub] [flink] flinkbot commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from flink
flinkbot commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from flink URL: https://github.com/apache/flink/pull/8225#issuecomment-484846415 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from flink
sunjincheng121 commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from flink URL: https://github.com/apache/flink/pull/8225#issuecomment-484846446 The CI will turn green after `flink-shaded` releases the 7.0 version.
[GitHub] [flink] sunjincheng121 opened a new pull request #8225: [FLINK-10984][Build System, BuildSystem / Shaded]
sunjincheng121 opened a new pull request #8225: [FLINK-10984][Build System, BuildSystem / Shaded] URL: https://github.com/apache/flink/pull/8225

## What is the purpose of the change

To allow reasonable dependency management we should move `flink-shaded-hadoop` to `flink-shaded`, then remove `flink-shaded-hadoop` from Flink.

## Brief change log

- Remove `flink-shaded-hadoop` from flink.
- Remove `flink-shaded-hadoop/flink-shaded-hadoop2` from flink.
- Move out `flink-shaded-yarn-test` from `flink-shaded-hadoop`.
- Modify the scripts in `flink-dist`, i.e. remove the logic about `hadoop`.
- Modify some pom config about `flink-shaded-hadoop`.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
[jira] [Closed] (FLINK-11530) Support multiple languages for the framework of flink docs
[ https://issues.apache.org/jira/browse/FLINK-11530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-11530.

> Support multiple languages for the framework of flink docs
> Key: FLINK-11530
> URL: https://issues.apache.org/jira/browse/FLINK-11530
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 40m
> Remaining Estimate: 0h
>
> A more detailed description can be found in the proposed doc:
> https://docs.google.com/document/d/1R1-uDq-KawLB8afQYrczfcoQHjjIhq6tvUksxrfhBl0/edit#
> This step aims to integrate the multiple-language plugin for flink docs to
> support Chinese. All the $pagename.zh.md should be created first in this JIRA
> but keep the original English contents. A link between the English version and
> the Chinese version should also be considered.
[jira] [Resolved] (FLINK-11530) Support multiple languages for the framework of flink docs
[ https://issues.apache.org/jira/browse/FLINK-11530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu resolved FLINK-11530.

Resolution: Fixed
Fix Version/s: 1.9.0

Resolved in 1.9 with commits: 182456f2ed0c9345990d8d25cac26d22e828bb46 63c7bd2697187c52ca062bb8512ce8e9744bb95f

> Support multiple languages for the framework of flink docs
> Key: FLINK-11530
> URL: https://issues.apache.org/jira/browse/FLINK-11530
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 40m
> Remaining Estimate: 0h
>
> A more detailed description can be found in the proposed doc:
> https://docs.google.com/document/d/1R1-uDq-KawLB8afQYrczfcoQHjjIhq6tvUksxrfhBl0/edit#
> This step aims to integrate the multiple-language plugin for flink docs to
> support Chinese. All the $pagename.zh.md should be created first in this JIRA
> but keep the original English contents. A link between the English version and
> the Chinese version should also be considered.
[GitHub] [flink] asfgit closed pull request #7754: [FLINK-11530] [docs] Support multiple languages for the docs framework
asfgit closed pull request #7754: [FLINK-11530] [docs] Support multiple languages for the docs framework URL: https://github.com/apache/flink/pull/7754
[GitHub] [flink] sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#issuecomment-484836678 +1 to merge!
[GitHub] [flink] KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime
KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime URL: https://github.com/apache/flink/pull/8202#issuecomment-484835447 LGTM, +1
[jira] [Commented] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821807#comment-16821807 ] lamber-ken commented on FLINK-12247: [~till.rohrmann], I updated the issue and uploaded a patch. :)
> fix NPE when writing an archive file to a FileSystem
>
> Key: FLINK-12247
> URL: https://issues.apache.org/jira/browse/FLINK-12247
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.6.3, 1.8.0
> Reporter: lamber-ken
> Assignee: lamber-ken
> Priority: Major
> Fix For: 1.9.0
>
> Attachments: fix-nep.patch
>
> h3. *Issue details*
> In our Hadoop production environment, we use the fixed-delay restart strategy:
> {code:java}
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 20
> restart-strategy.fixed-delay.delay: 2 s
> {code}
> When a Flink job reaches the maximum attempt count, it writes an archive file to a +FileSystem+ and shuts down.
> But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt info of a subtask, it hits an NPE.
> h3. *Detailed reasons:*
> 0. Assume a Flink job reaches the maximum attempt count (20).
> 1. An +ExecutionVertex+ is a parallel subtask of the execution. Each +ExecutionVertex+ keeps a history of prior executions bounded by MAX_ATTEMPTS_HISTORY_SIZE (default: 16).
> 2. When +SubtaskExecutionAttemptDetailsHandler+ looks up an attempt in +priorExecutions+, the list has retained only MAX_ATTEMPTS_HISTORY_SIZE elements; older elements were dropped from the head of the list (FIFO), so the lookup may return null.
> h3. *Detailed StackTrace*
> {code:java}
> java.lang.NullPointerException
> 	at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
> 	at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
> 	at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
> 	at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
> 	at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
> 	at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> 	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> h3. *Minimal reproducible example*
> (generic type parameters below were stripped by the mail rendering and are restored)
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     DataStream<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             while (true) {
>                 ctx.collect("");
>                 Thread.sleep(100);
>             }
>         }
>
>         @Override
>         public void cancel() {
>         }
>     });
>
>     text.addSink(new SinkFunction<String>() {
>         @Override
>         public void invoke(String value, Context context) throws Exception {
>             System.out.println(1 / 0);
>         }
>     });
>
>     env.execute();
> }
> {code}
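The root cause above — a FIFO attempt history capped at MAX_ATTEMPTS_HISTORY_SIZE that returns null for evicted attempts — can be reproduced with a small self-contained sketch (illustrative class and method names, not Flink's actual `ExecutionVertex` API):

```java
import java.util.ArrayDeque;

/**
 * Minimal sketch (not Flink's actual classes): a bounded FIFO history of
 * prior execution attempts, as kept per ExecutionVertex.
 */
public class AttemptHistory {
    static final int MAX_ATTEMPTS_HISTORY_SIZE = 16; // Flink's default

    private static final ArrayDeque<Integer> prior = new ArrayDeque<>();

    /** Record an attempt; the oldest entry is evicted once the cap is hit. */
    public static void add(int attemptNumber) {
        if (prior.size() >= MAX_ATTEMPTS_HISTORY_SIZE) {
            prior.pollFirst(); // drop from the head of the list (FIFO)
        }
        prior.addLast(attemptNumber);
    }

    /** Mimics the handler's lookup: may return null for evicted attempts. */
    public static Integer find(int attemptNumber) {
        for (Integer a : prior) {
            if (a == attemptNumber) {
                return a;
            }
        }
        return null; // dereferencing this without a null check is the NPE
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) { // 20 restart attempts, cap is 16
            add(i);
        }
        // attempts 0..3 were evicted; the REST handler dereferenced this
        // kind of lookup result without checking for null
        System.out.println(find(0));  // null
        System.out.println(find(19)); // 19
    }
}
```

Running it shows that any attempt older than the last 16 is gone from the history, which matches the scenario of 20 fixed-delay restart attempts above.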
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Attachment: fix-nep.patch
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Attachment: (was: fix-nep.pathc)
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Attachment: fix-nep.pathc
[GitHub] [flink] dianfu commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#issuecomment-484821808 @sunjincheng121 Thanks a lot for the review. Updated the PR accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW edited a comment on issue #8181: [FLINK-12199][network] Refactor IOMetrics to not distinguish between local/remote in/out bytes
zhijiangW edited a comment on issue #8181: [FLINK-12199][network] Refactor IOMetrics to not distinguish between local/remote in/out bytes URL: https://github.com/apache/flink/pull/8181#issuecomment-484820243 Thanks for the review, @azagrebin! If I understand correctly, the IO-related metrics can be divided into two groups. One group is general and suitable for all `ShuffleService` implementations, such as the current `numBytesIn/Out` and `numRecordsIn/Out`, which would still be created in the current `TaskIOMetrics`. The other group is specific to the current `NetworkEnvironment` implementation. We could create two private classes `NetworkInput/OutputMetrics` inside `NetworkEnvironment`, migrate the current `numBytesInLocal/Remote`, `numBuffersInLocal/Remote`, `input/outputQueueLength`, and `in/outPoolUsage` metrics from `TaskIOMetricGroup` into the new `NetworkInput/OutputMetrics`, and then pass these specific metrics into the local/remote channels as needed. We might then no longer need the newly introduced `InputGateWithMetrics` in the future: we could pass the general `Counter`s (`numBytesIn`, `numBuffersIn`) from `TaskIOMetrics` into the constructor of `SingleInputGate` to update these counters properly, instead of passing the whole `TaskIOMetricGroup` into the constructor. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
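The split proposed in the comment can be sketched as follows (assumed names and shapes; `TaskIOMetrics` and `NetworkInputMetrics` here are illustrative holders, not Flink's actual classes):

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Sketch of the metrics split discussed above: general counters that make
 * sense for every ShuffleService implementation live in a task-level group,
 * while NetworkEnvironment-specific counters live in their own holder.
 */
public class MetricsSplitSketch {

    /** General metrics, meaningful for all ShuffleService implementations. */
    static class TaskIOMetrics {
        final AtomicLong numBytesIn = new AtomicLong();
        final AtomicLong numBytesOut = new AtomicLong();
    }

    /** Metrics specific to the current NetworkEnvironment implementation. */
    static class NetworkInputMetrics {
        final AtomicLong numBytesInLocal = new AtomicLong();
        final AtomicLong numBytesInRemote = new AtomicLong();
    }

    /**
     * A remote channel updates the general counter and the network-specific
     * counter separately, so an input gate only needs the general counter,
     * not the whole metric group.
     */
    static void onRemoteBuffer(TaskIOMetrics general, NetworkInputMetrics network, long bytes) {
        general.numBytesIn.addAndGet(bytes);
        network.numBytesInRemote.addAndGet(bytes);
    }

    public static void main(String[] args) {
        TaskIOMetrics general = new TaskIOMetrics();
        NetworkInputMetrics network = new NetworkInputMetrics();
        onRemoteBuffer(general, network, 1024);
        System.out.println(general.numBytesIn.get());       // 1024
        System.out.println(network.numBytesInRemote.get()); // 1024
    }
}
```

The design point is that only the general counters cross the abstraction boundary; the local/remote distinction stays private to the network stack.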
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Description:
h3. *Issue details*
In our Hadoop production environment, we use the fixed-delay restart strategy:
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
{code}
When a Flink job reaches the maximum attempt count, it writes an archive file to a +FileSystem+ and shuts down. But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt info of a subtask, it hits an NPE.
h3. *Detailed reasons:*
0. Assume a Flink job reaches the maximum attempt count (20).
1. An +ExecutionVertex+ is a parallel subtask of the execution. Each +ExecutionVertex+ keeps a history of prior executions bounded by MAX_ATTEMPTS_HISTORY_SIZE (default: 16).
2. When +SubtaskExecutionAttemptDetailsHandler+ looks up an attempt in +priorExecutions+, the list has retained only MAX_ATTEMPTS_HISTORY_SIZE elements; older elements were dropped from the head of the list (FIFO), so the lookup may return null.
h3. *Detailed StackTrace*
{code:java}
java.lang.NullPointerException
	at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
	at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
	at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
	at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
	at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
	at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
	at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
	at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
h3. *Minimal reproducible example*
(generic type parameters below were stripped by the mail rendering and are restored)
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect("");
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    });

    text.addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(1 / 0);
        }
    });

    env.execute();
}
{code}
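The attached fix-nep.patch is not included in the message. A defensive fix along the lines the reporter describes would null-check the attempt before aggregating its metrics; the sketch below is hypothetical (method and class names only mirror the stack trace, and this is not the actual patch):

```java
/**
 * Hypothetical sketch of a defensive fix: skip metrics aggregation when the
 * prior attempt was already evicted from the bounded history, instead of
 * dereferencing null. NOT the actual fix-nep.patch.
 */
public class SafeMetricsAggregation {

    /** Stand-in for the attempt object looked up from priorExecutions. */
    static class AccessExecution {
        long numBytesIn;
        AccessExecution(long numBytesIn) { this.numBytesIn = numBytesIn; }
    }

    static long totalBytesIn = 0;

    /** Tolerates null: an evicted attempt simply contributes nothing. */
    static void addIOMetrics(AccessExecution attempt) {
        if (attempt == null) {
            return; // attempt dropped from the FIFO history: nothing to add
        }
        totalBytesIn += attempt.numBytesIn;
    }

    public static void main(String[] args) {
        addIOMetrics(new AccessExecution(100));
        addIOMetrics(null); // previously this case threw NullPointerException
        addIOMetrics(new AccessExecution(50));
        System.out.println(totalBytesIn); // 150
    }
}
```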
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Description: (duplicate of the description in the previous update; omitted)
[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#discussion_r276939116

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java

(new file; generic type parameters were stripped by the mail rendering and are restored below)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
 * in the case that the operator is InputSelectable.
 *
 * @param <IN1> The type of the records that arrive on the first input
 * @param <IN2> The type of the records that arrive on the second input
 */
@Internal
public class StreamTwoInputSelectableProcessor<IN1, IN2> {

	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);

	private volatile boolean continuousProcessing = true;

	private final NetworkInput input1;
	private final NetworkInput input2;

	private final Object lock;

	private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;

	private final InputSelectable inputSelector;

	private final AuxiliaryHandler auxiliaryHandler;

	private final CompletableFuture<?>[] listenFutures;

	private final boolean[] isFinished;

	private InputSelection inputSelection;

	private AtomicInteger availableInputsMask = new AtomicInteger();
```

Review comment: Yes, in case of 2.:

> If only one of the inputs is available, always read it until another input becomes available or itself becomes unavailable/finished.

Let's start with always checking once per record whether the other input became available. After the holiday (we have a public holiday in Germany now) I'll have to think a bit more about the most general and efficient way to handle this without that overhead.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
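The reading strategy discussed in the review — keep draining the only available input, but re-check the other side's availability once per record — can be sketched as follows (assumed structure for illustration, not the PR's actual code):

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Sketch of the per-record availability re-check discussed above
 * (illustrative interfaces, not Flink's actual runtime classes).
 */
public class InputSelectionSketch {

    interface Input {
        boolean isAvailable();
        Object pollRecord();
    }

    /**
     * Reads from 'active' while it has data and 'other' is unavailable,
     * re-checking 'other' once per record. Returns the number of records read.
     */
    static int readUntilOtherAvailable(Input active, Input other) {
        int read = 0;
        while (active.isAvailable() && !other.isAvailable()) {
            active.pollRecord(); // process one record...
            read++;              // ...then loop back and re-check the other side
        }
        return read;
    }

    /** Simple queue-backed input for demonstration. */
    static class QueueInput implements Input {
        final Queue<Object> queue = new ArrayDeque<>();
        public boolean isAvailable() { return !queue.isEmpty(); }
        public Object pollRecord() { return queue.poll(); }
    }

    public static void main(String[] args) {
        QueueInput a = new QueueInput();
        QueueInput b = new QueueInput();
        for (int i = 0; i < 5; i++) a.queue.add(i);
        // b is unavailable throughout, so a is drained completely
        System.out.println(readUntilOtherAvailable(a, b)); // 5
        a.queue.add(99);
        b.queue.add("x");
        // b is available from the start, so control returns immediately
        System.out.println(readUntilOtherAvailable(a, b)); // 0
    }
}
```

The once-per-record check is the cheap baseline; the reviewer's open question is how to avoid even that overhead in the general case.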
[jira] [Closed] (FLINK-10941) Slots prematurely released which still contain unconsumed data
[ https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-10941. -- Resolution: Fixed Fix Version/s: 1.7.3 merged commit 0ba into apache:release-1.7 > Slots prematurely released which still contain unconsumed data > --- > > Key: FLINK-10941 > URL: https://issues.apache.org/jira/browse/FLINK-10941 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Qi >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.3, 1.9.0, 1.8.1 > > Time Spent: 40m > Remaining Estimate: 0h > > Our case is: Flink 1.5 batch mode, 32 parallelism to read the data source and 4 > parallelism to write the data sink. > > The read task worked perfectly with 32 TMs. However, when the job was > executing the write task, since only 4 TMs were needed, the other 28 TMs were > released. This caused a RemoteTransportException in the write task: > > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connection unexpectedly closed by remote task manager > ’the_previous_TM_used_by_read_task'. This might indicate that the remote task > manager was lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > ... > > After skimming the YarnFlinkResourceManager related code, it seems to me that > Flink releases TMs when they’re idle, regardless of whether working TMs > need them. > > Put another way, Flink seems to prematurely release slots which contain > unconsumed data and, thus, eventually release a TM which then fails a > consuming task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Description: h3. *Issue detail info* In our Hadoop production env, we use the fixed-delay restart strategy. {code:java} restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 20 restart-strategy.fixed-delay.delay: 2 s {code} If a Flink job reaches the max attempt count, the job will write an archive file to +FileSystem+ and shut down. But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt info of a subtask, it hits an NPE. h3. *Detailed reasons are as follows:* 0. Assume a scenario where a Flink job reaches the max attempt count (20). 1. +ExecutionVertex+ is a parallel subtask of the execution. Each +ExecutionVertex+ is created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} (default value: 16). 2. When +SubtaskExecutionAttemptDetailsHandler+ handles the request, it will get the attempt from +priorExecutions+, but +priorExecutions+ only retains {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so earlier elements were dropped from the head of the list (FIFO) and the lookup may return null. h3.
*Detailed StackTrace* {code:java} java.lang.NullPointerException at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730) at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at 
akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} was: In our product env, we use fixed-delay restart-strategy which job failed, met NEP when writing an archive file to FileSystem {code:java} java.lang.NullPointerException at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) at
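The bounded-history behaviour described in the issue can be reproduced with a few lines; this is a simplified stand-in for the +priorExecutions+ history, not the real ExecutionVertex code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Simplified stand-in for the bounded prior-execution history: once more than
 * MAX_ATTEMPTS_HISTORY_SIZE attempts are recorded, the oldest are evicted from
 * the head (FIFO), so a lookup for an early attempt returns null and callers
 * such as addIOMetrics must guard against it.
 */
public class AttemptHistorySketch {

    public static final int MAX_ATTEMPTS_HISTORY_SIZE = 16;

    private final Deque<Integer> priorExecutions = new ArrayDeque<>();

    public void add(int attemptNumber) {
        if (priorExecutions.size() >= MAX_ATTEMPTS_HISTORY_SIZE) {
            priorExecutions.pollFirst(); // evict from the head, as described above
        }
        priorExecutions.addLast(attemptNumber);
    }

    /** Returns the stored attempt, or null if it was already evicted. */
    public Integer find(int attemptNumber) {
        for (Integer attempt : priorExecutions) {
            if (attempt == attemptNumber) {
                return attempt;
            }
        }
        return null;
    }
}
```

With 20 attempts and a history of 16, attempts 0–3 are already gone, which mirrors the null that the stack trace above trips over.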
[GitHub] [flink] pnowojski merged pull request #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions
pnowojski merged pull request #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/8201
[GitHub] [flink] wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r276929320 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; + +import org.apache.hadoop.hive.metastore.api.Database; + +import java.util.Map; + + +/** + * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. + */ +public class GenericHiveMetastoreCatalogUtil { + + private GenericHiveMetastoreCatalogUtil() { + } + + // -- Utils -- + + /** +* Creates a Hive database from CatalogDatabase. +*/ + public static Database createHiveDatabase(String dbName, CatalogDatabase db) { + Map<String, String> props = db.getProperties(); + return new Database( + dbName, + db.getDescription().get(), Review comment: But the parameter is a `CatalogDatabase`, not a `GenericCatalogDatabase`. Still not sure if it's non-null?
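A minimal illustration of the null-safety concern raised here, assuming `getDescription()` returns an `Optional` (the real CatalogDatabase API may differ; names are illustrative):

```java
import java.util.Optional;

/**
 * Sketch of the review point: an unconditional Optional.get() throws
 * NoSuchElementException when the description is absent; an explicit
 * default avoids that failure mode.
 */
public class DescriptionSketch {

    public static String safeDescription(Optional<String> description) {
        // Prefer orElse over get(): get() fails on an empty Optional.
        return description.orElse("");
    }
}
```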
[GitHub] [flink] wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r276928193 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.GenericCatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ReadableWritableCatalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage. + */ +public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog { Review comment: GenericHiveMetastoreCatalog => GenericHiveMetaStoreCatalog with upper case for `s`? 
I find that it's called `HiveMetaStore` in hive.
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Description: In our product env, we use fixed-delay restart-strategy which job failed, met NEP when writing an archive file to FileSystem {code:java} java.lang.NullPointerException at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730) at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} was: In our product env, flink restart-strategy config job failed, met NEP when writing an archive file to FileSystem {code:java} java.lang.NullPointerException at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730) at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at
[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r276928708 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistException.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.ObjectPath; + +/** + * Exception for adding an already existing partition. + */ +public class PartitionAlreadyExistException extends RuntimeException { + private static final String MSG = "Partition %s of table %s in catalog %s already exists."; + + public PartitionAlreadyExistException( + String catalogName, + ObjectPath tablePath, + CatalogPartition.PartitionSpec partitionSpec) { + + super(String.format(MSG, partitionSpec, tablePath.getFullName(), catalogName), null); Review comment: just remove `, null` ?
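A quick check behind the `, null` suggestion: with a null cause, the two constructor forms observably behave the same for `getCause()`, so the shorter `super(message)` reads better.

```java
/**
 * Demonstrates the review suggestion: passing an explicit null cause yields
 * the same getCause() result as the single-argument constructor, so the
 * trailing `, null` argument is redundant.
 */
public class CauseSketch {

    public static Throwable withExplicitNullCause(String message) {
        return new RuntimeException(message, null);
    }

    public static Throwable withoutCause(String message) {
        return new RuntimeException(message);
    }
}
```

One subtle difference worth knowing: the two-argument form fixes the cause permanently, so a later `initCause()` would throw `IllegalStateException`, while the single-argument form still allows it once.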
[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r276926297 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java ## @@ -29,4 +32,15 @@ * @return table statistics */ TableStats getStatistics(); + + /** +* Check if the table is partitioned or not. +*/ + boolean isPartitioned(); + + /** +* +* @return Review comment: comment?
[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r276927642 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -202,6 +212,8 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws Ta if (tableExists(tablePath)) { tables.remove(tablePath); + + partitions.remove(tablePath); Review comment: renameTable too? maybe add test?
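The bookkeeping the review comment asks for can be sketched with plain maps; this is an illustration of the invariant only, not GenericInMemoryCatalog's real signatures:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch: table-scoped partition state must follow the table
 * through both dropTable and renameTable, or the partitions map leaks stale
 * entries keyed by a table path that no longer exists.
 */
public class InMemoryCatalogSketch {

    final Map<String, String> tables = new HashMap<>();                   // tablePath -> schema
    final Map<String, Map<String, String>> partitions = new HashMap<>();  // tablePath -> partitions

    void dropTable(String path) {
        tables.remove(path);
        partitions.remove(path); // keep partition state consistent with the table map
    }

    void renameTable(String path, String newPath) {
        if (tables.containsKey(path)) {
            tables.put(newPath, tables.remove(path));
            Map<String, String> p = partitions.remove(path);
            if (p != null) {
                partitions.put(newPath, p); // partitions must move with the table
            }
        }
    }
}
```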
[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r276926272 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java ## @@ -29,4 +32,15 @@ * @return table statistics */ TableStats getStatistics(); + + /** +* Check if the table is partitioned or not. +*/ + boolean isPartitioned(); + + /** +* +* @return +*/ + Set<String> getPartitionKeys() throws TableNotPartitionedException; Review comment: LinkedHashSet?
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem
[ https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12247: --- Description: In our product env, flink restart-strategy config job failed, met NEP when writing an archive file to FileSystem {code:java} java.lang.NullPointerException at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730) at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} was: job failed, met NEP when writing an archive file to FileSystem {code:java} java.lang.NullPointerException at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140) at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120) at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780) at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57) at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758) at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730) at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at
[jira] [Updated] (FLINK-12139) Flink on mesos - Parameterize disk space needed.
[ https://issues.apache.org/jira/browse/FLINK-12139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12139: --- Labels: pull-request-available (was: ) > Flink on mesos - Parameterize disk space needed. > > > Key: FLINK-12139 > URL: https://issues.apache.org/jira/browse/FLINK-12139 > Project: Flink > Issue Type: Improvement > Components: Deployment / Mesos >Reporter: Juan >Assignee: Oleksandr Nitavskyi >Priority: Minor > Labels: pull-request-available > > We are having a small issue while trying to deploy Flink on Mesos using > marathon. In our setup of Mesos we are required to specify the amount of > disk space we want for the applications we deploy there. > The current default value in Flink is 0 and it is currently not > parameterizable. This means that we ask for 0 disk space for our instances, so > Flink can't work.
[GitHub] [flink] flinkbot commented on issue #8224: [FLINK-12139][Mesos] Add disk space parameter.
flinkbot commented on issue #8224: [FLINK-12139][Mesos] Add disk space parameter. URL: https://github.com/apache/flink/pull/8224#issuecomment-484801689 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] juangentile opened a new pull request #8224: [FLINK-12139][Mesos] Add disk space parameter.
juangentile opened a new pull request #8224: [FLINK-12139][Mesos] Add disk space parameter. URL: https://github.com/apache/flink/pull/8224 ## What is the purpose of the change *Add a parameter to set the required disk space for a Mesos deployment. Before this change the default value was 0 with no option to change it; we keep the default of 0 and only make it configurable.* ## Brief change log - *The new parameter is loaded in the class MesosTaskManagerParameters.java* - *The new parameter is used in the class MesosResourceManager* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
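The shape of such a change can be sketched as reading a disk-space setting from configuration while preserving the old hard-coded 0 as the default. Note the key name and helper below are assumptions for illustration, not necessarily what the PR uses:

```java
import java.util.Map;

/**
 * Hypothetical sketch of a parameterized disk-space setting: the value comes
 * from configuration when present, and falls back to the previous hard-coded
 * default of 0 so existing deployments keep their behaviour.
 */
public class MesosDiskParamSketch {

    // Assumed key name for illustration only.
    static final String DISK_KEY = "mesos.resourcemanager.tasks.disk";
    static final int DEFAULT_DISK_MB = 0; // preserves the old behaviour

    public static int diskMegabytes(Map<String, String> config) {
        return Integer.parseInt(config.getOrDefault(DISK_KEY, String.valueOf(DEFAULT_DISK_MB)));
    }
}
```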
[GitHub] [flink] wuchong commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
wuchong commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#issuecomment-484799383 I think we should also add a NOTICE file to the `flink-connector-hive` module, because it bundles the Hive dependency. You can take this as an example: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE. See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing
[GitHub] [flink] yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator
yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator URL: https://github.com/apache/flink/pull/7571#discussion_r276927208
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
## @@ -101,7 +100,7 @@
 	private final CheckpointStorageLocation targetLocation;

 	/** The promise to fulfill once the checkpoint has been completed. */
-	private final CompletableFuture onCompletionPromise;
+	private final CompletableFuture onCompletionPromise;
Review comment: @StefanRRichter I have refactored my PR based on the new design document. Would you please have a look?
[GitHub] [flink] wuchong commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime
wuchong commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime URL: https://github.com/apache/flink/pull/8202#issuecomment-484797370 Rebased.
[GitHub] [flink] wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#discussion_r276925651
## File path: flink-connectors/flink-connector-hive/pom.xml
## @@ -0,0 +1,360 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
Review comment: I think only modules that depend on a Scala module (e.g. `flink-streaming-java_2.11`) need to specify the Scala version; other modules shouldn't specify it. For example, `flink-core`, `flink-table-common`, and `flink-table-api-java` do not have a Scala version suffix.
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r276917451
## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
## @@ -305,6 +305,36 @@ class CorrelateITCase extends AbstractTestBase {
   )
 }

+  @Test
+  def testFlatMap(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val func2 = new TableFunc2
+    val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
+      // test non alias
+      .flatMap(func2('c))
+      .select('f0, 'f1)
+      // test the output field name of flatMap is the same as the field name of input table
Review comment: ` input table` -> `the input table`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r276922137
## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
## @@ -365,6 +367,53 @@ class OperationTreeBuilder(private val tableEnv: TableEnvironment) {
     mapFunction.asInstanceOf[CallExpression].getFunctionDefinition.getType == SCALAR_FUNCTION
   }

+  def flatMap(tableFunction: Expression, child: TableOperation): TableOperation = {
+
+    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
+    val resolvedTableFunction = resolveSingleExpression(tableFunction, resolver)
+
+    if (!isTableFunction(resolvedTableFunction)) {
+      throw new ValidationException("Only TableFunction can be used in the flatMap operator.")
+    }
+
+    val originFieldNames: Seq[String] =
+      resolvedTableFunction.asInstanceOf[CallExpression].getFunctionDefinition match {
+        case tfd: TableFunctionDefinition =>
+          UserDefinedFunctionUtils.getFieldInfo(tfd.getResultType)._1
+      }
+
+    def getUniqueName(inputName: String, usedFieldNames: Seq[String]): String = {
+      var i = 0
+      var resultName = inputName
+      while (usedFieldNames.contains(resultName)) {
+        resultName = resultName + "_" + i
+        i += 1
+      }
+      resultName
+    }
+
+    val usedFieldNames = child.asInstanceOf[LogicalNode].output.map(_.name).toBuffer
+    val newFieldNames = originFieldNames.map({ e =>
+      val resultName = getUniqueName(e, usedFieldNames)
+      usedFieldNames.append(resultName)
+      resultName
+    })
+
+    val renamedTableFunction = ApiExpressionUtils.call(
+      BuiltInFunctionDefinitions.AS,
+      resolvedTableFunction +: newFieldNames.map(ApiExpressionUtils.valueLiteral(_)): _*)
+    val joinNode = joinLateral(child, renamedTableFunction, JoinType.INNER, Optional.empty())
+    val dropNode = dropColumns(
Review comment: `dropNode` -> `rightNode`, because `joinNode` = `leftNode` + `rightNode`. After `dropColumns()`, all columns are from `rightNode`. What do you think?
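For readers following the naming discussion: the unique-name scheme in the `getUniqueName` helper quoted above can be sketched as a standalone snippet (a plain-Java transcription for illustration only; the class name `UniqueNames` is my own, not part of the Flink code base):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone transcription of the getUniqueName logic quoted above.
public class UniqueNames {
    static String getUniqueName(String inputName, List<String> usedFieldNames) {
        int i = 0;
        String resultName = inputName;
        // Keep suffixing "_<i>" onto the current candidate until it is free.
        while (usedFieldNames.contains(resultName)) {
            resultName = resultName + "_" + i;
            i++;
        }
        return resultName;
    }

    public static void main(String[] args) {
        List<String> used = new ArrayList<>(Arrays.asList("a", "b", "c"));
        System.out.println(getUniqueName("f0", used)); // prints f0 (no clash)
        System.out.println(getUniqueName("c", used));  // prints c_0 ("c" clashes)
    }
}
```

Note that the loop re-suffixes the already-suffixed candidate, so if both `c` and `c_0` were taken, the result would be `c_0_1` rather than `c_1` — a quirk of the quoted Scala that this transcription preserves.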
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r276914756
## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
## @@ -1019,10 +1020,40 @@
 	 * Scala Example:
 	 *
-	 * {@code val func = new MyMapFunction()
+	 * {@code
+	 * val func = new MyMapFunction()
 	 * tab.map(func('c))
 	 * }
 	 */
 	Table map(Expression mapFunction);
+
+	/**
+	 * Performs a flatMap operation with a table function.
+	 *
+	 * Example:
+	 *
+	 * {@code
+	 * TableFunction func = new MyFlatMapFunction();
+	 * tableEnv.registerFunction("func", func);
+	 * table.flatMap("func(c)");
+	 * }
+	 */
+	Table flatMap(String tableFunction);
+
+	/**
+	 * Performs a flatMap operation with a table function.
Review comment: "Performs a flatMap operation with a user-defined table function or a built-in table function. The output will be flattened if the output type is a composite type."?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r276923999
## File path: docs/dev/table/tableApi.md
## @@ -1842,11 +1843,27 @@
 ScalarFunction func = new MyMapFunction();
 tableEnv.registerFunction("func", func);

 Table table = input
-  .map(func("c")).as("a, b")
+  .map("func(c)").as("a, b")
 {% endhighlight %}
+
+FlatMap
+Batch Streaming
+
+Performs a flatMap operation with a table function.
+{% highlight java %}
+TableFunction func = new MyFlatMapFunction();
+tableEnv.registerFunction("func", func);
Review comment: I think it is better to add the `MyFlatMapFunction()` definition, so that users can clearly know why we can `as("a, b")`. What do you think? If so, please improve the map example as well.
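For readers wondering what a `MyFlatMapFunction` like the one suggested above would do: a flatMap-style table function maps one input value to zero or more output rows. A dependency-free sketch of that row-expansion semantics (plain Java, no Flink dependency; `splitToRows` and the (word, length) schema are my own illustration, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of flatMap semantics: one input value can
// produce several output "rows" (modeled here as Object[] pairs).
public class FlatMapSketch {
    // Emits one (word, length) row per whitespace-separated token,
    // mirroring a table function whose two output fields could be
    // aliased as("a, b").
    static List<Object[]> splitToRows(String input) {
        List<Object[]> rows = new ArrayList<>();
        for (String s : input.split(" ")) {
            rows.add(new Object[] { s, s.length() });
        }
        return rows;
    }

    public static void main(String[] args) {
        for (Object[] row : splitToRows("hello flink")) {
            System.out.println(row[0] + ", " + row[1]);
        }
        // prints:
        // hello, 5
        // flink, 5
    }
}
```

The point of the reviewer's suggestion is visible here: only with the function definition in front of you is it clear that the output has exactly two fields, which is what makes the `as("a, b")` aliasing in the docs example well-formed.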
[GitHub] [flink] KurtYoung commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
KurtYoung commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module URL: https://github.com/apache/flink/pull/8205#issuecomment-484794093 One minor thing: could you make the component name in the commit message more aligned with other commits? In this case, I think [table] is enough.
[GitHub] [flink] KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime
KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime URL: https://github.com/apache/flink/pull/8202#issuecomment-484793449 I just merged another PR, please resolve conflicts again.
[GitHub] [flink] KurtYoung closed pull request #8176: [FLINK-12192] [table-planner-blink] Add support for generating optimized logical plan for grouping sets and distinct aggregate
KurtYoung closed pull request #8176: [FLINK-12192] [table-planner-blink] Add support for generating optimized logical plan for grouping sets and distinct aggregate URL: https://github.com/apache/flink/pull/8176
[jira] [Closed] (FLINK-12192) Add support for generating optimized logical plan for grouping sets and distinct aggregate
[ https://issues.apache.org/jira/browse/FLINK-12192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12192. -- Resolution: Fixed Fix Version/s: 1.9.0 fixed in 1c55f7a7e18e03f5b777c9fbe3234055904b00d6
> Add support for generating optimized logical plan for grouping sets and distinct aggregate
> --
>
> Key: FLINK-12192
> URL: https://issues.apache.org/jira/browse/FLINK-12192
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> This issue aims to support generating an optimized logical plan for grouping
> sets and distinct aggregate (mentioned in FLINK-12076 and FLINK-12098).
> For batch, a query with a distinct aggregate will be rewritten into two
> non-distinct aggregates by an extended
> [AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]:
> the first aggregate computes the distinct keys and the non-distinct aggregate
> functions, and the second aggregate computes the distinct aggregate functions
> based on the first aggregate's result. The first aggregate has grouping sets
> if there is more than one distinct aggregate function on different fields.
> For stream, a query with a distinct aggregate is handled by SplitAggregateRule
> in FLINK-12161.
> A query with grouping sets (or cube, rollup) will be rewritten into a regular
> aggregate with an expand, and the expand node duplicates the input data for
> each simple group.
> e.g.
> {noformat}
> schema:
> MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)
>
> Original records:
> +------+------+------+------+
> |  a   |  b   |  c   |  d   |
> +------+------+------+------+
> |  1   |  1   |  c1  |  d1  |
> |  1   |  2   |  c1  |  d2  |
> |  2   |  1   |  c1  |  d1  |
> +------+------+------+------+
>
> SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)
>
> logical plan after expanded:
> LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
>   LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
>     LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
>       LogicalNativeTableScan(table=[[builtin, default, MyTable]])
>
> notes:
> '$e = 1' is equivalent to 'group by a'
> '$e = 2' is equivalent to 'group by c'
>
> expanded records:
> +------+------+------+------+
> |  a   |  b   |  c   |  $e  |
> +------+------+------+------+
> |  1   |  1   | null |  1   |  \ records expanded by record1
> | null |  1   |  c1  |  2   |  /
> |  1   |  2   | null |  1   |  \ records expanded by record2
> | null |  2   |  c1  |  2   |  /
> |  2   |  1   | null |  1   |  \ records expanded by record3
> | null |  1   |  c1  |  2   |  /
> +------+------+------+------+
> {noformat}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
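The expand step described in the ticket can be illustrated with a small standalone snippet (plain Java; `ExpandSketch` and `expand` are illustrative names, not Flink code). For GROUPING SETS (a, c), each input record (a, b, c) is copied once per simple group, with the columns outside that group nulled out and `$e` tagging which group the copy belongs to:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of the LogicalExpand step: input rows are (a, b, c);
// GROUPING SETS (a, c) turns each input row into two output copies.
public class ExpandSketch {
    static List<Object[]> expand(List<Object[]> input) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] r : input) {
            out.add(new Object[] { r[0], r[1], null, 1 }); // $e = 1: group by a, c nulled
            out.add(new Object[] { null, r[1], r[2], 2 }); // $e = 2: group by c, a nulled
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
            new Object[] { 1, 1L, "c1" },
            new Object[] { 1, 2L, "c1" },
            new Object[] { 2, 1L, "c1" });
        // The 3 original records expand to 6, matching the table above.
        System.out.println(expand(rows).size()); // prints 6
    }
}
```

A downstream aggregate grouped on (a, c, $e) then computes both grouping sets in one pass over the duplicated data, which is the point of the rewrite.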