[GitHub] [flink] link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem

2019-04-19 Thread GitBox
link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix 
RocksDBStateBackend's mistaken usage of default filesystem
URL: https://github.com/apache/flink/pull/8068#issuecomment-485058426
 
 
   @tillrohrmann  @dawidwys PTAL




[jira] [Assigned] (FLINK-12269) Support Temporal Table Join in blink planner

2019-04-19 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-12269:
---

Assignee: Jark Wu

> Support Temporal Table Join in blink planner
> 
>
> Key: FLINK-12269
> URL: https://issues.apache.org/jira/browse/FLINK-12269
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> Support translating the following "FOR SYSTEM_TIME AS OF" query into
> {{StreamExecTemporalTableJoin}}:
> {code:sql}
> SELECT
>   o.amount, o.currency, r.rate, o.amount * r.rate
> FROM
>   Orders AS o
>   JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>   ON r.currency = o.currency
> {code}
> This is an extension to the current temporal join (FLINK-9738) using the standard
> syntax introduced in Calcite 1.19.





[jira] [Created] (FLINK-12269) Support Temporal Table Join in blink planner

2019-04-19 Thread Jark Wu (JIRA)
Jark Wu created FLINK-12269:
---

 Summary: Support Temporal Table Join in blink planner
 Key: FLINK-12269
 URL: https://issues.apache.org/jira/browse/FLINK-12269
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Jark Wu


Support translating the following "FOR SYSTEM_TIME AS OF" query into
{{StreamExecTemporalTableJoin}}:

{code:sql}
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
{code}

This is an extension to the current temporal join (FLINK-9738) using the standard
syntax introduced in Calcite 1.19.





[jira] [Created] (FLINK-12268) Port SharedSlotsTest to new code base

2019-04-19 Thread leesf (JIRA)
leesf created FLINK-12268:
-

 Summary: Port SharedSlotsTest to new code base
 Key: FLINK-12268
 URL: https://issues.apache.org/jira/browse/FLINK-12268
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: leesf
Assignee: leesf
 Fix For: 1.9.0


Get rid of Instance.

Port SchedulerTestUtils#getRandomInstance to new code.





[jira] [Created] (FLINK-12267) Port SimpleSlotTest to new code base

2019-04-19 Thread leesf (JIRA)
leesf created FLINK-12267:
-

 Summary: Port SimpleSlotTest to new code base
 Key: FLINK-12267
 URL: https://issues.apache.org/jira/browse/FLINK-12267
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: leesf
Assignee: leesf
 Fix For: 1.9.0


Mainly get rid of `Instance`





[jira] [Updated] (FLINK-12264) Port ExecutionGraphTestUtils to new code base

2019-04-19 Thread leesf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf updated FLINK-12264:
--
Component/s: Runtime / Coordination

> Port ExecutionGraphTestUtils to new code base
> -
>
> Key: FLINK-12264
> URL: https://issues.apache.org/jira/browse/FLINK-12264
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Reporter: leesf
>Assignee: leesf
>Priority: Major
> Fix For: 1.9.0
>
>
> Mainly get rid of Instance.





[jira] [Updated] (FLINK-12180) Port ExecutionVertexCancelTest to new codebase

2019-04-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12180:
---
Labels: pull-request-available  (was: )

> Port ExecutionVertexCancelTest to new codebase
> --
>
> Key: FLINK-12180
> URL: https://issues.apache.org/jira/browse/FLINK-12180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Port {{ExecutionVertexCancelTest}} to new codebase.
> Mainly get rid of the usage of {{Instance}}





[GitHub] [flink] flinkbot commented on issue #8227: [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase

2019-04-19 Thread GitBox
flinkbot commented on issue #8227: [FLINK-12180][Tests] Port 
ExecutionVertexCancelTest to new codebase
URL: https://github.com/apache/flink/pull/8227#issuecomment-485046244
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

## Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] leesf opened a new pull request #8227: [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase

2019-04-19 Thread GitBox
leesf opened a new pull request #8227: [FLINK-12180][Tests] Port 
ExecutionVertexCancelTest to new codebase
URL: https://github.com/apache/flink/pull/8227
 
 
   
   ## What is the purpose of the change
   
   Port ExecutionVertexCancelTest to new codebase
   
   
   ## Brief change log
   
 - *Remove the use of Instance.*
 - *Use TestingLogicalSlot in place of Instance#allocateSimpleSlot*
   
   
   ## Verifying this change
   
   This change is a trivial rework.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   




[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter

2019-04-19 Thread Brandon (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1684#comment-1684
 ] 

Brandon commented on FLINK-7009:


Any chance of this getting merged in? Still would be extremely useful.

> dogstatsd mode in statsd reporter
> -
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
>Reporter: David Brinegar
>Priority: Major
>
> The current statsd reporter can only report a subset of Flink metrics owing
> to the manner in which Flink variables are handled, mainly around invalid
> characters and metric names that are too long.  As an option, it would be quite
> useful to have a stricter dogstatsd-compliant output.  Dogstatsd metrics are
> tagged, should be less than 200 characters including tag names and values, and
> must be alphanumeric plus underbar, delimited by periods.  As a further pragmatic
> restriction, negative and other invalid values should be ignored rather than
> sent to the backend.  These restrictions play well with a broad set of
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by 
> periods.  Runs of invalid characters within a metric segment would be 
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic 
> representation of the metric name, to preserve the unique metric time series 
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id, 
> task_attempt_id, to the first 8 characters, again to preserve enough 
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the 
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used 
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is 
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge 
> metrics as well.
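
A dogstatsd-style datagram with tags, for reference, looks like the following (the metric and tag names here are illustrative, not taken from the ticket):

{code}
flink.taskmanager.job.task.numRecordsIn:42|g|#job_id:a1b2c3d4,task_id:9f8e7d6c,host:worker-1
{code}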
> One idea for symbolic compression is to take the first 10 valid characters 
> plus a hash of the long name.  For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.  The stable version would be:
>  
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the 
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names; the main point
> would be to preserve the metrics so they are valid, avoid truncation, and can
> be aggregated along other dimensions even if this particular dimension is
> hard to parse after the compression.
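
A minimal sketch of the described compression, assuming CRC32 as the hash (the helper class name and the hash choice are illustrative, not part of the proposal):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class MetricNameCompressor {

    /** Collapses runs of invalid characters to a single underbar. */
    static String sanitize(String name) {
        return name.replaceAll("[^A-Za-z0-9.]+", "_");
    }

    /** First ten valid characters of the sanitized name, plus a hash of the stable name. */
    static String compress(String stableName) {
        String sanitized = sanitize(stableName);
        String prefix = sanitized.substring(0, Math.min(10, sanitized.length()));
        CRC32 crc = new CRC32();
        crc.update(stableName.getBytes(StandardCharsets.UTF_8));
        return prefix + "_" + Long.toHexString(crc.getValue());
    }
}
{code}

Applied to the stable TriggerWindow string above, this yields a name of the same shape as {{TriggerWin_d8c007da}}: short, unique, and safe for downstream collectors.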





[GitHub] [flink] bowenli86 commented on issue #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf…

2019-04-19 Thread GitBox
bowenli86 commented on issue #8200: [FLINK-11614] 
[chinese-translation,Documentation] Translate the "Conf…
URL: https://github.com/apache/flink/pull/8200#issuecomment-485025137
 
 
   This is some really good stuff! Exciting to see official user contributions to
doc translation. Can you retitle the PR so the title is complete?
   
   BTW, we may also need to wait for 
https://github.com/apache/flink-web/pull/190 on finalizing the review process.
   




[GitHub] [flink] bowenli86 commented on issue #7754: [FLINK-11530] [docs] Support multiple languages for the docs framework

2019-04-19 Thread GitBox
bowenli86 commented on issue #7754: [FLINK-11530] [docs] Support multiple 
languages for the docs framework
URL: https://github.com/apache/flink/pull/7754#issuecomment-485024126
 
 
   @kkrugler FYI




[GitHub] [flink] bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database 
related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#issuecomment-485022646
 
 
   > I think we should also add a NOTICE file to the `flink-connector-hive` module,
because it bundles the Hive dependency. You can take this as an example:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE.
   > 
   > See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing
   
   Thanks for reminding me of that, @wuchong! I propose to address it separately,
as I need a bit of time to learn how to do it, and I've created
[FLINK-12266](https://issues.apache.org/jira/browse/FLINK-12266) as an
immediate follow-up. What do you think?
   
   I also remember you concluded a dev thread discussion a while ago about using
flinkbot to remind devs of NOTICE file issues. I haven't seen a flinkbot reminder
yet. How is that going?
   
   BTW, let's all sync offline to get consensus on what component tags we 
should use for the PRs. 




[jira] [Updated] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive

2019-04-19 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12266:
-
Description: 
Add a NOTICE file to the `flink-connector-hive` module, because it bundles the
Hive dependency. An example:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE.

See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing

> add NOTICE file for dependencies that are newly introduced in 
> flink-connector-hive
> --
>
> Key: FLINK-12266
> URL: https://issues.apache.org/jira/browse/FLINK-12266
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Add a NOTICE file to the `flink-connector-hive` module, because it bundles the
> Hive dependency. An example:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE.
> See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing
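
A minimal sketch of the NOTICE structure, modeled on the elasticsearch2 example linked above (the bundled artifacts and versions below are placeholders, not the actual Hive dependency list):

{code}
flink-connector-hive
Copyright 2014-2019 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0.
(http://www.apache.org/licenses/LICENSE-2.0.txt)

- org.apache.hive:hive-metastore:<version>
- org.apache.hive:hive-exec:<version>
{code}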





[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu

2019-04-19 Thread GitBox
bowenli86 commented on a change in pull request #8205: [FLINK-12238] 
[TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog 
and setup flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277098145
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+import java.util.Map;
+
+
+/**
+ * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
+ */
+public class GenericHiveMetastoreCatalogUtil {
+
+   private GenericHiveMetastoreCatalogUtil() {
+   }
+
+   // -- Utils --
+
+   /**
+* Creates a Hive database from CatalogDatabase.
+*/
+   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
+   Map<String, String> props = db.getProperties();
+   return new Database(
+   dbName,
+   db.getDescription().get(),
 
 Review comment:
   That would be a misuse IMHO. I will add ifPresent() check though
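
A minimal sketch of such a guard, assuming {{db.getDescription()}} returns an {{Optional<String>}} ({{locationUri}} and {{props}} stand in for whatever the surrounding code supplies):

{code:java}
// avoid Optional#get() without a presence check; fall back to null (or a default)
String description = db.getDescription().orElse(null);
return new Database(dbName, description, locationUri, props);
{code}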




[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu

2019-04-19 Thread GitBox
bowenli86 commented on a change in pull request #8205: [FLINK-12238] 
[TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog 
and setup flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277096977
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Test for GenericHiveMetastoreCatalog.
+ */
+public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
+
+   @BeforeClass
+   public static void init() throws IOException {
+   catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+   catalog.open();
+   }
+
+   // =
+   // GenericHiveMetastoreCatalog doesn't support table operation yet
+   // Thus, overriding the following tests which involve table operation 
in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog
+   // =
+
+   // TODO: re-enable this test once GenericHiveMetastoreCatalog support 
table operations
+   @Test
 
 Review comment:
   It may not matter that much as it's empty now and support of table 
operations is coming very soon.




[jira] [Commented] (FLINK-12256) Implement KafkaReadableCatalog

2019-04-19 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822186#comment-16822186
 ] 

Bowen Li commented on FLINK-12256:
--

Hi,

Thanks for drafting the doc. I think, however, it's too preliminary and needs 
more detailed information. Please refer to other FLIPs as examples.

Just a few examples of what needs to be considered, as comments I've left in 
the doc:

* more background on Schema Registry
* are there counterparts in Schema Registry corresponding to Flink's meta objects?
* how do you handle interoperability between different versions of Kafka and
Schema Registry?
* how are the implemented catalog APIs translated into the Schema Registry
client's REST calls?

[~becket_qin] [~walterddr] It would be great to have your opinions on the design
along the way, from the perspective of heavy Kafka users.


> Implement KafkaReadableCatalog
> --
>
> Key: FLINK-12256
> URL: https://issues.apache.org/jira/browse/FLINK-12256
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Table SQL / Client
>Affects Versions: 1.9.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>
>  KafkaReadableCatalog is a special implementation of the ReadableCatalog
> interface (introduced in
> [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs])
> to retrieve meta information, such as topic names and topic schemas, from
> Apache Kafka and Confluent Schema Registry.
> The new ReadableCatalog allows a user to run SQL queries like:
> {code:java}
> Select * from kafka.topic_name
> {code}
> without the need for manual definition of the table schema.
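
A minimal sketch of the schema lookup such a catalog could build on, using Confluent's Java client (the registry URL and the "-value" subject naming convention are assumptions):

{code:java}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;

public class RegistrySchemaLookup {
    /** Fetches the latest Avro schema registered for a topic's value subject. */
    public static Schema latestValueSchema(String registryUrl, String topic) throws Exception {
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        SchemaMetadata meta = client.getLatestSchemaMetadata(topic + "-value");
        return new Schema.Parser().parse(meta.getSchema());
    }
}
{code}

The catalog would then map the returned Avro schema to a Flink table schema.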





[jira] [Comment Edited] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821807#comment-16821807
 ] 

lamber-ken edited comment on FLINK-12247 at 4/19/19 6:31 PM:
-

[~till.rohrmann], I updated the issue and uploaded a patch. 


was (Author: lamber-ken):
[~till.rohrmann], I updated the issue and uploaded a patch. :)

> fix NPE when writing an archive file to a FileSystem
> 
>
> Key: FLINK-12247
> URL: https://issues.apache.org/jira/browse/FLINK-12247
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.6.3, 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: fix-nep.patch
>
>
> h3. *Issue detail info*
> In our hadoop product env, we use fixed-delay restart-strategy.
> {code:java}
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 20
> restart-strategy.fixed-delay.delay: 2 s
> {code}
> If a Flink job reaches the max attempt count, it will write an archive file
> to +FileSystem+ and shut down.
> But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt
> info of a subtask, it hits an NPE.
> h3. *Detailed reasons are as follows:*
> 0. Assume a scenario where a Flink job reaches the max attempt count (20).
> 1. +ExecutionVertex+ is a parallel subtask of the execution. Each
> +ExecutionVertex+ was created with MAX_ATTEMPTS_HISTORY_SIZE (default value: 16).
> 2. When +SubtaskExecutionAttemptDetailsHandler+ handles a request, it gets the
> attempt from +priorExecutions+, but +priorExecutions+ retains only
> MAX_ATTEMPTS_HISTORY_SIZE elements; older elements are dropped from the head of
> the list (FIFO), so the lookup may return null (see the sketch below).
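
A minimal sketch of the failure mode (the class below is illustrative; Flink's actual bounded history container is {{EvictingBoundedList}}):

{code:java}
import java.util.ArrayDeque;

/** Illustrative bounded FIFO attempt history: old attempts are evicted, and lookups of evicted attempts return null. */
public class BoundedAttemptHistory<T> {
    private final int maxSize; // e.g. MAX_ATTEMPTS_HISTORY_SIZE = 16
    private final ArrayDeque<T> history = new ArrayDeque<>();
    private int dropped; // number of elements evicted from the head

    public BoundedAttemptHistory(int maxSize) {
        this.maxSize = maxSize;
    }

    public void add(T attempt) {
        if (history.size() == maxSize) {
            history.pollFirst(); // evict the oldest attempt (FIFO)
            dropped++;
        }
        history.addLast(attempt);
    }

    /** Returns null for attempts that were already evicted. */
    public T get(int attemptNumber) {
        int index = attemptNumber - dropped;
        if (index < 0 || index >= history.size()) {
            return null;
        }
        int i = 0;
        for (T t : history) {
            if (i++ == index) {
                return t;
            }
        }
        return null;
    }
}
{code}

With 20 attempts and a history size of 16, attempts 0 through 3 are evicted; {{get(2)}} returns null, and passing that null attempt into {{MutableIOMetrics.addIOMetrics(...)}} produces the stack trace below.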
> h3. *Detailed StackTrace*
> {code:java}
> java.lang.NullPointerException
>at 
> org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
>at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
>at 
> org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
>at 
> org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
>at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> h3. *Minimal reproducible example*
> {code:java}
> public static void main(String[] args) throws Exception {
> 

[jira] [Closed] (FLINK-10974) Add FlatMap to TableAPI

2019-04-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-10974.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 58e69a0f6951d99d38ba5462afbb94d5bef478fc

> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>   .flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>   .select('a, 'c)
> {code}
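
A minimal sketch of a {{TableFunction}} that could be passed to {{flatMap}} (the class name and splitting logic are illustrative):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// Emits one (token, length) row per whitespace-separated token of the input.
public class SplitFunction extends TableFunction<Tuple2<String, Integer>> {
    public void eval(String str) {
        for (String s : str.split("\\s+")) {
            collect(Tuple2.of(s, s.length()));
        }
    }
}
{code}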





[GitHub] [flink] asfgit closed pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
asfgit closed pull request #7196: [FLINK-10974] [table] Add support for flatMap 
to table API
URL: https://github.com/apache/flink/pull/7196
 
 
   




[GitHub] [flink] sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196#issuecomment-484973155
 
 
   Merging...




[jira] [Updated] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive

2019-04-19 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12266:
-
Component/s: (was: Table SQL / API)
 Connectors / Hive

> add NOTICE file for dependencies that are newly introduced in 
> flink-connector-hive
> --
>
> Key: FLINK-12266
> URL: https://issues.apache.org/jira/browse/FLINK-12266
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>






[GitHub] [flink] bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
bowenli86 commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database 
related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#issuecomment-484972133
 
 
   > I wonder whether we should differentiate Flink DB/table from Hive's own 
DB/table, especially if we allow Flink and Hive users to share an HMS instance?
   
   Do you mean why we have, for example, GenericCatalogDatabase vs. the
upcoming HiveCatalogDatabase? There are quite a few differences between Flink
and Hive w.r.t. databases and tables. E.g. for databases, Hive always requires
a valid, meaningful location URI, but Flink doesn't really care about it; for
the column types of tables, several types currently cannot be mapped 1:1
between Flink and Hive, and we need to wait for the rework of the Flink type
system to finish.
   
   I doubt, at least for now, that we will unify them, but we can re-evaluate 
later on. I think what we currently should aim for is to share code as much as 
possible between GenericHiveMetastoreCatalog and the upcoming HiveCatalog when 
implementing catalog APIs.




[GitHub] [flink] asfgit closed pull request #8087: [FLINK-12029][table] Add column operations for TableApi

2019-04-19 Thread GitBox
asfgit closed pull request #8087: [FLINK-12029][table] Add column operations 
for TableApi
URL: https://github.com/apache/flink/pull/8087
 
 
   




[jira] [Closed] (FLINK-12029) Add Column selections

2019-04-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12029.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: e41b6d4784801677c60b75e353fcf7866106c287

> Add Column selections
> -
>
> Key: FLINK-12029
> URL: https://issues.apache.org/jira/browse/FLINK-12029
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This Jira will add column operators/operations as follows:
> Fine-grained column operations
>  * Column selection
> See the [google
> doc|https://docs.google.com/document/d/1tryl6swt1K1pw7yvv5pdvFXSxfrBZ3_OkOObymis2ck/edit],
> and I have also done some
> [prototype|https://github.com/sunjincheng121/flink/pull/94/files]





[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu

2019-04-19 Thread GitBox
bowenli86 commented on a change in pull request #8205: [FLINK-12238] 
[TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog 
and setup flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277038764
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Test utils for Hive connector.
+ */
+public class HiveTestUtils {
+   private static final String HIVE_SITE_XML = "hive-site.xml";
+   private static final String HIVE_WAREHOUSE_URI_FORMAT = 
"jdbc:derby:;databaseName=%s;create=true";
+   private static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static String warehouseDir;
+   private static String warehouseUri;
+
+   /**
+* Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
+*/
+   public static GenericHiveMetastoreCatalog 
createGenericHiveMetastoreCatalog() throws IOException {
+   return new GenericHiveMetastoreCatalog("test", getHiveConf());
+   }
+
+   private static HiveConf getHiveConf() throws IOException {
+   ClassLoader classLoader = new 
HiveTestUtils().getClass().getClassLoader();
+   
HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
+
+   TEMPORARY_FOLDER.create();
+   warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + 
"/metastore_db";
+   warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, 
warehouseDir);
+   HiveConf hiveConf = new HiveConf();
+   
hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_SCHEMA_VERIFICATION, false);
 
 Review comment:
   good catch. I will keep those in hive-site.xml




[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu

2019-04-19 Thread GitBox
bowenli86 commented on a change in pull request #8205: [FLINK-12238] 
[TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog 
and setup flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277038223
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+   private static final Logger LOG = 
LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
+
+   public static final String DEFAULT_DB = "default";
+
+   private final String catalogName;
+   private final HiveConf hiveConf;
+
+   private String currentDatabase = DEFAULT_DB;
+   private IMetaStoreClient client;
+
+   public GenericHiveMetastoreCatalog(String catalogName, String 
hivemetastoreURI) {
+   this(catalogName, getHiveConf(hivemetastoreURI));
+   }
+
+   public GenericHiveMetastoreCatalog(String catalogName, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   this.catalogName = catalogName;
+
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
+   LOG.info("Created GenericHiveMetastoreCatalog '{}'", 
catalogName);
+   }
+
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);

[jira] [Created] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive

2019-04-19 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12266:


 Summary: add NOTICE file for dependencies that are newly 
introduced in flink-connector-hive
 Key: FLINK-12266
 URL: https://issues.apache.org/jira/browse/FLINK-12266
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0








[GitHub] [flink] bowenli86 commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive modu

2019-04-19 Thread GitBox
bowenli86 commented on a change in pull request #8205: [FLINK-12238] 
[TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog 
and setup flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277037234
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
 
 Review comment:
   Can you share where you see it? "Hive Metastore" is more popular, for 
example, please see 
https://www.cloudera.com/documentation/enterprise/5-8-x/topics/cdh_ig_hive_metastore_configure.html




[jira] [Created] (FLINK-12265) link-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy

2019-04-19 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12265:


 Summary: link-table-planner-blink UT Failure: 
SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy
 Key: FLINK-12265
 URL: https://issues.apache.org/jira/browse/FLINK-12265
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: Bowen Li
Assignee: Jingsong Lee
 Fix For: 1.9.0


Seems to be a memory allocation issue, but it would be worth checking whether
it's caused by the planner itself taking too much memory.

https://travis-ci.org/apache/flink/jobs/522051049

11:41:37.461 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 2, Time 
elapsed: 73.859 s <<< FAILURE! - in 
org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase
11:41:37.461 [ERROR] 
testSomeColumnsBothInDistinctAggAndGroupBy(org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase)
  Time elapsed: 6.274 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 
64 pages. Only 0 pages are remaining.
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not 
allocate 64 pages. Only 0 pages are remaining.





[jira] [Updated] (FLINK-12265) flink-table-planner-blink UT Failure: SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy

2019-04-19 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12265:
-
Summary: flink-table-planner-blink UT Failure: 
SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy  (was: 
link-table-planner-blink UT Failure: 
SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy)

> flink-table-planner-blink UT Failure: 
> SortDistinctAggregateITCase.testSomeColumnsBothInDistinctAggAndGroupBy
> 
>
> Key: FLINK-12265
> URL: https://issues.apache.org/jira/browse/FLINK-12265
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.9.0
>
>
> Seems to be a memory allocation issue, but it would be worth checking whether
> it's caused by the planner itself taking too much memory.
> https://travis-ci.org/apache/flink/jobs/522051049
> 11:41:37.461 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 2, Time 
> elapsed: 73.859 s <<< FAILURE! - in 
> org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase
> 11:41:37.461 [ERROR] 
> testSomeColumnsBothInDistinctAggAndGroupBy(org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase)
>   Time elapsed: 6.274 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 
> 64 pages. Only 0 pages are remaining.
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could 
> not allocate 64 pages. Only 0 pages are remaining.





[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-19 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] 
[SQL/TABLE] Add partition related catalog APIs and implement them in 
GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277032972
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -202,6 +212,8 @@ public void dropTable(ObjectPath tablePath, boolean 
ignoreIfNotExists) throws Ta
 
if (tableExists(tablePath)) {
tables.remove(tablePath);
+
+   partitions.remove(tablePath);
 
 Review comment:
   good catch!




[GitHub] [flink] flinkbot edited a comment on issue #8087: [FLINK-12029][table] Add column operations for TableApi

2019-04-19 Thread GitBox
flinkbot edited a comment on issue #8087: [FLINK-12029][table] Add column 
operations for TableApi
URL: https://github.com/apache/flink/pull/8087#issuecomment-478332056
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @twalthr [PMC]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

## Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] sunjincheng121 commented on issue #8087: [FLINK-12029][table] Add column operations for TableApi

2019-04-19 Thread GitBox
sunjincheng121 commented on issue #8087: [FLINK-12029][table] Add column 
operations for TableApi
URL: https://github.com/apache/flink/pull/8087#issuecomment-484947873
 
 
   @flinkbot approve all
   




[jira] [Assigned] (FLINK-12257) Convert CatalogBaseTable to org.apache.calcite.schema.Table so that planner can use unified catalog APIs

2019-04-19 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-12257:


Assignee: Dawid Wysakowicz  (was: Bowen Li)

> Convert CatalogBaseTable to org.apache.calcite.schema.Table so that planner 
> can use unified catalog APIs
> 
>
> Key: FLINK-12257
> URL: https://issues.apache.org/jira/browse/FLINK-12257
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.9.0
>
>
> In FLINK-11476, we created CatalogManager to hook up planner with unified 
> catalog APIs. What's missing there is, at the very last step, convert 
> CatalogBaseTable to org.apache.calcite.schema.Table so that planner can use 
> unified catalog APIs, like how 
> {{ExternalTableUtil.fromExternalCatalogTable()}} works to convert the old 
> {{ExternalCatalogTable}} to a Calcite table





[jira] [Comment Edited] (FLINK-12256) Implement KafkaReadableCatalog

2019-04-19 Thread Artsem Semianenka (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822004#comment-16822004
 ] 

Artsem Semianenka edited comment on FLINK-12256 at 4/19/19 3:47 PM:


I've created the first draft of the [design 
document|https://docs.google.com/document/d/14thwgV2RY1AA9KgYztv_kLYSz4K1TckJ-YiGfkB5650/edit?usp=sharing].
 Please let's continue the discussion in this ticket or in comments in Google 
Docs 


was (Author: artsem.semianenka):
I've created the first draft of the[ design 
document|https://docs.google.com/document/d/14thwgV2RY1AA9KgYztv_kLYSz4K1TckJ-YiGfkB5650/edit?usp=sharing].
 Please let's continue the discussion in this ticket or in comments in Google 
Docs 

> Implement KafkaReadableCatalog
> --
>
> Key: FLINK-12256
> URL: https://issues.apache.org/jira/browse/FLINK-12256
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Table SQL / Client
>Affects Versions: 1.9.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>
>  KafkaReadableCatalog is a special implementation of the ReadableCatalog 
> interface (introduced in 
> [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs]
> ) to retrieve meta information such as the topic name and schema from 
> Apache Kafka and the Confluent Schema Registry. 
> The new ReadableCatalog allows a user to run SQL queries like:
> {code:java}
> Select * from kafka.topic_name  
> {code}
> without the need to manually define the table schema.
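
As a rough illustration of the lookup such a catalog would perform, the sketch below fetches the latest value schema for a topic from the Confluent Schema Registry REST API; the class and method names are illustrative, not the proposed ReadableCatalog interface:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative helper only; a real KafkaReadableCatalog would map the
// returned schema onto a Flink TableSchema behind the catalog interface.
public final class SchemaRegistryLookup {

    private final String registryUrl;

    public SchemaRegistryLookup(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    /** Fetches the latest value-schema record (JSON) registered for a topic. */
    public String latestValueSchema(String topic) throws IOException {
        // Confluent convention: value schemas live under the "<topic>-value" subject.
        URL url = new URL(registryUrl + "/subjects/" + topic + "-value/versions/latest");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try (InputStream in = connection.getInputStream();
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toString("UTF-8");
        } finally {
            connection.disconnect();
        }
    }
}
{code}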



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12256) Implement KafkaReadableCatalog

2019-04-19 Thread Artsem Semianenka (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822004#comment-16822004
 ] 

Artsem Semianenka commented on FLINK-12256:
---

I've created the first draft of the[ design 
document|https://docs.google.com/document/d/14thwgV2RY1AA9KgYztv_kLYSz4K1TckJ-YiGfkB5650/edit?usp=sharing].
 Please let's continue the discussion in this ticket or in comments in Google 
Docs 

> Implement KafkaReadableCatalog
> --
>
> Key: FLINK-12256
> URL: https://issues.apache.org/jira/browse/FLINK-12256
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Table SQL / Client
>Affects Versions: 1.9.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>
>  KafkaReadableCatalog is a special implementation of the ReadableCatalog 
> interface (introduced in 
> [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs]
> ) to retrieve meta information such as the topic name and schema from 
> Apache Kafka and the Confluent Schema Registry. 
> The new ReadableCatalog allows a user to run SQL queries like:
> {code:java}
> Select * from kafka.topic_name  
> {code}
> without the need to manually define the table schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] datto-aparrill commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem

2019-04-19 Thread GitBox
datto-aparrill commented on issue #8068: [FLINK-12042][StateBackends] Fix 
RocksDBStateBackend's mistaken usage of default filesystem
URL: https://github.com/apache/flink/pull/8068#issuecomment-484916176
 
 
   We ran into this bug as well, and can confirm that this commit fixes the 
issue. Please merge!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12026) Remove the `xxxInternal` method from TableImpl

2019-04-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12026.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: c96a4d7afe379a291cc538ca36af896df8dc2127

> Remove the `xxxInternal` method from TableImpl
> --
>
> Key: FLINK-12026
> URL: https://issues.apache.org/jira/browse/FLINK-12026
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> At present, each operator of TableImpl has an internal `xxxInternal` method, 
> which is only a temporary method. I think it can be removed now to further 
> simplify the code. For example:
> From:
> {code:java}
> override def select(fields: String): Table = {
>   select(ExpressionParser.parseExpressionList(fields): _*)
> }
> override def select(fields: Expression*): Table = {
>   selectInternal(fields.map(tableImpl.expressionBridge.bridge))
> }
> private def selectInternal(fields: Seq[PlannerExpression]): Table = {
> ...
> // implementation logic
> ...
> }{code}
> To:
> {code:java}
> override def select(fields: String): Table = { 
> select(ExpressionParser.parseExpressionList(fields): _*) 
> } 
> override def select(fields: Expression*): Table = {
> ... 
> // implementation logic 
> ... 
> }{code}
>  
> I think the implementation logic can be moved into `select(fields: 
> Expression*)`.  What do you think? [~dawidwys] [~hequn8128]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment

2019-04-19 Thread Gaurav (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821933#comment-16821933
 ] 

Gaurav commented on FLINK-12259:


Thanks [~fan_li_ya]. Added a comment. 

> Improve debuggability of method invocation failures in 
> OptimizerPlanEnvironment 
> 
>
> Key: FLINK-12259
> URL: https://issues.apache.org/jira/browse/FLINK-12259
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Gaurav
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In cases where method invocation fails without setting the `optimizerPlan`, 
> Flink does not always dump the stderr/stdout. Hence, logging from the method 
> is lost. The stacktrace alone is not always helpful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-04-19 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r276994500
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription 
and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OUT, String, String>
+   implements ResultTypeQueryable<OUT>, 
ParallelSourceFunction<OUT> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSource.class);
+   protected final PubSubDeserializationSchema deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+
+   protected transient boolean deduplicateMessages;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture messagesFuture;
+
+   PubSubSource(PubSubDeserializationSchema deserializationSchema, 
PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, 
String projectSubscriptionName, int maxMessagesPerPull) {
+   super(String.class);
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.projectSubscriptionName = projectSubscriptionName;
+   this.maxMessagesPerPull = maxMessagesPerPull;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   super.open(configuration);
+   if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+   throw new IllegalArgumentException("The PubSubSource 
REQUIRES Checkpointing to be enabled and " +
+   "the checkpointing frequency must be MUCH lower 
than 

[GitHub] [flink] gauravtiwari89 commented on a change in pull request #8220: [FLINK-12259][flink-clients]Improve debuggability of method invocatio…

2019-04-19 Thread GitBox
gauravtiwari89 commented on a change in pull request #8220: 
[FLINK-12259][flink-clients]Improve debuggability of method invocatio…
URL: https://github.com/apache/flink/pull/8220#discussion_r276992911
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
 ##
 @@ -74,9 +76,11 @@ public FlinkPlan getOptimizedPlan(PackagedProgram prog) 
throws ProgramInvocation
PrintStream originalOut = System.out;
PrintStream originalErr = System.err;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-   System.setOut(new PrintStream(baos));
+   TeeOutputStream combinedStdOut = new 
TeeOutputStream(originalOut, baos);
+   System.setOut(new PrintStream(combinedStdOut));
ByteArrayOutputStream baes = new ByteArrayOutputStream();
-   System.setErr(new PrintStream(baes));
+   TeeOutputStream combinedStdErr = new 
TeeOutputStream(originalErr, baes);
 
 Review comment:
   Do we need to close these streams? Additionally, it's not clear to me why we 
need to trap the output. It seems to only be used for the exception at line 
109. Couldn't the client just refer to the logs instead?
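
A minimal sketch of the tee-and-restore pattern under discussion, assuming Commons IO's `TeeOutputStream`; the helper name and structure are illustrative, not the PR's actual code. Flushing rather than closing the tee sidesteps the close question, since `TeeOutputStream#close` would close both wrapped streams, including the real stdout:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.apache.commons.io.output.TeeOutputStream;

public final class StdOutCapture {

    /** Runs {@code action} while mirroring stdout into an in-memory buffer. */
    public static String captureWhile(Runnable action) {
        PrintStream originalOut = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Tee: output still reaches the real stdout AND is buffered for the caller.
        PrintStream tee = new PrintStream(new TeeOutputStream(originalOut, buffer), true);
        System.setOut(tee);
        try {
            action.run();
        } finally {
            System.setOut(originalOut); // always restore, even on failure
            tee.flush(); // flush only; closing would also close the real stdout
        }
        return buffer.toString();
    }
}
```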


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gauravtiwari89 commented on issue #8218: [FLINK-12259] Log stdout, stderr on Program invocation failure

2019-04-19 Thread GitBox
gauravtiwari89 commented on issue #8218: [FLINK-12259] Log stdout,stderr on 
Program invocation failure
URL: https://github.com/apache/flink/pull/8218#issuecomment-484903173
 
 
   Closed in favor of https://github.com/apache/flink/pull/8220


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gauravtiwari89 closed pull request #8218: [FLINK-12259] Log stdout, stderr on Program invocation failure

2019-04-19 Thread GitBox
gauravtiwari89 closed pull request #8218: [FLINK-12259] Log stdout,stderr on 
Program invocation failure
URL: https://github.com/apache/flink/pull/8218
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12264) Port ExecutionGraphTestUtils to new code base

2019-04-19 Thread leesf (JIRA)
leesf created FLINK-12264:
-

 Summary: Port ExecutionGraphTestUtils to new code base
 Key: FLINK-12264
 URL: https://issues.apache.org/jira/browse/FLINK-12264
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: leesf
Assignee: leesf
 Fix For: 1.9.0


Mainly get rid of Instance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8070: [FLINK-12026][table]Remove the `xxxInternal` method from TableImpl

2019-04-19 Thread GitBox
asfgit closed pull request #8070: [FLINK-12026][table]Remove the `xxxInternal` 
method from TableImpl
URL: https://github.com/apache/flink/pull/8070
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun closed pull request #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…

2019-04-19 Thread GitBox
TisonKun closed pull request #7285: [hotfix] [resource manager] Remove 
legacy(unused) class ResourceManag…
URL: https://github.com/apache/flink/pull/7285
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on issue #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…

2019-04-19 Thread GitBox
TisonKun commented on issue #7285: [hotfix] [resource manager] Remove 
legacy(unused) class ResourceManag…
URL: https://github.com/apache/flink/pull/7285#issuecomment-484897552
 
 
   closing...
   
   may be a subtask of FLINK-10392.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun removed a comment on issue #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…

2019-04-19 Thread GitBox
TisonKun removed a comment on issue #7285: [hotfix] [resource manager] Remove 
legacy(unused) class ResourceManag…
URL: https://github.com/apache/flink/pull/7285#issuecomment-452182002
 
 
   cc @tillrohrmann  @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase

2019-04-19 Thread GitBox
flinkbot commented on issue #8226: [FLINK-12181][Tests] Port 
ExecutionGraphRestartTest to new codebase
URL: https://github.com/apache/flink/pull/8226#issuecomment-484895638
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12181) Port ExecutionGraphRestartTest to new codebase

2019-04-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12181:
---
Labels: pull-request-available  (was: )

> Port ExecutionGraphRestartTest to new codebase
> --
>
> Key: FLINK-12181
> URL: https://issues.apache.org/jira/browse/FLINK-12181
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Port {{ExecutionGraphRestartTest}} to new codebase.
> Mainly get rid of the usages of {{Scheduler}} and {{Instance}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] leesf opened a new pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase

2019-04-19 Thread GitBox
leesf opened a new pull request #8226: [FLINK-12181][Tests] Port 
ExecutionGraphRestartTest to new codebase
URL: https://github.com/apache/flink/pull/8226
 
 
   
   
   ## What is the purpose of the change
   
   Port ExecutionGraphRestartTest to new codebase.
   
   ## Brief change log
   
  - *Use TaskManagerLocation, SimpleAckingTaskManagerGateway and 
TestingResourceManagerGateway in place of Instance.*
  - *Use SlotPool#releaseTaskManager in place of Instance#markDead.*
   
   
   ## Verifying this change
   
   This change is a trivial rework.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-04-19 Thread GitBox
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-484884659
 
 
   I'm testing this source on spotty connections (as I do most of my work on 
the go), and when streaming a lot of messages over 4G, at some point, the 
source logs this:
   
   ```
   14:40:33,649 INFO  
io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder 
 - {} ignoring {} frame for stream {} {}
   14:40:39,010 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Completed checkpoint 23 for job f2aed8990f452e357b8f15f6452e4244 
(12951720 bytes in 18142 ms).
   14:40:39,013 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Triggering checkpoint 24 @ 1555677639010 for job 
f2aed8990f452e357b8f15f6452e4244.
   14:41:11,608 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Completed checkpoint 24 for job f2aed8990f452e357b8f15f6452e4244 
(14193479 bytes in 32593 ms).
   14:41:11,614 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Triggering checkpoint 25 @ 1555677671608 for job 
f2aed8990f452e357b8f15f6452e4244.
   14:41:22,628 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Completed checkpoint 25 for job f2aed8990f452e357b8f15f6452e4244 
(16366371 bytes in 11017 ms).
   14:41:22,638 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Triggering checkpoint 26 @ 1555677682628 for job 
f2aed8990f452e357b8f15f6452e4244.
   14:41:43,287 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Completed checkpoint 26 for job f2aed8990f452e357b8f15f6452e4244 
(17107164 bytes in 20655 ms).
   14:41:43,290 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Triggering checkpoint 27 @ 1555677703287 for job 
f2aed8990f452e357b8f15f6452e4244.
   14:42:19,171 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Completed checkpoint 27 for job f2aed8990f452e357b8f15f6452e4244 
(18502218 bytes in 35880 ms).
   14:42:19,174 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
- Triggering checkpoint 28 @ 1555677739171 for job 
f2aed8990f452e357b8f15f6452e4244.
   14:42:55,863 INFO  
io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder 
 - {} ignoring {} frame for stream {} {}
   14:43:00,242 INFO  
io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder 
 - {} ignoring {} frame for stream {} {}
   14:43:00,242 INFO  
io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder 
 - {} ignoring {} frame for stream {} {}
   1
   ```
   
   And it would seem no further events are being pushed down the streaming 
pipeline.
   
   Sometimes, I also get:
   
   ```
   14:43:23,816 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Request 
slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 
2d2461182e268f6838e957972a9a0361 with allocation id 
AllocationID{8f8a93b52291c2fa883df2fa1a7735de}.
   14:43:23,816 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor
- Allocated slot for AllocationID{fc0b24a95227bdbfa46a4119fb907d19}.
   14:43:23,816 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService
- Add job 2d2461182e268f6838e957972a9a0361 for job leader monitoring.
   14:43:23,816 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService
- Try to register at job manager akka://flink/user/jobmanager_1 with leader 
id 692682c0-30b2-4900-ad43-3d94acf9a2c6.
   14:43:23,816 WARN  
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  
- Error notifying leader listener about new leader
   java.lang.IllegalStateException: The RPC connection is already closed
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at 
org.apache.flink.runtime.registration.RegisteredRpcConnection.start(RegisteredRpcConnection.java:91)
at 
org.apache.flink.runtime.taskexecutor.JobLeaderService$JobManagerLeaderListener.notifyLeaderAddress(JobLeaderService.java:327)
at 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService$NotifyOfLeaderCall.run(EmbeddedLeaderService.java:430)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   ```
   
   All of this 

[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-04-19 Thread GitBox
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-484879762
 
 
   Will there be support for replaying messages? 
https://cloud.google.com/pubsub/docs/replay-qs


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-19 Thread GitBox
sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r276968221
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
+ * in the case that the operator is InputSelectable.
+ *
+ * @param <IN1> The type of the records that arrive on the first input
+ * @param <IN2> The type of the records that arrive on the second input
+ */
+@Internal
+public class StreamTwoInputSelectableProcessor<IN1, IN2> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
+
+   private volatile boolean continuousProcessing = true;
+
+   private final NetworkInput input1;
+   private final NetworkInput input2;
+
+   private final Object lock;
+
+   private final TwoInputStreamOperator streamOperator;
+
+   private final InputSelectable inputSelector;
+
+   private final AuxiliaryHandler auxiliaryHandler;
+
+   private final CompletableFuture[] listenFutures;
+
+   private final boolean[] isFinished;
+
+   private InputSelection inputSelection;
+
+   private AtomicInteger availableInputsMask = new AtomicInteger();
 
 Review comment:
   I got it.  Wish you a good holiday.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10984) Move flink-shaded-hadoop to flink-shaded

2019-04-19 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821833#comment-16821833
 ] 

sunjincheng commented on FLINK-10984:
-

Hi [~Zentol], I have opened the PR to remove `flink-shaded-hadoop` from 
Flink:
https://github.com/apache/flink/pull/8225. I would appreciate it if you could have a look. 
Thanks! :)

> Move flink-shaded-hadoop to flink-shaded
> 
>
> Key: FLINK-10984
> URL: https://issues.apache.org/jira/browse/FLINK-10984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, BuildSystem / Shaded
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-7.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> To allow reasonable dependency management we should move flink-shaded-hadoop 
> to flink-shaded, with each supported version having it's own module and 
> dependency management.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from flink

2019-04-19 Thread GitBox
flinkbot commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from 
flink
URL: https://github.com/apache/flink/pull/8225#issuecomment-484846415
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop from flink

2019-04-19 Thread GitBox
sunjincheng121 commented on issue #8225: [FLINK-10984] Remove flink-shaded-hadoop 
from flink
URL: https://github.com/apache/flink/pull/8225#issuecomment-484846446
 
 
   The CI will turn green after `flink-shaded` releases the 7.0 version.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 opened a new pull request #8225: [FLINK-10984][Build System, BuildSystem / Shaded

2019-04-19 Thread GitBox
sunjincheng121 opened a new pull request #8225: [FLINK-10984][Build System, 
BuildSystem / Shaded
URL: https://github.com/apache/flink/pull/8225
 
 
   ## What is the purpose of the change
   
    To allow reasonable dependency management, we should move 
`flink-shaded-hadoop` to `flink-shaded`, then remove `flink-shaded-hadoop` from 
Flink. 
   
   ## Brief change log
  - Remove `flink-shaded-hadoop` from Flink.
  - Remove `flink-shaded-hadoop/flink-shaded-hadoop2` from Flink.
  - Move out `flink-shaded-yarn-test` from `flink-shaded-hadoop`.
  - Modify the scripts in `flink-dist`, i.e. remove the Hadoop-related logic.
 - Modify some pom config about `flink-shaded-hadoop`.
   
   ## Verifying this change
    This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11530) Support multiple languages for the framework of flink docs

2019-04-19 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-11530.
---

> Support multiple languages for the framework of flink docs
> --
>
> Key: FLINK-11530
> URL: https://issues.apache.org/jira/browse/FLINK-11530
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> A more detailed description can be found in the proposed doc: 
> https://docs.google.com/document/d/1R1-uDq-KawLB8afQYrczfcoQHjjIhq6tvUksxrfhBl0/edit#
> This step aims to integrate the multiple-language plugin for the Flink docs to 
> support Chinese. All the $pagename.zh.md files should be created first in this JIRA 
> but keep the original English contents. A link between the English version and 
> the Chinese version should also be considered.
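
For illustration, the $pagename.zh.md convention pairs every English page with a Chinese sibling in the same directory (paths below are examples only):
{code}
docs/concepts/programming-model.md      <- English page
docs/concepts/programming-model.zh.md   <- Chinese page, same layout, translated later
{code}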



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-11530) Support multiple languages for the framework of flink docs

2019-04-19 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu resolved FLINK-11530.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Resolved in 1.9 with commits:

182456f2ed0c9345990d8d25cac26d22e828bb46

63c7bd2697187c52ca062bb8512ce8e9744bb95f

> Support multiple languages for the framework of flink docs
> --
>
> Key: FLINK-11530
> URL: https://issues.apache.org/jira/browse/FLINK-11530
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> A more detailed description can be found in the proposed doc: 
> https://docs.google.com/document/d/1R1-uDq-KawLB8afQYrczfcoQHjjIhq6tvUksxrfhBl0/edit#
> This step aims to integrate the multiple-language plugin for the Flink docs to 
> support Chinese. All the $pagename.zh.md files should be created first in this JIRA 
> but keep the original English contents. A link between the English version and 
> the Chinese version should also be considered.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #7754: [FLINK-11530] [docs] Support multiple languages for the docs framework

2019-04-19 Thread GitBox
asfgit closed pull request #7754: [FLINK-11530] [docs] Support multiple 
languages for the docs framework
URL: https://github.com/apache/flink/pull/7754
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196#issuecomment-484836678
 
 
   +1 to merged!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime

2019-04-19 Thread GitBox
KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support 
unbounded aggregate in streaming table runtime
URL: https://github.com/apache/flink/pull/8202#issuecomment-484835447
 
 
   LGTM, +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821807#comment-16821807
 ] 

lamber-ken commented on FLINK-12247:


[~till.rohrmann], I updated the issue and uploaded a patch. :)

> fix NPE when writing an archive file to a FileSystem
> 
>
> Key: FLINK-12247
> URL: https://issues.apache.org/jira/browse/FLINK-12247
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.6.3, 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: fix-nep.patch
>
>
> h3. *Issue detail info*
> In our Hadoop production environment, we use the fixed-delay restart strategy.
> {code:java}
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 20
> restart-strategy.fixed-delay.delay: 2 s
> {code}
> If a Flink job reaches the max attempt count, the job will write an 
> archive file to +FileSystem+ and shut down.
> But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt 
> info of a subtask, it hits an NPE.
> h3. *Detailed reasons are as follows:*
> 0. Assume a scenario where a Flink job {color:#ff}reaches the max attempt 
> count ( 20 ){color}
> 1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
> +ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE( 
> default value: 16 ).{color}
> 2. When +SubtaskExecutionAttemptDetailsHandler+ gets the attempt 
> from priorExecutions,
>    note that priorExecutions only retains 
> {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements
>    were dropped from the head of the list (FIFO), and the lookup may return null.
> h3. *Detailed StackTrace*
> {code:java}
> java.lang.NullPointerException
>at 
> org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
>at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
>at 
> org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
>at 
> org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
>at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> h3. *Minimal reproducible example*
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream text = env.addSource(new 

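A minimal sketch of the null guard this fix implies (the attached patch itself is not inlined in this thread); the wrapper class below is illustrative, while the accessor names follow the classes in the stack trace:

{code:java}
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;

public final class PriorAttemptMetrics {

    // Guard the lookup shown failing in the stack trace: once the bounded
    // history (MAX_ATTEMPTS_HISTORY_SIZE) evicts an attempt, the getter
    // returns null and must not be dereferenced.
    static void addAttemptMetricsSafely(AccessExecutionVertex vertex, int attemptNumber) {
        AccessExecution attempt = vertex.getPriorExecutionAttempt(attemptNumber);
        if (attempt == null) {
            // The attempt was dropped from the FIFO history; skip its IO
            // metrics instead of hitting the NPE described above.
            return;
        }
        // ... aggregate attempt.getIOMetrics() as MutableIOMetrics.addIOMetrics does ...
    }
}
{code}
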
[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Attachment: fix-nep.patch

> fix NPE when writing an archive file to a FileSystem
> 
>
> Key: FLINK-12247
> URL: https://issues.apache.org/jira/browse/FLINK-12247
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.6.3, 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: fix-nep.patch
>
>
> h3. *Issue detail info*
> In our Hadoop production environment, we use the fixed-delay restart strategy.
> {code:java}
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 20
> restart-strategy.fixed-delay.delay: 2 s
> {code}
> If a Flink job reaches the max attempt count, the job will write an 
> archive file to +FileSystem+ and shut down.
> But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt 
> info of a subtask, it hits an NPE.
> h3. *Detailed reasons are as follows:*
> 0. Assume a scenario where a Flink job {color:#ff}reaches the max attempt 
> count ( 20 ){color}
> 1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
> +ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE( 
> default value: 16 ).{color}
> 2. When +SubtaskExecutionAttemptDetailsHandler+ gets the attempt 
> from priorExecutions,
>    note that priorExecutions only retains 
> {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements
>    were dropped from the head of the list (FIFO), and the lookup may return null.
> h3. *Detailed StackTrace*
> {code:java}
> java.lang.NullPointerException
>at 
> org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
>at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
>at 
> org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
>at 
> org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
>at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> h3. *Minimal reproducible example*
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream text = env.addSource(new SourceFunction() {
> @Override
> public void 

[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Attachment: (was: fix-nep.pathc)

> fix NPE when writing an archive file to a FileSystem
> 
>
> Key: FLINK-12247
> URL: https://issues.apache.org/jira/browse/FLINK-12247
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.6.3, 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: fix-nep.patch
>
>
> h3. *Issue detail info*
> In our Hadoop production environment, we use the fixed-delay restart strategy.
> {code:java}
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 20
> restart-strategy.fixed-delay.delay: 2 s
> {code}
> If a Flink job reaches the max attempt count, the job will write an 
> archive file to +FileSystem+ and shut down.
> But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt 
> info of a subtask, it hits an NPE.
> h3. *Detailed reasons are as follows:*
> 0. Assume a scenario where a Flink job {color:#ff}reaches the max attempt 
> count ( 20 ){color}
> 1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
> +ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE( 
> default value: 16 ).{color}
> 2. When +SubtaskExecutionAttemptDetailsHandler+ gets the attempt 
> from priorExecutions,
>    note that priorExecutions only retains 
> {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements
>    were dropped from the head of the list (FIFO), and the lookup may return null.
> h3. *Detailed StackTrace*
> {code:java}
> java.lang.NullPointerException
>at 
> org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
>at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
>at 
> org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
>at 
> org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
>at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> h3. *Minimal reproducible example*
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream text = env.addSource(new SourceFunction() {
> @Override
> public void 

[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Attachment: fix-nep.pathc

> fix NPE when writing an archive file to a FileSystem
> 
>
> Key: FLINK-12247
> URL: https://issues.apache.org/jira/browse/FLINK-12247
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.6.3, 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: fix-nep.pathc
>
>
> h3. *Issue detail info*
> In our Hadoop production environment, we use the fixed-delay restart strategy.
> {code:java}
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 20
> restart-strategy.fixed-delay.delay: 2 s
> {code}
> If a Flink job reaches the max attempt count, the job will write an 
> archive file to +FileSystem+ and shut down.
> But when +SubtaskExecutionAttemptDetailsHandler+ handles the detailed attempt 
> info of a subtask, it hits an NPE.
> h3. *Detailed reasons are as follows:*
> 0. Assume a scenario where a Flink job {color:#ff}reaches the max attempt 
> count ( 20 ){color}
> 1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
> +ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE( 
> default value: 16 ).{color}
> 2. When +SubtaskExecutionAttemptDetailsHandler+ gets the attempt 
> from priorExecutions,
>    note that priorExecutions only retains 
> {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements
>    were dropped from the head of the list (FIFO), and the lookup may return null.
> h3. *Detailed StackTrace*
> {code:java}
> java.lang.NullPointerException
>at 
> org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
>at 
> org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
>at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
>at 
> org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
>at 
> org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
>at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
>at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> h3. *Minimal reproducible example*
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<String> text = env.addSource(new SourceFunction<String>() {
> @Override
> public void 

[GitHub] [flink] dianfu commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
dianfu commented on issue #7196: [FLINK-10974] [table] Add support for flatMap 
to table API
URL: https://github.com/apache/flink/pull/7196#issuecomment-484821808
 
 
   @sunjincheng121 Thanks a lot for the review. Updated the PR accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8181: [FLINK-12199][network] Refactor IOMetrics to not distinguish between local/remote in/out bytes

2019-04-19 Thread GitBox
zhijiangW edited a comment on issue #8181: [FLINK-12199][network] Refactor 
IOMetrics to not distinguish between local/remote in/out bytes
URL: https://github.com/apache/flink/pull/8181#issuecomment-484820243
 
 
   Thanks for the reviews @azagrebin !
   
   If I understand correctly, the total IO-related metrics could be divided 
into two groups. One group is general and suitable for all `ShuffleService` 
implementations, such as the current `numBytesIn/Out` and `numRecordsIn/Out`, which 
would still be created in the current `TaskIOMetrics`.
   
   Another group is specific to the current `NetworkEnvironment` implementation. We 
could create two private classes `NetworkInput/OutputMetrics` inside 
`NetworkEnvironment`, and the current metrics of `numBytesInLocal/Remote`, 
`numBuffersInLocal/Remote`, `input/outputQueueLength`, `in/outPoolUsage` could 
be migrated from `TaskIOMetricGroup` into the new `NetworkInput/OutputMetrics`; 
then we could pass these specific metrics into local/remote channels if needed.
   
   Maybe we would then not need the newly introduced `InputGateWithMetrics` in the future. We 
could pass general `Counter`s (`numBytesIn`, `numBuffersIn`) from 
`TaskIOMetrics` into the constructor of `SingleInputGate` to update these 
counters properly, instead of passing the specific `TaskIOMetricGroup` into the constructor. 
What do you think?
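   
   A rough sketch of the constructor-level decoupling proposed above (editor's illustration; the class and method names here are assumptions, not the actual Flink signatures):
   
{code:java}
import org.apache.flink.metrics.Counter;

// Illustrative only: the gate depends on two plain Counters instead of the whole
// TaskIOMetricGroup, so any ShuffleService implementation can supply its own.
class SingleInputGateSketch {

    private final Counter numBytesIn;
    private final Counter numBuffersIn;

    SingleInputGateSketch(Counter numBytesIn, Counter numBuffersIn) {
        this.numBytesIn = numBytesIn;
        this.numBuffersIn = numBuffersIn;
    }

    void onBuffer(int sizeInBytes) {
        numBuffersIn.inc();
        numBytesIn.inc(sizeInBytes);
    }
}
{code}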


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8181: [FLINK-12199][network] Refactor IOMetrics to not distinguish between local/remote in/out bytes

2019-04-19 Thread GitBox
zhijiangW commented on issue #8181: [FLINK-12199][network] Refactor IOMetrics 
to not distinguish between local/remote in/out bytes
URL: https://github.com/apache/flink/pull/8181#issuecomment-484820243
 
 
   Thanks for the reviews @azagrebin !
   
   If I understand correctly, the total IO-related metrics could be divided 
into two groups. One group is general and suitable for all `ShuffleService` 
implementations, such as the current bytesIn/Out and recordsIn/Out, which would still be 
created in the current `TaskIOMetrics`.
   
   Another group is specific to the current `NetworkEnvironment` implementation. We 
could create two private classes `NetworkInput/OutputMetrics` inside 
`NetworkEnvironment`, and the current metrics of `numBytesInLocal/Remote`, 
`numBuffersInLocal/Remote`, `input/outputQueueLength`, `in/outPoolUsage` could 
be migrated from `TaskIOMetricGroup` into `NetworkInput/OutputMetrics`; then we 
could pass these specific metrics into local/remote channels.
   
   Maybe we would then not need the newly introduced `InputGateWithMetrics` in the future. We 
could pass general `Counter`s (`numBytesIn`, `numBuffersIn`) from 
`TaskIOMetrics` into the constructor of `SingleInputGate` to update these values 
properly, instead of passing the specific `TaskIOMetricGroup` into the constructor. What do you 
think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Description: 
h3. *Issue detail info*

In our Hadoop production env, we use the fixed-delay restart-strategy.
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
{code}
if a Flink job reaches the max attempt count, the Flink job will write an 
archive file to +FileSystem+ and shut down.

but when +SubtaskExecutionAttemptDetailsHandler+ handles the detail attempt info 
of a subtask, it hits an NPE.
h3. *Detailed reasons are as follows:*

0. Assume a scenario where a Flink job reaches the max attempt count (20).

1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
+ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} 
(default value: 16).

2. when +SubtaskExecutionAttemptDetailsHandler+ handles the request, it gets the attempt from 
+priorExecutions+, but priorExecutions only retains 
{color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements were 
dropped from the head of the list (FIFO) and the lookup may return null.
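
To make the eviction behaviour above concrete, here is a minimal editor's sketch (an illustration, not Flink's actual +EvictingBoundedList+, whose API may differ): with 20 restart attempts and a history size of 16, the earliest attempts are evicted, a lookup for them returns null, and that null is what +MutableIOMetrics.addIOMetrics+ later dereferences.
{code:java}
import java.util.ArrayList;
import java.util.List;

class BoundedAttemptHistory<T> {

    private final int maxSize;              // e.g. 16, like MAX_ATTEMPTS_HISTORY_SIZE
    private final List<T> history = new ArrayList<>();
    private int dropped = 0;                // attempts already evicted from the head

    BoundedAttemptHistory(int maxSize) {
        this.maxSize = maxSize;
    }

    void add(T attempt) {
        if (history.size() == maxSize) {
            history.remove(0);              // FIFO: drop the oldest attempt
            dropped++;
        }
        history.add(attempt);
    }

    /** May return null once the attempt has been evicted -- callers must check. */
    T get(int attemptNumber) {
        int index = attemptNumber - dropped;
        return (index < 0 || index >= history.size()) ? null : history.get(index);
    }
}
{code}
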
h3. *Detailed StackTrace*
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


{code}
h3. *Minimal reproducible example*
{code:java}
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect("");
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    });

    text.addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(1 / 0);
        }
    });

    env.execute();
}
{code}
 

  was:
h3. *Issue detail info*

In our hadoop product env, we use fixed-delay restart-strategy.
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
{code}
if a flink-job 

[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Description: 
h3. *Issue detail info*

In our Hadoop production env, we use the fixed-delay restart-strategy.
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
{code}
if a Flink job reaches the max attempt count, the Flink job will write an 
archive file to +FileSystem+ and shut down.

but when +SubtaskExecutionAttemptDetailsHandler+ handles the detail attempt info 
of a subtask, it hits an NPE.
h3. *Detailed reasons are as follows:*

0. Assume a scenario where a Flink job reaches the max attempt count (20).

1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
+ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} 
(default value: 16).

2. when +SubtaskExecutionAttemptDetailsHandler+ handles the request, it gets the attempt from 
+priorExecutions+, but priorExecutions only retains 
{color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements were 
dropped from the head of the list (FIFO) and the lookup may return null.
h3. *Detailed StackTrace*
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


{code}
h3. *minimal reproducible example*
{code:java}
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect("");
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    });

    text.addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(1 / 0);
        }
    });

    env.execute();
}
{code}
 

  was:
h3. *Issue detail info*
In our hadoop product env, we use fixed-delay restart-strategy.
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
{code}
if a flink-job reaches 

[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-19 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r276939116
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
+ * in the case that the operator is InputSelectable.
+ *
+ * @param <IN1> The type of the records that arrive on the first input
+ * @param <IN2> The type of the records that arrive on the second input
+ */
+@Internal
+public class StreamTwoInputSelectableProcessor<IN1, IN2> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
+
+   private volatile boolean continuousProcessing = true;
+
+   private final NetworkInput<IN1> input1;
+   private final NetworkInput<IN2> input2;
+
+   private final Object lock;
+
+   private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
+
+   private final InputSelectable inputSelector;
+
+   private final AuxiliaryHandler auxiliaryHandler;
+
+   private final CompletableFuture<?>[] listenFutures;
+
+   private final boolean[] isFinished;
+
+   private InputSelection inputSelection;
+
+   private AtomicInteger availableInputsMask = new AtomicInteger();
 
 Review comment:
   Yes, in case of 2.:
   > If only one of the inputs is available, always read it until another input 
becomes available or it itself becomes unavailable/finished.
   
   let's start with always checking once per record whether the other side of the 
input became available.
   
   After the holiday (we have a public holiday in Germany right now) I'll have to 
think about this a little bit more, about what would be the most general and 
efficient way to handle this without this overhead.
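   
   For illustration, the "check once per record" strategy could look roughly like this (editor's sketch under assumed interfaces, not the PR's actual code):
   
{code:java}
// Editor's sketch: while only `active` is available, read one record at a time
// and re-check after each record whether `other` became available.
interface SketchInput {
    boolean isAvailable();
    boolean isFinished();
    Object pollRecord(); // returns null if nothing is buffered right now
}

class AlternatingReaderSketch {

    void drainWhileOtherUnavailable(SketchInput active, SketchInput other) {
        while (active.isAvailable() && !active.isFinished()) {
            Object record = active.pollRecord();
            if (record != null) {
                process(record);
            }
            if (other.isAvailable()) {
                break; // hand control back to the two-input selection logic
            }
        }
    }

    private void process(Object record) {
        // hand the record over to the operator; omitted in this sketch
    }
}
{code}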


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10941) Slots prematurely released which still contain unconsumed data

2019-04-19 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10941.
--
   Resolution: Fixed
Fix Version/s: 1.7.3

merged commit 0ba into apache:release-1.7

> Slots prematurely released which still contain unconsumed data 
> ---
>
> Key: FLINK-10941
> URL: https://issues.apache.org/jira/browse/FLINK-10941
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Qi
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Our case is: Flink 1.5 batch mode, 32 parallelism to read data source and 4 
> parallelism to write data sink.
>  
> The read task worked perfectly with 32 TMs. However, when the job was 
> executing the write task, since only 4 TMs were needed, the other 28 TMs were 
> released. This caused a RemoteTransportException in the write task:
>  
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> ’the_previous_TM_used_by_read_task'. This might indicate that the remote task 
> manager was lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   ...
>  
> After skimming YarnFlinkResourceManager related code, it seems to me that 
> Flink is releasing TMs when they’re idle, regardless of whether working TMs 
> need them.
>  
> Put in another way, Flink seems to prematurely release slots which contain 
> unconsumed data and, thus, eventually release a TM which then fails a 
> consuming task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Description: 
h3. *Issue detail info*
In our Hadoop production env, we use the fixed-delay restart-strategy.
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
{code}
if a Flink job reaches the max attempt count, the Flink job will write an 
archive file to +FileSystem+ and shut down.

but when +SubtaskExecutionAttemptDetailsHandler+ handles the detail attempt info 
of a subtask, it hits an NPE.
h3. *Detailed reasons are as follows:*

0. Assume a scenario where a Flink job reaches the max attempt count (20).

1. +ExecutionVertex+ is a parallel subtask of the execution. Each 
+ExecutionVertex+ was created with {color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} 
(default value: 16).

2. when +SubtaskExecutionAttemptDetailsHandler+ handles the request, it gets the attempt from 
+priorExecutions+, but priorExecutions only retains 
{color:#660e7a}MAX_ATTEMPTS_HISTORY_SIZE{color} elements, so some elements were 
dropped from the head of the list (FIFO) and the lookup may return null.
h3. *Detailed StackTrace*
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


{code}

  was:
In our product env, we use fixed-delay restart-strategy which 

 

 

job failed, met NPE when writing an archive file to FileSystem
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 

[GitHub] [flink] pnowojski merged pull request #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions

2019-04-19 Thread GitBox
pnowojski merged pull request #8201: [FLINK-10941] Keep slots which contain 
unconsumed result partitions
URL: https://github.com/apache/flink/pull/8201
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] 
Support database related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r276929320
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+import java.util.Map;
+
+
+/**
+ * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
+ */
+public class GenericHiveMetastoreCatalogUtil {
+
+   private GenericHiveMetastoreCatalogUtil() {
+   }
+
+   // -- Utils --
+
+   /**
+* Creates a Hive database from CatalogDatabase.
+*/
+   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
+   Map props = db.getProperties();
+   return new Database(
+   dbName,
+   db.getDescription().get(),
 
 Review comment:
   But the parameter is a `CatalogDatabase`, not a `GenericCatalogDatabase`. 
Still not sure if it's non-null? 
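   
   A null-safe variant of the call under discussion might look like this (editor's sketch; whether `getDescription()` returns an `Optional` on the base `CatalogDatabase` interface is an assumption here):
   
{code:java}
import org.apache.flink.table.catalog.CatalogDatabase;

import org.apache.hadoop.hive.metastore.api.Database;

// Editor's sketch: avoid a bare Optional.get(), which throws
// NoSuchElementException when a plain CatalogDatabase has no description.
class NullSafeConversionSketch {

    static Database toHiveDatabase(String dbName, CatalogDatabase db) {
        String description = db.getDescription().orElse(null);
        // Passing null through as the Hive description field is this
        // sketch's assumption; locationUri is left unset here.
        return new Database(dbName, description, null, db.getProperties());
    }
}
{code}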


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] 
Support database related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r276928193
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
 
 Review comment:
   GenericHiveMetastoreCatalog => GenericHiveMetaStoreCatalog  with upper case 
for `s`? 
   
   I find that it's called `HiveMetaStore` in hive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Description: 
In our product env, we use fixed-delay restart-strategy which 

 

 

job failed, met NPE when writing an archive file to FileSystem
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


{code}

  was:
In our product env, flink restart-strategy config

 

 

job failed, met NPE when writing an archive file to FileSystem
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 

[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-19 Thread GitBox
JingsongLi commented on a change in pull request #8222: [FLINK-11518] 
[SQL/TABLE] Add partition related catalog APIs and implement them in 
GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r276928708
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistException.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for adding an already existing partition.
+ */
+public class PartitionAlreadyExistException extends RuntimeException {
+   private static final String MSG = "Partition %s of table %s in catalog 
%s already exists.";
+
+   public PartitionAlreadyExistException(
+   String catalogName,
+   ObjectPath tablePath,
+   CatalogPartition.PartitionSpec partitionSpec) {
+
+   super(String.format(MSG, partitionSpec, 
tablePath.getFullName(), catalogName), null);
 
 Review comment:
   just remove `, null` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-19 Thread GitBox
JingsongLi commented on a change in pull request #8222: [FLINK-11518] 
[SQL/TABLE] Add partition related catalog APIs and implement them in 
GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r276926297
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 ##
 @@ -29,4 +32,15 @@
 * @return table statistics
 */
TableStats getStatistics();
+
+   /**
+* Check if the table is partitioned or not.
+*/
+   boolean isPartitioned();
+
+   /**
+*
+* @return
 
 Review comment:
   comment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-19 Thread GitBox
JingsongLi commented on a change in pull request #8222: [FLINK-11518] 
[SQL/TABLE] Add partition related catalog APIs and implement them in 
GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r276927642
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -202,6 +212,8 @@ public void dropTable(ObjectPath tablePath, boolean 
ignoreIfNotExists) throws Ta
 
if (tableExists(tablePath)) {
tables.remove(tablePath);
+
+   partitions.remove(tablePath);
 
 Review comment:
   renameTable too? maybe add test?
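   
   Sketched, the reviewer's point looks like this (editor's illustration; the `tables`/`partitions` map shapes are assumed from the surrounding diff):
   
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.ObjectPath;

// Editor's sketch: bookkeeping keyed by the table's ObjectPath (like the
// partitions map) must move to the new key on rename, otherwise the renamed
// table silently loses its partitions.
class RenameSketch {

    private final Map<ObjectPath, Object> tables = new HashMap<>();
    private final Map<ObjectPath, Object> partitions = new HashMap<>();

    void renameTable(ObjectPath tablePath, String newTableName) {
        ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
        tables.put(newPath, tables.remove(tablePath));
        if (partitions.containsKey(tablePath)) {
            partitions.put(newPath, partitions.remove(tablePath));
        }
    }
}
{code}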


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8222: [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-19 Thread GitBox
JingsongLi commented on a change in pull request #8222: [FLINK-11518] 
[SQL/TABLE] Add partition related catalog APIs and implement them in 
GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r276926272
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 ##
 @@ -29,4 +32,15 @@
 * @return table statistics
 */
TableStats getStatistics();
+
+   /**
+* Check if the table is partitioned or not.
+*/
+   boolean isPartitioned();
+
+   /**
+*
+* @return
+*/
+   Set<String> getPartitionKeys() throws TableNotPartitionedException;
 
 Review comment:
   LinkedHashSet?
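   
   For illustration (editor's sketch): partition keys are positional, so preserving insertion order matters.
   
{code:java}
import java.util.LinkedHashSet;
import java.util.Set;

class PartitionKeyOrderSketch {

    public static void main(String[] args) {
        // Partition keys are positional (e.g. year/month/day), so the returned
        // Set should preserve insertion order -- a plain HashSet would not.
        Set<String> keys = new LinkedHashSet<>();
        keys.add("year");
        keys.add("month");
        keys.add("day");
        keys.forEach(System.out::println); // prints year, month, day in order
    }
}
{code}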


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12247) fix NPE when writing an archive file to a FileSystem

2019-04-19 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12247:
---
Description: 
In our product env, flink restart-strategy config

 

 

job failed, met NPE when writing an archive file to FileSystem
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


{code}

  was:
job failed, met NPE when writing an archive file to FileSystem
{code:java}
java.lang.NullPointerException
   at 
org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
   at 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
   at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
   at 
org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
   at 
org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 

[jira] [Updated] (FLINK-12139) Flink on mesos - Parameterize disk space needed.

2019-04-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12139:
---
Labels: pull-request-available  (was: )

> Flink on mesos - Parameterize disk space needed.
> 
>
> Key: FLINK-12139
> URL: https://issues.apache.org/jira/browse/FLINK-12139
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Mesos
>Reporter: Juan
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>  Labels: pull-request-available
>
> We are having a small issue while trying to deploy Flink on Mesos using 
> marathon. In our set up of Mesos we are required to specify the amount of 
> disk space we want to have for the applications we deploy there.
> The current default value in Flink is 0 and it is currently not 
> parameterizable. This means that we ask for 0 disk space for our instances, so 
> Flink can't work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8224: [FLINK-12139][Mesos] Add disk space parameter.

2019-04-19 Thread GitBox
flinkbot commented on issue #8224: [FLINK-12139][Mesos] Add disk space 
parameter.
URL: https://github.com/apache/flink/pull/8224#issuecomment-484801689
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] juangentile opened a new pull request #8224: [FLINK-12139][Mesos] Add disk space parameter.

2019-04-19 Thread GitBox
juangentile opened a new pull request #8224: [FLINK-12139][Mesos] Add disk 
space parameter.
URL: https://github.com/apache/flink/pull/8224
 
 
   
   
   ## What is the purpose of the change
   
   *Add a parameter to set the required disk space for a Mesos deployment. 
Before this change the default value was 0 with no option to change it. We 
keep the default of 0; we only make it parameterizable (see the sketch after 
the checklist below).*
   
   
   ## Brief change log
   
 - *The new parameter is loaded in the class 
MesosTaskManagerParameters.java*
 - *The new parameter is used in the class MesosResourceManager*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): don't know
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
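   
   For readers, such a parameter would likely take the shape of a Flink `ConfigOption` (editor's sketch; the exact key name and wiring are assumptions here, the PR itself defines the real option in `MesosTaskManagerParameters`):
   
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Editor's sketch of a parameterized disk requirement in Flink's ConfigOption
// style. Keeping the default at 0 preserves the previous behaviour.
public class MesosDiskOptionSketch {

    public static final ConfigOption<Integer> MESOS_RM_TASKS_DISK_MB =
        ConfigOptions.key("mesos.resourcemanager.tasks.disk")
            .defaultValue(0)
            .withDescription("Disk space to assign to the Mesos workers, in MB.");
}
{code}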
   




[GitHub] [flink] wuchong commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
wuchong commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database 
related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#issuecomment-484799383
 
 
   I think we should also add a NOTICE file to the `flink-connector-hive` module, 
because it bundles the hive dependency. You can take this as an example: 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE.
   
   See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing
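
   For reference, a minimal sketch of what such a NOTICE might contain, modeled 
on the elasticsearch2 example above. The bundled dependency entry and version 
placeholder are illustrative, not the actual list for this module:

```
flink-connector-hive
Copyright 2014-2019 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0.
(see https://www.apache.org/licenses/LICENSE-2.0.txt)

- org.apache.hive:hive-exec:<hive.version>
```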




[GitHub] [flink] yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-19 Thread GitBox
yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor 
failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r276927208
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ##
 @@ -101,7 +100,7 @@
private final CheckpointStorageLocation targetLocation;
 
/** The promise to fulfill once the checkpoint has been completed. */
-   private final CompletableFuture 
onCompletionPromise;
+   private final CompletableFuture 
onCompletionPromise;
 
 Review comment:
   @StefanRRichter I have refactored my PR based on the new design document. 
Would you please have a look?




[GitHub] [flink] wuchong commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime

2019-04-19 Thread GitBox
wuchong commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support 
unbounded aggregate in streaming table runtime
URL: https://github.com/apache/flink/pull/8202#issuecomment-484797370
 
 
   Rebased.




[GitHub] [flink] wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
wuchong commented on a change in pull request #8205: [FLINK-12238] [TABLE/SQL] 
Support database related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r276925651
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -0,0 +1,360 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
 
 Review comment:
   I think only the modules that depend on a scala module (e.g. 
`flink-streaming-java_2.11`) need to specify the scala version; other modules 
shouldn't. For example, `flink-core`, `flink-table-common`, and 
`flink-table-api-java` do not have a scala version in their artifactId.
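
   For example, if `flink-connector-hive` turns out not to depend on any 
Scala-versioned module, its artifactId could simply drop the suffix (a schematic 
pom excerpt, not taken from this PR):

```xml
<!-- no Scala dependency: plain artifactId without the scala.binary.version suffix -->
<artifactId>flink-connector-hive</artifactId>
```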




[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r276917451
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
 ##
 @@ -305,6 +305,36 @@ class CorrelateITCase extends AbstractTestBase {
 )
   }
 
+  @Test
+  def testFlatMap(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = StreamTableEnvironment.create(env)
+StreamITCase.testResults = mutable.MutableList()
+
+val func2 = new TableFunc2
+val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
+  // test non alias
+  .flatMap(func2('c))
+  .select('f0, 'f1)
+  // test the output field name of flatMap is the same as the field name 
of input table
 
 Review comment:
   ` input table`->`the input table`?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r276922137
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -365,6 +367,53 @@ class OperationTreeBuilder(private val tableEnv: 
TableEnvironment) {
   mapFunction.asInstanceOf[CallExpression].getFunctionDefinition.getType 
== SCALAR_FUNCTION
   }
 
+  def flatMap(tableFunction: Expression, child: TableOperation): 
TableOperation = {
+
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
+val resolvedTableFunction = resolveSingleExpression(tableFunction, 
resolver)
+
+if (!isTableFunction(resolvedTableFunction)) {
+  throw new ValidationException("Only TableFunction can be used in the 
flatMap operator.")
+}
+
+val originFieldNames: Seq[String] =
+  resolvedTableFunction.asInstanceOf[CallExpression].getFunctionDefinition 
match {
+case tfd: TableFunctionDefinition =>
+  UserDefinedFunctionUtils.getFieldInfo(tfd.getResultType)._1
+  }
+
+def getUniqueName(inputName: String, usedFieldNames: Seq[String]): String 
= {
+  var i = 0
+  var resultName = inputName
+  while (usedFieldNames.contains(resultName)) {
+resultName = resultName + "_" + i
+i += 1
+  }
+  resultName
+}
+
+val usedFieldNames = 
child.asInstanceOf[LogicalNode].output.map(_.name).toBuffer
+val newFieldNames = originFieldNames.map({ e =>
+  val resultName = getUniqueName(e, usedFieldNames)
+  usedFieldNames.append(resultName)
+  resultName
+})
+
+val renamedTableFunction = ApiExpressionUtils.call(
+  BuiltInFunctionDefinitions.AS,
+  resolvedTableFunction +: 
newFieldNames.map(ApiExpressionUtils.valueLiteral(_)): _*)
+val joinNode = joinLateral(child, renamedTableFunction, JoinType.INNER, 
Optional.empty())
+val dropNode = dropColumns(
 
 Review comment:
   `dropNode` -> `rightNode`, because `joinNode` = `leftNode` + `rightNode`; after 
`dropColumns()`, all columns are from `rightNode`. What do you think?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r276914756
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 ##
 @@ -1019,10 +1020,40 @@
 * Scala Example:
 *
 * 
-* {@code val func = new MyMapFunction()
+* {@code
+*   val func = new MyMapFunction()
 *   tab.map(func('c))
 * }
 * 
 */
Table map(Expression mapFunction);
+
+   /**
+* Performs a flatMap operation with a table function.
+*
+* Example:
+*
+* 
+* {@code
+*   TableFunction func = new MyFlatMapFunction();
+*   tableEnv.registerFunction("func", func);
+*   table.flatMap("func(c)");
+* }
+* 
+*/
+   Table flatMap(String tableFunction);
+
+   /**
+* Performs a flatMap operation with a table function.
 
 Review comment:
   Performs a flatMap operation with a user-defined or built-in table 
function. The output will be flattened if the output type is a composite 
type. ?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2019-04-19 Thread GitBox
sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r276923999
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1842,11 +1843,27 @@ ScalarFunction func = new MyMapFunction();
 tableEnv.registerFunction("func", func);
 
 Table table = input
-  .map(func("c")).as("a, b")
+  .map("func(c)").as("a, b")
 {% endhighlight %}
   
 
 
+
+  
+FlatMap
+Batch Streaming
+  
+  
+Performs a flatMap operation with a table function.
+{% highlight java %}
+TableFunction func = new MyFlatMapFunction();
+tableEnv.registerFunction("func", func);
 
 Review comment:
   I think it is better to add the `MyFlatMapFunction()` definition, so that 
users can clearly know why we can call `as("a, b")`. What do you think? If so, 
please improve the map example as well.
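
   For instance, a `MyFlatMapFunction` along the following lines (a hypothetical 
sketch, not code from this PR) would make the two output columns, and hence the 
`as("a, b")` call, self-explanatory:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MyFlatMapFunction extends TableFunction<Row> {

	// Emits one (word, length) row per '#'-separated token, so the table
	// produced by flatMap has two columns that as("a, b") can rename.
	public void eval(String str) {
		if (str != null) {
			for (String s : str.split("#")) {
				Row row = new Row(2);
				row.setField(0, s);
				row.setField(1, s.length());
				collect(row);
			}
		}
	}

	@Override
	public RowTypeInfo getResultType() {
		return new RowTypeInfo(Types.STRING, Types.INT);
	}
}
```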




[GitHub] [flink] KurtYoung commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-19 Thread GitBox
KurtYoung commented on issue #8205: [FLINK-12238] [TABLE/SQL] Support database 
related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#issuecomment-484794093
 
 
   One minor thing: could you make the component name in the commit message more 
aligned with other commits? In this case, I think [table] is enough.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime

2019-04-19 Thread GitBox
KurtYoung commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support 
unbounded aggregate in streaming table runtime
URL: https://github.com/apache/flink/pull/8202#issuecomment-484793449
 
 
   I just merged another PR; please resolve the conflicts again.




[GitHub] [flink] KurtYoung closed pull request #8176: [FLINK-12192] [table-planner-blink] Add support for generating optimized logical plan for grouping sets and distinct aggregate

2019-04-19 Thread GitBox
KurtYoung closed pull request #8176: [FLINK-12192] [table-planner-blink] Add 
support for generating optimized logical plan for grouping sets and distinct 
aggregate
URL: https://github.com/apache/flink/pull/8176
 
 
   




[jira] [Closed] (FLINK-12192) Add support for generating optimized logical plan for grouping sets and distinct aggregate

2019-04-19 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12192.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

fixed in 1c55f7a7e18e03f5b777c9fbe3234055904b00d6

> Add support for generating optimized logical plan for grouping sets and 
> distinct aggregate
> --
>
> Key: FLINK-12192
> URL: https://issues.apache.org/jira/browse/FLINK-12192
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue aims to support generating an optimized logical plan for grouping 
> sets and distinct aggregates (mentioned in FLINK-12076 and FLINK-12098).
> For batch, a query with a distinct aggregate will be rewritten into two 
> non-distinct aggregates by the extended 
> [AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]:
> the first aggregate computes the distinct key and the non-distinct aggregate 
> functions, and the second aggregate computes the distinct aggregate functions 
> based on the first aggregate's result. The first aggregate has grouping sets if 
> there is more than one distinct aggregate function on different fields.
> For streaming, a query with a distinct aggregate is handled by 
> SplitAggregateRule in FLINK-12161.
> A query with grouping sets (or cube, rollup) will be rewritten into a regular 
> aggregate with an expand, and the expand node duplicates the input data for 
> each simple group.
>  e.g.
> {noformat}
> schema:
> MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)
>
> Original records:
> +------+-----+------+------+
> |  a   |  b  |  c   |  d   |
> +------+-----+------+------+
> |  1   |  1  |  c1  |  d1  |
> +------+-----+------+------+
> |  1   |  2  |  c1  |  d2  |
> +------+-----+------+------+
> |  2   |  1  |  c1  |  d1  |
> +------+-----+------+------+
>
> SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)
>
> logical plan after expanded:
> LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
>   LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
>     LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
>       LogicalNativeTableScan(table=[[builtin, default, MyTable]])
>
> notes:
> '$e = 1' is equivalent to 'group by a'
> '$e = 2' is equivalent to 'group by c'
>
> expanded records:
> +------+-----+------+-----+
> |  a   |  b  |  c   | $e  |
> +------+-----+------+-----+------------------------------+
> |  1   |  1  | null |  1  |                              |
> +------+-----+------+-----+  records expanded by record1 |
> | null |  1  |  c1  |  2  |                              |
> +------+-----+------+-----+------------------------------+
> |  1   |  2  | null |  1  |                              |
> +------+-----+------+-----+  records expanded by record2 |
> | null |  2  |  c1  |  2  |                              |
> +------+-----+------+-----+------------------------------+
> |  2   |  1  | null |  1  |                              |
> +------+-----+------+-----+  records expanded by record3 |
> | null |  1  |  c1  |  2  |                              |
> +------+-----+------+-----+------------------------------+
> {noformat}



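
As an aside, the two-phase rewrite of a distinct aggregate described above can 
be sketched in plain SQL (a schematic illustration, not the planner's literal 
output):

{code:sql}
-- Original query with one distinct aggregate:
SELECT a, COUNT(DISTINCT b), SUM(c) FROM MyTable GROUP BY a;

-- Conceptual two-phase rewrite: the first (inner) aggregate computes the
-- distinct key b together with the partial non-distinct aggregate SUM(c);
-- the second (outer) aggregate finishes both on top of the first result.
SELECT a, COUNT(b), SUM(s)
FROM (
  SELECT a, b, SUM(c) AS s
  FROM MyTable
  GROUP BY a, b
) t
GROUP BY a;
{code}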

