[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-10 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274267157
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map<String, CatalogDatabase> databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+   checkArgument(db != null);
+
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String databaseName, boolean 
ignoreIfNotExists) throws DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (databases.containsKey(databaseName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(databaseName)) {
+   databases.remove(databaseName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, databaseName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
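   For illustration, a minimal usage sketch of the operations visible in this diff
(hypothetical snippet; the `GenericCatalogDatabase` constructor and the method
signatures are taken from the lines above, and checked exceptions are collapsed
into a blanket `throws`):

```java
import java.util.HashMap;

import org.apache.flink.table.catalog.GenericCatalogDatabase;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// Hypothetical usage sketch based only on the methods visible in the diff above.
public class CatalogUsageSketch {
	public static void main(String[] args) throws Exception {
		GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("my_catalog");
		catalog.open();

		// ignoreIfExists = false: throws DatabaseAlreadyExistException on a clash.
		catalog.createDatabase("db1", new GenericCatalogDatabase(new HashMap<>()), false);
		// ignoreIfExists = true: the second call is simply a no-op.
		catalog.createDatabase("db1", new GenericCatalogDatabase(new HashMap<>()), true);

		catalog.setCurrentDatabase("db1");

		// Dropping only succeeds while the database holds no tables.
		catalog.dropDatabase("db1", false);
		// ignoreIfNotExists = true tolerates a missing database.
		catalog.dropDatabase("does_not_exist", true);

		catalog.close();
	}
}
```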
+   

[GitHub] [flink] hequn8128 commented on issue #8087: [FLINK-12029][table] Add column operations for TableApi

2019-04-10 Thread GitBox
hequn8128 commented on issue #8087: [FLINK-12029][table] Add column operations 
for TableApi
URL: https://github.com/apache/flink/pull/8087#issuecomment-481972396
 
 
   @sunjincheng121 @dawidwys I have updated the PR. Would be great if you can 
take another look. :-)




[GitHub] [flink] zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-10 Thread GitBox
zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the 
constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#issuecomment-481971372
 
 
   @pnowojski , thanks for your review! I have rebased onto master to resolve the 
conflicts.




[GitHub] [flink] zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
zentol commented on a change in pull request #8002: [FLINK-11923][metrics] 
MetricRegistryConfiguration provides MetricReporters Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r274009874
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config) {
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
} else {
-   // we have some reporters so
-   for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-   String namedReporter = reporterConfiguration.f0;
-   Configuration reporterConfig = 
reporterConfiguration.f1;
-
-   final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-   if (className == null) {
-   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-   continue;
-   }
+   for (ReporterSetup reporterSetup : 
reporterConfigurations) {
 
 Review comment:
   I assumed you meant that the reporter class would have to hard-code the 
name, but I guess you're suggesting that the reporter parses the interval and 
name from the configuration.
   
   ~~So for one this wouldn't really work with the subsequent introduction of 
factories, which requires the MetricConfig to be assembled before the reporter 
is even instantiated. One could move the logic into the factory of course; my 
point is that this is not something we can do _now_.~~
   
   Reporters (or factories for that matter) shouldn't be aware that reporter 
names actually exist; this is an implementation detail of the metric system to 
allow for distinct configurations. Whether multiple reporters exist or not is 
not relevant to a given reporter, so they shouldn't have to deal with it. I 
also don't see how a reporter would be able to determine its own name 
from the given configuration. The system-wide configuration does not contain 
this info in an obvious fashion, and the metric config does not contain it at 
all. I can only see this working if we (the MetricRegistry/-Configuration) 
explicitly write it into the MetricConfig, but this obviously doesn't make any 
sense.
   
   Admittedly, the interval makes more sense. It is true that the current 
reporter configuration is a mix between reporter-facing options (like 
reporter-specific arguments) and system-facing options (like the interval). The 
current approach however allows us to ensure that certain configuration options 
exist and are actually applied; in a design where the `Scheduled` reporter 
provides the interval you cannot guarantee that the interval is configurable 
for users, or that a configured interval is respected.
   The same applies to other system-facing reporter options, like delimiters. 
So long as reporters don't massively go out of their way to avoid it (i.e., not 
working against the MetricGroup interface at all, with some currently existing 
exceptions), the configured delimiter _will_ be used.
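   To make the split concrete, here is a minimal, self-contained sketch of the 
idea (my own illustration, not Flink's actual classes or option names): the 
registry owns the system-facing scheduling, while the reporter only receives its 
already-scoped key/value pairs and never learns its own name.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the registry resolves the system-facing interval from the
// per-reporter configuration and does the scheduling itself, so a configured
// interval is always respected regardless of what the reporter implements.
public class ReporterSchedulingSketch {
    public static void main(String[] args) {
        // Properties already scoped to a single reporter by the registry;
        // the reporter's name is deliberately not part of them.
        Properties reporterScopedConfig = new Properties();
        reporterScopedConfig.setProperty("interval", "10");

        long intervalSeconds =
            Long.parseLong(reporterScopedConfig.getProperty("interval", "10"));

        Runnable report = () -> System.out.println("reporting metrics...");

        // The registry, not the reporter, decides when reporting happens.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(report, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }
}
```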




[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815073#comment-16815073
 ] 

vinoyang commented on FLINK-10929:
--

Hi [~fan_li_ya], if there is any design documentation, it would be better for 
the discussion. And it may be a good start.

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.





[GitHub] [flink] hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-10 Thread GitBox
hequn8128 commented on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-481965728
 
 
   @bowenli86 Thanks for your review and providing valuable information for the 
catalog.
   
   - Re TableEnvironment.
   Yes, you are right. Currently, we must use a Java or Scala environment to 
register table or aggregate functions because TypeInformation is handled 
differently between Java and Scala. However, I think this will be solved by 
FLIP-37.
   
   - Re listTables.
   Nice spoiler alert. Looking forward to it. :-)




[jira] [Closed] (FLINK-12066) Remove StateSerializerProvider field in keyed state backend

2019-04-10 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-12066.
---
Resolution: Fixed

Merged for 1.9.0:
1132a52a58c3710a9a183111fda84cd1432605fa

> Remove StateSerializerProvider field in keyed state backend
> ---
>
> Key: FLINK-12066
> URL: https://issues.apache.org/jira/browse/FLINK-12066
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As mentioned in [PR review of 
> FLINK-10043|https://github.com/apache/flink/pull/7674#discussion_r257630962] 
> with Stefan and an offline discussion with Gordon, after the refactoring work 
> the serializer passed to the {{RocksDBKeyedStateBackend}} constructor is a 
> final one, thus the {{StateSerializerProvider}} field is no longer needed.
> For {{HeapKeyedStateBackend}}, the only thing that stops us from passing a final 
> serializer is the circular dependency between the backend and 
> {{HeapRestoreOperation}}, and we aim to decouple them by introducing a new 
> {{HeapInternalKeyContext}} as the bridge. For more details please refer to the 
> coming PR.





[GitHub] [flink] asfgit closed pull request #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-10 Thread GitBox
asfgit closed pull request #8078: [FLINK-12066] [State Backends] Remove 
StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078
 
 
   




[GitHub] [flink] tzulitai commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-10 Thread GitBox
tzulitai commented on issue #8078: [FLINK-12066] [State Backends] Remove 
StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#issuecomment-481962227
 
 
   Thanks, merging ..




[GitHub] [flink] KurtYoung commented on issue #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream

2019-04-10 Thread GitBox
KurtYoung commented on issue #8110: [FLINK-12098] [table-planner-blink] Add 
support for generating optimized logical plan for simple group aggregate on 
stream
URL: https://github.com/apache/flink/pull/8110#issuecomment-481956259
 
 
   LGTM, +1 to merge




[jira] [Commented] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem

2019-04-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815059#comment-16815059
 ] 

vinoyang commented on FLINK-11935:
--

[~walterddr] It seems I found the key issue: we cannot delete the 
{{DateTimeUtils}} class file directly. I have reported the issue, see 
CALCITE-2989.

> Remove DateTimeUtils pull-in and fix datetime casting problem
> -
>
> Key: FLINK-11935
> URL: https://issues.apache.org/jira/browse/FLINK-11935
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Rong Rong
>Assignee: vinoyang
>Priority: Major
>
> This {{DateTimeUtils}} was pulled in in FLINK-7235.
> Originally, the time operation was not done correctly via the {{ymdToJulian}} 
> function before the date {{1970-01-01}}, thus we needed the fix, similar to 
> addressing this problem:
> {code:java}
>  Optimized :1017-12-05 22:58:58.998 
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-12-05 22:58:58.998
> {code}
>  
> However, after pulling in Avatica 1.13, I found out that the optimized plans 
> of the time operations are actually correct. It is in fact the casting part 
> that creates the problem:
> For example, the following:
> *{{(plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}*
> results in a StringTestExpression of:
> *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" 
> COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}*
> but the testing results are:
> {code:java}
>  Optimized :1017-11-29 22:58:58.998
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-11-23 22:58:58.998
> {code}
>  





[jira] [Updated] (FLINK-12140) Support e2e sort merge join operator in batch mode

2019-04-10 Thread Jingsong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-12140:
-
Summary: Support e2e sort merge join operator in batch mode  (was: Support 
sort merge join it case run in batch mode)

> Support e2e sort merge join operator in batch mode
> --
>
> Key: FLINK-12140
> URL: https://issues.apache.org/jira/browse/FLINK-12140
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Complete BatchExecSortMergeJoin and support join IT cases.
> Support queries like "select a, b, c, e from T1, T2 where T1.a = T2.d" run in 
> batch mode





[GitHub] [flink] zjffdu commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

2019-04-10 Thread GitBox
zjffdu commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster 
integration test under non-secure mode
URL: https://github.com/apache/flink/pull/8144#issuecomment-481952307
 
 
   @tillrohrmann  Could you help review it ? Thanks




[jira] [Comment Edited] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus

2019-04-10 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815027#comment-16815027
 ] 

Zhu Zhu edited comment on FLINK-11813 at 4/11/19 3:30 AM:
--

Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently, in our production use, the client may re-submit a previously 
generated JobGraph to speed up job launching when the previous job is FAILED. 
In this case, jobs with the same ID are seen as different attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING and starts the JobMaster (or creates a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time the JM is launched, the job status is 
PENDING and the job will be started. If leadership is gained a second time and 
the first run has already completed, the job status would be NONE. Besides, if 
a JM failover happens while the status is PENDING/RUNNING, the new leader will 
also restart the job.
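
A minimal sketch of these transitions (my own illustration; the class and 
method names are hypothetical and simplified compared to Flink's actual 
RunningJobsRegistry):

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the transitions above; "no entry" plays the role of NONE.
enum JobSchedulingStatus { PENDING, RUNNING }

class RunningJobsRegistrySketch {
	private final ConcurrentMap<String, JobSchedulingStatus> registry = new ConcurrentHashMap<>();

	// 1. The Dispatcher accepts the submission: no entry -> PENDING
	void onJobSubmitted(String jobId) {
		registry.putIfAbsent(jobId, JobSchedulingStatus.PENDING);
	}

	// 3. A JobManager gains leadership and starts the job: PENDING -> RUNNING
	void onJobStarted(String jobId) {
		registry.put(jobId, JobSchedulingStatus.RUNNING);
	}

	// 4. The job reaches a terminal state: the entry is removed again
	void onJobTerminated(String jobId) {
		registry.remove(jobId);
	}

	// A newly elected leader only (re)starts jobs that still have an entry.
	boolean shouldStartOnLeadership(String jobId) {
		return registry.containsKey(jobId);
	}
}
{code}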

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher}}". I think the job 
submission in the Dispatcher is the beginning of that lifecycle.

 

I agree with your proposal too; it can handle the unexpected duplicate 
submissions well.

A few questions for the proposal:

1. since in statement 5 the job status is already changed to RUNNING during job 
submission, in statement 3 should we restart the job only if it is RUNNING?

2. in statement 7, if a Dispatcher terminates as expected (the user stopping it, 
or the job finishing in a MiniDispatcher), the Dispatcher can safely clean up 
the RunningJobsRegistry. Otherwise, in unexpected shutdowns, as [~Tison] said, 
it may be hard to decide whether this Dispatcher is the last one to shut down. 
Should we keep the RunningJobsRegistry to avoid affecting running jobs in such 
corner cases?

3. It seems that with this change, the JobManagerLeaderElectionService is not 
needed any more?

 

 


was (Author: zhuzh):
Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently in our production use, there is a way that the client re-submit 
previously generated JobGraph to speed up the job launching, when the previous 
job is FAILED. In this case, job with the same ID are seen as different 
attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING  and starts the JobMaster(or creating a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time to launch the JM, the job status is 
PENDING so the job will be started. If it is a second time leadership gaining, 
and the first is completed, the job status would be NONE. Besides, if JM 
failover happens during the PENDING/RUNNING status, the new leader will also 
restart the job.

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job 
submission in the Dispatcher is the beginning of lifecycle.

 

I agree with your proposal too, which can well handle the unexpected submission 
duplications.

One thing to confirm is that, as in stage 5 the job status is changed to be 
RUNNING already in job submission, in stage 3 should we restart the job only if 
it is RUNNING?

 

 

> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> -
>
>  

[jira] [Commented] (FLINK-12140) Support sort merge join it case run in batch mode

2019-04-10 Thread Kurt Young (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815052#comment-16815052
 ] 

Kurt Young commented on FLINK-12140:


you could just say support e2e join operator 

> Support sort merge join it case run in batch mode
> -
>
> Key: FLINK-12140
> URL: https://issues.apache.org/jira/browse/FLINK-12140
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Complete BatchExecSortMergeJoin and support join IT cases.
> Support queries like "select a, b, c, e from T1, T2 where T1.a = T2.d" run in 
> batch mode





[GitHub] [flink] hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add column operations for TableApi

2019-04-10 Thread GitBox
hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add 
column operations for TableApi
URL: https://github.com/apache/flink/pull/8087#discussion_r274246117
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -630,18 +672,25 @@ class OverWindowedTableImpl(
   overWindows)
   }
 
+  private def expandColumnsInOverWindow(overWindows: Seq[OverWindow]): 
Seq[OverWindow] = {
+overWindows.map { e =>
+  val expanedPartitioning = tableImpl.resolveCalls(e.getPartitioning)
 
 Review comment:
   Remove this method due to #8062




[GitHub] [flink] hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add column operations for TableApi

2019-04-10 Thread GitBox
hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add 
column operations for TableApi
URL: https://github.com/apache/flink/pull/8087#discussion_r274245850
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -477,6 +484,18 @@ class TableImpl(
   private def wrap(operation: TableOperation): Table = {
 new TableImpl(tableEnv, operation)
   }
+
+  private[flink] def getOutputFieldReferences: 
Seq[UnresolvedFieldReferenceExpression] = {
+operationTree.asInstanceOf[LogicalNode]
+  .output.map(a => new UnresolvedFieldReferenceExpression(a.name))
+  }
+
+  private[flink] def resolveCalls(fields: Seq[Expression]): Seq[Expression] = {
+val outputFieldReferences = operationTree.asInstanceOf[LogicalNode]
+  .output.map(a => new UnresolvedFieldReferenceExpression(a.name))
+val expander = new ColumnsOperationExpander(outputFieldReferences)
 
 Review comment:
   Due to the refactoring in #8062, the `getOutputFieldReferences()` method will 
not be used in other places, so I merged `getOutputFieldReferences()` and 
`resolveCalls()` into one.




[jira] [Comment Edited] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus

2019-04-10 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815027#comment-16815027
 ] 

Zhu Zhu edited comment on FLINK-11813 at 4/11/19 2:47 AM:
--

Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently in our production use, there is a way that the client re-submit 
previously generated JobGraph to speed up the job launching, when the previous 
job is FAILED. In this case, job with the same ID are seen as different 
attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING  and starts the JobMaster(or creating a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time to launch the JM, the job status is 
PENDING so the job will be started. If it is a second time leadership gaining, 
and the first is completed, the job status would be NONE. Besides, if JM 
failover happens during the PENDING/RUNNING status, the new leader will also 
restart the job.

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job 
submission in the Dispatcher is the beginning of lifecycle.

 

I agree with your proposal too, which can well handle the unexpected submission 
duplications.

One thing to confirm is that, as in stage 5 the job status is changed to be 
RUNNING already in job submission, in stage 3 should we restart the job only if 
it is RUNNING?

 

 


was (Author: zhuzh):
Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently in our production use, there is a way that the client re-submit 
previously generated JobGraph to speed up the job launching, when the previous 
job is FAILED. In this case, job with the same ID are seen as different 
attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING  and starts the JobMaster(or creating a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time to launch the JM, the job status is 
PENDING so the job will be started. If it is a second time leadership gaining, 
and the first is completed, the job status would be NONE. Besides, if JM 
failover happens during the PENDING/RUNNING status, the new leader will also 
restart the job.

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job 
submission in the Dispatcher is the beginning of lifecycle.

 

I think your proposal is fine, and it can handle the unexpected submission 
duplications.

One thing to confirm is that, as in stage 5 the job status is changed to be 
RUNNING already in job submission, in stage 3 should we restart the job only if 
it is RUNNING?

 

 

> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> -
>
> Key: FLINK-11813
> URL: https://issues.apache.org/jira/browse/FLINK-11813
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode 
> will restart a terminated job after they gained leadership. The problem is 
> that we currently clear the 

[GitHub] [flink] zhijiangW commented on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate

2019-04-10 Thread GitBox
zhijiangW commented on issue #8136: [FLINK-12154][network] Remove legacy fields 
for SingleInputGate
URL: https://github.com/apache/flink/pull/8136#issuecomment-481943959
 
 
   Thanks for your review @azagrebin .
   I checked the travis result and the failure is not related to my changes.




[jira] [Commented] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus

2019-04-10 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815027#comment-16815027
 ] 

Zhu Zhu commented on FLINK-11813:
-

Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently in our production use, there is a way that the client re-submit 
previously generated JobGraph to speed up the job launching, when the previous 
job is FAILED. In this case, job with the same ID are seen as different 
attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING  and starts the JobMaster(or creating a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time to launch the JM, the job status is 
PENDING so the job will be started. If it is a second time leadership gaining, 
and the first is completed, the job status would be NONE. Besides, if JM 
failover happens during the PENDING/RUNNING status, the new leader will also 
restart the job.

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job 
submission in the Dispatcher is the beginning of lifecycle.

 

I think your proposal is fine, and it can handle the unexpected submission 
duplications.

One thing to confirm is that, as in stage 5 the job status is changed to be 
RUNNING already in job submission, in stage 3 should we restart the job only if 
it is RUNNING?

 

 

> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> -
>
> Key: FLINK-11813
> URL: https://issues.apache.org/jira/browse/FLINK-11813
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode 
> will restart a terminated job after they gained leadership. The problem is 
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a 
> globally terminal state. After the leading {{Dispatcher}} terminates, a 
> standby {{Dispatcher}} will gain leadership. Without having the information 
> from the {{RunningJobsRegistry}} it cannot tell whether the job has been 
> executed or whether the {{Dispatcher}} needs to re-execute the job. At the 
> moment, the {{Dispatcher}} will assume that there was a fault and hence 
> re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job 
> has been successfully executed. One trivial solution could be to not clean up 
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.





[GitHub] [flink] carp84 commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-10 Thread GitBox
carp84 commented on issue #8078: [FLINK-12066] [State Backends] Remove 
StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#issuecomment-481940313
 
 
   I checked the Travis log and the error (as shown below) is unrelated to the 
change here; I will rebase on the latest master and force-push to trigger 
another check.
   
   > gzip: stdin: not in gzip format
   tar: Child returned status 1
   tar: Error is not recoverable: exiting now
   [FAIL] Test script contains errors.




[GitHub] [flink] jonpollock edited a comment on issue #7458: [FLINK-11302] FlinkS3FileSystem uses an incorrect path for temporary files.

2019-04-10 Thread GitBox
jonpollock edited a comment on issue #7458: [FLINK-11302] FlinkS3FileSystem 
uses an incorrect path for temporary files.
URL: https://github.com/apache/flink/pull/7458#issuecomment-481939883
 
 
   I know this is already closed and merged, but I'd just like to point out 
that the constructor of the FlinkS3FileSystem calls 
RefCountedTmpFileCreator.inDirectories, passing it a single File object. This 
method takes multiple files and each will be used as a way of spreading the 
temp files over multiple directories.
   
   The change you applied limits this to just the first directory in 
LOCAL_DIRS. I understand that you probably didn't want to change the 
constructor signature, but I think that is only used in 
AbstractS3FileSystemFactory.
   
   In any case, thank you for your work; this solves a problem I've had for a 
few hours now.
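
   For illustration, a minimal sketch of the "spread temp files over multiple 
directories" behaviour being described (hypothetical code, not Flink's actual 
RefCountedTmpFileCreator):

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin over several temp directories so that
// temporary file I/O is spread instead of hitting only the first directory.
public class TempFileSpreader {
    private final File[] tempDirectories;
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    public TempFileSpreader(File... tempDirectories) {
        this.tempDirectories = tempDirectories;
    }

    public File createTempFile() throws IOException {
        // Pick the next directory in round-robin order.
        int index = Math.abs(nextIndex.getAndIncrement() % tempDirectories.length);
        return File.createTempFile("flink-s3-", ".tmp", tempDirectories[index]);
    }
}
```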




[GitHub] [flink] jonpollock commented on issue #7458: [FLINK-11302] FlinkS3FileSystem uses an incorrect path for temporary files.

2019-04-10 Thread GitBox
jonpollock commented on issue #7458: [FLINK-11302] FlinkS3FileSystem uses an 
incorrect path for temporary files.
URL: https://github.com/apache/flink/pull/7458#issuecomment-481939883
 
 
   I know this is already closed and merged, but I'd just like to point out 
that the constructor of the FlinkS3FileSystem calls 
RefCountedTmpFileCreator.inDirectories, passing it a single File object. This 
method takes multiple files and each will be used as a way of spreading the 
temp files over multiple directories.
   
   The change you applied limits this to just the first directory in the 
LOCAL_DIRS. I understand that you probably didn't want to change the 
Constructor signature, but I think that is only used in 
AbstractS3FileSystemFactory.




[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-10 Thread GitBox
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r274236430
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * 
+ * plugins-root-folder/
+ *     |---plugin-a/ (folder of plugin a)
+ *     |       |-plugin-a-1.jar (the jars containing the classes of plugin a)
+ *     |       |-plugin-a-2.jar
+ *     |       |-...
+ *     |
+ *     |---plugin-b/
+ *     |       |-plugin-b-1.jar
+ *    ...      |-...
+ * 
 
 Review comment:
   Another concern is whether this plugin finder mechanism works in the JM/TM?
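
   For reference, a hedged sketch of the directory-to-plugin mapping described 
in the javadoc above (illustrative only, not the actual DirectoryBasedPluginFinder 
implementation; whether and how it runs on the JM/TM is exactly the open question):

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: each sub-folder of the plugins root is one plugin,
// its folder name is the plugin id, and its jars form the plugin's classpath.
public class PluginDirScanSketch {
    public static Map<String, List<URL>> scan(Path pluginsRoot) throws IOException {
        Map<String, List<URL>> pluginJars = new HashMap<>();
        try (Stream<Path> pluginDirs = Files.list(pluginsRoot)) {
            for (Path pluginDir : pluginDirs.filter(Files::isDirectory).collect(Collectors.toList())) {
                try (Stream<Path> jars = Files.list(pluginDir)) {
                    List<URL> urls = new ArrayList<>();
                    for (Path jar : jars.filter(p -> p.toString().endsWith(".jar")).collect(Collectors.toList())) {
                        urls.add(jar.toUri().toURL());
                    }
                    // The folder name becomes the plugin id.
                    pluginJars.put(pluginDir.getFileName().toString(), urls);
                }
            }
        }
        return pluginJars;
    }
}
```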




[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-10 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r274236223
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* 

[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-10 Thread GitBox
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r274235386
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Utility functions for the plugin mechanism.
+ */
+public final class PluginUtils {
 
 Review comment:
   I also agree with @pnowojski to merge it into `PluginManager`. From the point 
of view of a plugin user, I would expect there to be only one entry point for 
using plugins. So basically, I suppose it involves the following 2 steps:
   1. init PluginManager
   2. create Plugin via PluginManager
   
   > some code could be interested in having multiple plugin manager or 
explicitly creating and passing in plugin managers.
   
   I am not sure why a user would do that. The only scenario I can imagine is 
that a user wants to create a customized PluginManager explicitly for a unit 
test. And I think the only thing to customize in PluginManager is its plugin 
root folder; this could be set in a lot of ways, like a Java property or a 
system environment variable, so I think PluginManager itself could do that and 
we don't need PluginUtils for this. Maybe my understanding is wrong, just my 2 
cents.
   




[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815009#comment-16815009
 ] 

Liya Fan commented on FLINK-10929:
--

We have imported Arrow in our efforts to vectorize Blink batch jobs. It is an 
incremental change, which can be easily turned off with a single flag, and it 
does not affect other parts of the code base.

[~fhueske], do you think it is a good time to make some initial attempts to 
incorporate such changes now ? :)

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.





[jira] [Issue Comment Deleted] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-04-10 Thread Shengnan YU (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengnan YU updated FLINK-11654:

Comment: was deleted

(was: Hi, I got the same problem, any updated solution here?)

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  
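
Following the inline comment above, a minimal sketch of what a job-unique prefix could look like. The helper below is purely illustrative, and the job id is passed in explicitly rather than taken from any particular API:

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RuntimeContext;

final class TransactionalIdPrefix {

    /**
     * Builds a transactional-id prefix that is unique per job and per operator, so two
     * jobs writing to the same Kafka cluster do not fence each other's producers.
     */
    static String build(JobID jobId, String operatorUniqueId, RuntimeContext context) {
        // Including the job id (and optionally a cluster id) avoids clashes across jobs.
        return jobId + "-" + context.getTaskName() + "-" + operatorUniqueId;
    }
}
{code}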



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-04-10 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815006#comment-16815006
 ] 

Shengnan YU commented on FLINK-11654:
-

Hi, I got the same problem, any updated solution here?

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-10 Thread GitBox
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r274232242
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * 
+ * plugins-root-folder/
+ *|plugin-a/ (folder of plugin a)
+ *||-plugin-a-1.jar (the jars containing the 
classes of plugin a)
+ *||-plugin-a-2.jar
+ *||-...
+ *|
+ *|plugin-b/
+ *||-plugin-b-1.jar
+ *   ...   |-...
+ * 
 
 Review comment:
   Would it be better to add another layer of plugin type? Something like this:
   
   plugins/FileSystem/HadoopFileSystem
   plugins/FileSystem/S3FileSystem
   plugins/Reporter/Reporter1
   plugins/Reporter/Reporter2
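
   A rough sketch of how discovery could work with that extra type level; this only illustrates the proposed layout and is not the code in this PR:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Illustrative scanner for a plugins-root/type/plugin-id layout. */
final class TypedPluginScanner {

    /** Returns plugin type -> plugin ids, e.g. {FileSystem=[HadoopFileSystem, S3FileSystem]}. */
    static Map<String, List<String>> scan(Path pluginsRoot) throws IOException {
        try (Stream<Path> typeDirs = Files.list(pluginsRoot)) {
            return typeDirs
                .filter(Files::isDirectory)
                .collect(Collectors.toMap(
                    typeDir -> typeDir.getFileName().toString(),
                    TypedPluginScanner::listPluginIds));
        }
    }

    private static List<String> listPluginIds(Path typeDir) {
        try (Stream<Path> pluginDirs = Files.list(typeDir)) {
            // As in the current proposal, the folder name becomes the plugin id.
            return pluginDirs
                .filter(Files::isDirectory)
                .map(dir -> dir.getFileName().toString())
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```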
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12160) Support for transferring user config files on kubernetes

2019-04-10 Thread Yang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-12160:
--
Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-9953

> Support for transferring user config files on kubernetes
> 
>
> Key: FLINK-12160
> URL: https://issues.apache.org/jira/browse/FLINK-12160
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Yang Wang
>Priority: Major
>
> When submitting a Flink job on Kubernetes, the jars will be transferred through 
> docker images or the Flink blob server. We will also need a way to transfer 
> config files, such as hdfs-site.xml/core-site.xml. They could be saved in a 
> ConfigMap in etcd and then mounted into the jobmanager and taskmanager pods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12160) Support for transferring user config files on kubernetes

2019-04-10 Thread Yang Wang (JIRA)
Yang Wang created FLINK-12160:
-

 Summary: Support for transferring user config files on kubernetes
 Key: FLINK-12160
 URL: https://issues.apache.org/jira/browse/FLINK-12160
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Reporter: Yang Wang


When submitting a Flink job on Kubernetes, the jars will be transferred through 
docker images or the Flink blob server. We will also need a way to transfer config 
files, such as hdfs-site.xml/core-site.xml. They could be saved in a ConfigMap in 
etcd and then mounted into the jobmanager and taskmanager pods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-04-10 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-11799:


Assignee: Liya Fan

> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Assignee: Liya Fan
>Priority: Major
>
> I was encountering a problem with NullPointerExceptions with the deserialized 
> object reaching my ProcessFunction process() method implementation as a null 
> value. Upon investigation, I discovered two issues with the implementation of 
> the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy() 
> call generates an exception. The code should report the copy error at least 
> once as a warning to be aware that the kryo copy() is failing. I understand 
> that the code is there to handle the lack of a copy implementation but due to 
> the potential inefficiency of having to write and read the object instead of 
> copying it, this would seem useful information to share at the least. It is 
> also important to have a warning in case the cause of the copy error is 
> something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the 
> fact that the kryo readObject(Input input, Class aClass) method may return a 
> null value if there are any issues. This could be handled with a check or 
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method 
> but is also ignored there, allowing a null value to be passed along without 
> providing any reason for the null value in logging.
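
A sketch of the kind of handling described in (1) and (2); the names below are hypothetical and this is not the actual KryoSerializer code:

{code:java}
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CopyFallbackSketch {

    private static final Logger LOG = LoggerFactory.getLogger(CopyFallbackSketch.class);
    private static volatile boolean copyFailureLogged = false;

    static <T> T copyOrFallback(Supplier<T> kryoCopy, Supplier<T> serializeRoundTrip) {
        try {
            return kryoCopy.get();
        } catch (RuntimeException e) {
            // (1) report the copy failure at least once instead of swallowing it silently
            if (!copyFailureLogged) {
                LOG.warn("Kryo copy() failed; falling back to a serialize/deserialize copy.", e);
                copyFailureLogged = true;
            }
            T copy = serializeRoundTrip.get();
            // (2) the fallback read may return null; fail loudly instead of emitting a null record
            if (copy == null) {
                throw new IllegalStateException("Fallback copy produced null for the record.");
            }
            return copy;
        }
    }
}
{code}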



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8145: Inst 1.8.0

2019-04-10 Thread GitBox
flinkbot commented on issue #8145: Inst 1.8.0
URL: https://github.com/apache/flink/pull/8145#issuecomment-481884659
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] DarqueBox opened a new pull request #8145: Inst 1.8.0

2019-04-10 Thread GitBox
DarqueBox opened a new pull request #8145: Inst 1.8.0
URL: https://github.com/apache/flink/pull/8145
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate

2019-04-10 Thread GitBox
flinkbot edited a comment on issue #8136: [FLINK-12154][network] Remove legacy 
fields for SingleInputGate
URL: https://github.com/apache/flink/pull/8136#issuecomment-481642495
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @azagrebin
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @azagrebin
   * ❗ 3. Needs [attention] from.
   - Needs attention by @pnowojski [committer]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @azagrebin
   * ✅ 5. Overall code [quality] is good.
   - Approved by @azagrebin
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate

2019-04-10 Thread GitBox
azagrebin commented on issue #8136: [FLINK-12154][network] Remove legacy fields 
for SingleInputGate
URL: https://github.com/apache/flink/pull/8136#issuecomment-481822687
 
 
   @flinkbot approve all
   @flinkbot attention @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10049) Unify the processing logic for NULL arguments in SQL built-in functions

2019-04-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814703#comment-16814703
 ] 

Hequn Cheng commented on FLINK-10049:
-

Yes, I meant to unify the logic for the two problems I listed. Whether to return 
null or throw an NPE depends on the detailed logic of the UDFs and may vary. 
What we have to do is make the semantics clear and try to avoid throwing 
exceptions when null is acceptable. :-)

+1 to rename the title.

Best, Hequn

> Unify the processing logic for NULL arguments in SQL built-in functions
> ---
>
> Key: FLINK-10049
> URL: https://issues.apache.org/jira/browse/FLINK-10049
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Xingcan Cui
>Assignee: vinoyang
>Priority: Major
>
> Currently, the built-in functions treat NULL arguments in different ways. 
> E.g., ABS(NULL) returns NULL, while LOG10(NULL) throws an NPE. The general 
> SQL-way of handling NULL values should be that if one argument is NULL the 
> result is NULL. We should unify the processing logic for that.
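
A small sketch of the intended semantics in plain Java (not Flink's actual code generation): if any argument is NULL, the result is NULL, instead of evaluating the function and risking an NPE.

{code:java}
import java.util.function.DoubleUnaryOperator;

final class NullSafeScalarFunction {

    /** SQL-style null propagation: a NULL argument yields a NULL result. */
    static Double apply(Double argument, DoubleUnaryOperator function) {
        if (argument == null) {
            return null;
        }
        return function.applyAsDouble(argument);
    }

    public static void main(String[] args) {
        System.out.println(apply(null, Math::log10));   // null, instead of an NPE
        System.out.println(apply(100.0, Math::log10));  // 2.0
    }
}
{code}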



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-10 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274073139
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
+
+   /**
+* Create a database.
+*
+* @param name   Name of the database to be created
+* @param database   The database definition
+* @param ignoreIfExists Flag to specify behavior when a database with 
the given name already exists:
+*   if set to false, throw a 
DatabaseAlreadyExistException,
+*   if set to true, do nothing.
+* @throws DatabaseAlreadyExistException if the given database already 
exists and ignoreIfExists is false
+*/
+   void createDatabase(String name, CatalogDatabase database, boolean 
ignoreIfExists)
+   throws DatabaseAlreadyExistException;
+
+   /**
+* Drop a database.
+*
+* @param name  Name of the database to be dropped.
+* @param ignoreIfNotExists Flag to specify behavior when the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException;
+
+   /**
+* Modify an existing database.
+*
+* @param nameName of the database to be modified
+* @param newDatabaseThe new database definition
+* @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void alterDatabase(String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   // -- tables and views --
+
+   /**
+* Drop a table or view.
+*
+* @param tablePath Path of the table or view to be dropped
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table or view does not exist
+*/
+   void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws 
TableNotExistException;
+
+   /**
+* Rename an existing table or view.
+*
+* @param tablePath   Path of the table or view to rename
+* @param newTableName the new name of the table or view
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table does not exist
+* @throws DatabaseNotExistException if the database in tablePath to 
doesn't exist
+*/
+   void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
+   

[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-10 Thread GitBox
piyushnarang commented on a change in pull request #8117: [FLINK-12115] 
[filesystems]: Add support for AzureFS
URL: https://github.com/apache/flink/pull/8117#discussion_r274070476
 
 

 ##
 File path: 
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs).
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+   private static final Logger LOG = 
LoggerFactory.getLogger(AzureFSFactory.class);
+
+   private Configuration flinkConfig;
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
 
 Review comment:
   Yeah I was able to test this out and verify it works ok.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r274057620
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config) {
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
} else {
-   // we have some reporters so
-   for (Tuple2 
reporterConfiguration: reporterConfigurations) {
-   String namedReporter = reporterConfiguration.f0;
-   Configuration reporterConfig = 
reporterConfiguration.f1;
-
-   final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-   if (className == null) {
-   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-   continue;
-   }
+   for (ReporterSetup reporterSetup : 
reporterConfigurations) {
+   final String namedReporter = 
reporterSetup.getName();
+   final MetricConfig metricConfig = 
reporterSetup.getConfiguration();
 
try {
-   String configuredPeriod = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
null);
+   String configuredPeriod = 
metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
 
 Review comment:
   I think in general it is better to offer a restricted API because it usually 
simplifies things. Here we have to provide the correct key in order to find the 
right value. From the perspective of a user who does not know the details of how 
things are stored in `MetricConfig`, it would be easier to simply call 
`MetricConfig#getReporterInterval`. It would be easier still to call 
`ScheduledMetricReporter#getReporterInterval`, to relate this discussion to my 
other point.
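
   For illustration, a restricted accessor along those lines might look like the sketch below; the method name and default value are made up for this example and do not exist in the current API:

```java
import java.util.Properties;

/** Sketch of a restricted accessor; method name and default value are illustrative only. */
class ReporterConfigSketch extends Properties {

    private static final String INTERVAL_KEY = "interval";
    private static final String DEFAULT_INTERVAL = "10 SECONDS";

    /** Callers no longer need to know the raw key or the fallback value. */
    String getReporterInterval() {
        return getProperty(INTERVAL_KEY, DEFAULT_INTERVAL);
    }
}
```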


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-10 Thread GitBox
Myasuka commented on a change in pull request #8117: [FLINK-12115] 
[filesystems]: Add support for AzureFS
URL: https://github.com/apache/flink/pull/8117#discussion_r274054284
 
 

 ##
 File path: 
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs).
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+   private static final Logger LOG = 
LoggerFactory.getLogger(AzureFSFactory.class);
+
+   private Configuration flinkConfig;
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
 
 Review comment:
   Hmm, previously I thought you did not take the `hadoopConfig` into account, but then 
I found you just follow the `MapRFsFactory`'s way of instantiating the Hadoop 
configuration when creating the file system, instead of instantiating it 
within `FileSystemFactory#create(URI fsUri)` before creating the file system 
like the other file system factories do.
   If this works fine, it would be okay to do so.
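
   For readers of this thread, a rough sketch of the pattern being discussed: keep only the Flink configuration in `configure()` and build the Hadoop configuration lazily in `create()`. The Hadoop-specific helpers below are hypothetical and left abstract:

```java
import java.io.IOException;
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

/** Sketch only; the real mapping from Flink to Hadoop config depends on the concrete file system. */
public abstract class LazyHadoopConfigFSFactory implements FileSystemFactory {

    private Configuration flinkConfig;

    @Override
    public void configure(Configuration config) {
        // only remember the Flink config; no Hadoop classes are touched yet
        this.flinkConfig = config;
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // translate the Flink config into a Hadoop configuration at creation time
        org.apache.hadoop.conf.Configuration hadoopConfig = toHadoopConfig(flinkConfig);
        return createFileSystem(fsUri, hadoopConfig);
    }

    /** Hypothetical helpers; not part of the PR under review. */
    protected abstract org.apache.hadoop.conf.Configuration toHadoopConfig(Configuration flinkConfig);

    protected abstract FileSystem createFileSystem(
        URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException;
}
```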


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12153) Error when submitting a Flink job to the Flink environment

2019-04-10 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-12153.

Resolution: Invalid

Please reopen with a translated title or reach out to the mailing list.

> Error when submitting a Flink job to the Flink environment
> --
>
> Key: FLINK-12153
> URL: https://issues.apache.org/jira/browse/FLINK-12153
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.2
> Environment: flink maven
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_2.12</artifactId>
>   <version>1.7.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
>   <version>1.7.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-hdfs</artifactId>
>   <version>2.7.2</version>
>   <exclusions>
>     <exclusion>
>       <groupId>xml-apis</groupId>
>       <artifactId>xml-apis</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-common</artifactId>
>   <version>2.7.2</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>   <version>1.7.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-filesystem_2.12</artifactId>
>   <version>1.7.1</version>
> </dependency>
>
> Hadoop environment version 2.7.7
>  
>Reporter: gaojunjie
>Priority: Major
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are 
> only supported for HDFS and for Hadoop version 2.7 or newer
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-10 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274039553
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityCheckResult.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The (potentially aggregated) result of a compatibility check that may also 
contain a list of {@link AssertionError}
+ * for each found incompatibility.
+ */
+final class CompatibilityCheckResult {
+
+   private final Compatibility backwardCompatibility;
+   private final int backwardCompatibilityGrade;
+   private final Collection backwardCompatibilityErrors;
+
+   CompatibilityCheckResult(final Compatibility backwardCompatibility) {
+   this(backwardCompatibility, 1, Collections.emptyList());
+   if (backwardCompatibility == Compatibility.INCOMPATIBLE) {
+   throw new RuntimeException("This constructor must not 
be used for incompatible results.");
+   }
+   }
+
+   CompatibilityCheckResult(final AssertionError 
backwardCompatibilityError) {
+   this(Compatibility.INCOMPATIBLE, 0, 
Collections.singletonList(backwardCompatibilityError));
+   }
+
+   private CompatibilityCheckResult(final Compatibility 
backwardCompatibility, final int backwardCompatibilityGrade, final 
Collection backwardCompatibilityErrors) {
+   this.backwardCompatibility = backwardCompatibility;
+   this.backwardCompatibilityErrors = backwardCompatibilityErrors;
 
 Review comment:
   nit: change order of assignment (`backwardCompatibilityGrade` before 
`backwardCompatibilityErrors`)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-10 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274041784
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityCheckResult.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The (potentially aggregated) result of a compatibility check that may also 
contain a list of {@link AssertionError}
+ * for each found incompatibility.
+ */
+final class CompatibilityCheckResult {
+
+   private final Compatibility backwardCompatibility;
+   private final int backwardCompatibilityGrade;
+   private final Collection backwardCompatibilityErrors;
+
+   CompatibilityCheckResult(final Compatibility backwardCompatibility) {
+   this(backwardCompatibility, 1, Collections.emptyList());
+   if (backwardCompatibility == Compatibility.INCOMPATIBLE) {
 
 Review comment:
   nit: Maybe static methods, such as, 
`CompatibilityCheckResult.incompatible()`, 
`CompatibilityCheckResult.identical()`, or 
`CompatibilityCheckResult.compatible(final Compatibility ...)` would make it 
more obvious how to use this API. 
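
   Roughly, the suggestion would look like the sketch below; the field types are simplified and this is purely illustrative:

```java
import java.util.Collection;
import java.util.Collections;

/** Sketch of the suggested static factory methods; types are simplified for illustration. */
final class CompatibilityCheckResultSketch {

    private final String compatibility;
    private final int grade;
    private final Collection<AssertionError> errors;

    private CompatibilityCheckResultSketch(String compatibility, int grade, Collection<AssertionError> errors) {
        this.compatibility = compatibility;
        this.grade = grade;
        this.errors = errors;
    }

    static CompatibilityCheckResultSketch identical() {
        return new CompatibilityCheckResultSketch("IDENTICAL", 1, Collections.emptyList());
    }

    static CompatibilityCheckResultSketch compatible() {
        return new CompatibilityCheckResultSketch("COMPATIBLE", 1, Collections.emptyList());
    }

    static CompatibilityCheckResultSketch incompatible(AssertionError cause) {
        return new CompatibilityCheckResultSketch("INCOMPATIBLE", 0, Collections.singletonList(cause));
    }
}
```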


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-10 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274040526
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityCheckResult.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The (potentially aggregated) result of a compatibility check that may also 
contain a list of {@link AssertionError}
+ * for each found incompatibility.
+ */
+final class CompatibilityCheckResult {
+
+   private final Compatibility backwardCompatibility;
+   private final int backwardCompatibilityGrade;
+   private final Collection backwardCompatibilityErrors;
+
+   CompatibilityCheckResult(final Compatibility backwardCompatibility) {
+   this(backwardCompatibility, 1, Collections.emptyList());
+   if (backwardCompatibility == Compatibility.INCOMPATIBLE) {
+   throw new RuntimeException("This constructor must not 
be used for incompatible results.");
+   }
+   }
+
+   CompatibilityCheckResult(final AssertionError 
backwardCompatibilityError) {
+   this(Compatibility.INCOMPATIBLE, 0, 
Collections.singletonList(backwardCompatibilityError));
+   }
+
+   private CompatibilityCheckResult(final Compatibility 
backwardCompatibility, final int backwardCompatibilityGrade, final 
Collection backwardCompatibilityErrors) {
+   this.backwardCompatibility = backwardCompatibility;
+   this.backwardCompatibilityErrors = backwardCompatibilityErrors;
+   this.backwardCompatibilityGrade = backwardCompatibilityGrade;
 
 Review comment:
   nit: `backwardCompatibilityErrors` can be mutable (l. 70) and the getter 
makes this object potentially mutable
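
   One way to address this, copying on the way in and exposing an unmodifiable view on the way out (illustrative sketch, not the PR's code):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Illustrative defensive copying for the errors collection. */
final class ImmutableErrorsSketch {

    private final Collection<AssertionError> errors;

    ImmutableErrorsSketch(Collection<AssertionError> errors) {
        // copy on the way in, so later changes by the caller cannot leak into this object
        this.errors = new ArrayList<>(errors);
    }

    Collection<AssertionError> getErrors() {
        // unmodifiable view on the way out, so callers cannot mutate internal state
        return Collections.unmodifiableCollection(errors);
    }
}
```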


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-10 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274044893
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutine.java
 ##
 @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.junit.Assert;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.rest.compatibility.Compatibility.COMPATIBLE;
+import static 
org.apache.flink.runtime.rest.compatibility.Compatibility.IDENTICAL;
+import static 
org.apache.flink.runtime.rest.compatibility.Compatibility.INCOMPATIBLE;
+
+/**
+ * Routine for checking the compatibility of a {@link MessageHeaders} pair.
+ *
+ * The 'extractor' {@link Function} generates a 'container', a 
jackson-compatible object containing the data that
+ * the routine bases it's compatibility-evaluation on.
+ * The 'assertion' {@link BiConsumer} accepts a pair of containers and asserts 
the compatibility. Incompatibilities are
+ * signaled by throwing an {@link AssertionError}. This implies that the 
method body will typically contain jUnit
+ * assertions.
+ */
+final class CompatibilityRoutine {
+
+   private final String key;
+   private final Class containerClass;
+   private final Function, C> extractor;
+   private final BiConsumer assertion;
+
+   CompatibilityRoutine(
+   final String key,
+   final Class containerClass,
+   final Function, C> extractor,
+   final BiConsumer assertion) {
+   this.key = key;
+   this.containerClass = containerClass;
+   this.extractor = extractor;
+   this.assertion = assertion;
+   }
+
+   String getKey() {
+   return key;
+   }
+
+   Class getContainerClass() {
+   return containerClass;
+   }
+
+   C getContainer(final MessageHeaders header) {
+   final C container = extractor.apply(header);
+   Assert.assertNotNull("Implementation error: Extractor returned 
null.", container);
+   return container;
+   }
+
+   public CompatibilityCheckResult checkCompatibility(final Optional 
old, final Optional cur) {
+   if (!old.isPresent() && !cur.isPresent()) {
+   Assert.fail(String.format(
+   "Implementation error: Compatibility check 
container for routine %s for both old and new version is null.", key));
+   }
+   if (!old.isPresent()) {
+   // allow addition of new compatibility routines
+   return new CompatibilityCheckResult(COMPATIBLE);
+   }
+   if (!cur.isPresent()) {
+   // forbid removal of compatibility routines
+   return new CompatibilityCheckResult(
+   new AssertionError(String.format(
+   "Compatibility check container for 
routine %s not found in current version.", key)));
+   }
+
+   Compatibility backwardCompatibility;
+   AssertionError backwardIncompatibilityCause = null;
+   try {
+   assertion.accept(old.get(), cur.get());
+   backwardCompatibility = COMPATIBLE;
+   } catch (final Exception | AssertionError e) {
+   backwardCompatibility = INCOMPATIBLE;
+   backwardIncompatibilityCause = new AssertionError(key + 
": " + e.getMessage());
+   }
+
+   Compatibility forwardCompatibility;
+   try {
+   assertion.accept(cur.get(), old.get());
+   forwardCompatibility = COMPATIBLE;
+   } catch (final Exception | AssertionError e) {
+   

[GitHub] [flink] an0 commented on a change in pull request #8106: [FLINK-12092] [docs]Clarify when `onTimer(...)` is called

2019-04-10 Thread GitBox
an0 commented on a change in pull request #8106: [FLINK-12092] [docs]Clarify 
when `onTimer(...)` is called
URL: https://github.com/apache/flink/pull/8106#discussion_r274040206
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -44,8 +44,8 @@ For fault-tolerant state, the `ProcessFunction` gives access 
to Flink's [keyed s
 The timers allow applications to react to changes in processing time and in 
[event time]({{ site.baseurl }}/dev/event_time.html).
 Every call to the function `processElement(...)` gets a `Context` object which 
gives access to the element's
 event time timestamp, and to the *TimerService*. The `TimerService` can be 
used to register callbacks for future
-event-/processing-time instants. When a timer's particular time is reached, 
the `onTimer(...)` method is
-called. During that call, all states are again scoped to the key with which 
the timer was created, allowing
+event-/processing-time instants. The `onTimer(...)` method is
+called when such an event-time is first caught up by a watermark or such a 
processing-time is reached. During that call, all states are again scoped to 
the key with which the timer was created, allowing
 
 Review comment:
   Sure. Done.
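
   For readers of this thread, a minimal sketch of the behaviour the doc sentence describes (the key/value types and the one-minute offset are arbitrary): the event-time timer registered in `processElement` only fires once a watermark at or past that timestamp reaches the operator.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Minimal timer example; key/value types and the 60s offset are arbitrary. */
public class TimerExample extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        Long elementTimestamp = ctx.timestamp();
        if (elementTimestamp != null) {
            // The timer does not fire here; it fires once a watermark at or past
            // this event time reaches the operator.
            ctx.timerService().registerEventTimeTimer(elementTimestamp + 60_000L);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // State is again scoped to the key for which the timer was registered.
        out.collect("timer fired at " + timestamp + " in " + ctx.timeDomain());
    }
}
```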


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-04-10 Thread Jason Kania (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814589#comment-16814589
 ] 

Jason Kania commented on FLINK-11799:
-

Hi Liya,

 

Your approach seems reasonable.

Thanks,

Jason

> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Priority: Major
>
> I was encountering a problem with NullPointerExceptions with the deserialized 
> object reaching my ProcessFunction process() method implementation as a null 
> value. Upon investigation, I discovered two issues with the implementation of 
> the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy() 
> call generates an exception. The code should report the copy error at least 
> once as a warning to be aware that the kryo copy() is failing. I understand 
> that the code is there to handle the lack of a copy implementation but due to 
> the potential inefficiency of having to write and read the object instead of 
> copying it, this would seem useful information to share at the least. It is 
> also important to have a warning in case the cause of the copy error is 
> something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the 
> fact that the kryo readObject(Input input, Class aClass) method may return a 
> null value if there are any issues. This could be handled with a check or 
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method 
> but is also ignored there, allowing a null value to be passed along without 
> providing any reason for the null value in logging.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dawidwys commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-10 Thread GitBox
dawidwys commented on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-481752758
 
 
   Hi @hequn8128,
   I had another look and some discussion with @twalthr about this PR, and we 
came up with a few ideas on how to proceed further:
   
   1. The `Stream/BatchTableDescriptor`, `Stream/BatchTableSource`, 
`Stream/BatchTableSink`, `Stream/BatchTableSourceFactory`, 
`Stream/BatchTableSinkFactory` should end up in the bridging module. In the 
long run we need to rework them anyway. The new versions will end up in the 
api-java.
   This might solve our problems with the reflection lookup for factories.
   2. Could you please split the PR into smaller commits that each move a smaller 
"hierarchy" of classes at a time? The last commit would then just extract 
the TableEnvironment once all the needed classes have been moved. This would 
help us review this PR tremendously.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-10 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274038147
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutine.java
 ##
 @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.junit.Assert;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.rest.compatibility.Compatibility.COMPATIBLE;
+import static 
org.apache.flink.runtime.rest.compatibility.Compatibility.IDENTICAL;
+import static 
org.apache.flink.runtime.rest.compatibility.Compatibility.INCOMPATIBLE;
+
+/**
+ * Routine for checking the compatibility of a {@link MessageHeaders} pair.
+ *
+ * The 'extractor' {@link Function} generates a 'container', a 
jackson-compatible object containing the data that
+ * the routine bases it's compatibility-evaluation on.
+ * The 'assertion' {@link BiConsumer} accepts a pair of containers and asserts 
the compatibility. Incompatibilities are
+ * signaled by throwing an {@link AssertionError}. This implies that the 
method body will typically contain jUnit
+ * assertions.
+ */
+final class CompatibilityRoutine {
+
+   private final String key;
+   private final Class containerClass;
+   private final Function, C> extractor;
+   private final BiConsumer assertion;
+
+   CompatibilityRoutine(
+   final String key,
+   final Class containerClass,
+   final Function, C> extractor,
+   final BiConsumer assertion) {
+   this.key = key;
+   this.containerClass = containerClass;
+   this.extractor = extractor;
+   this.assertion = assertion;
+   }
+
+   String getKey() {
+   return key;
+   }
+
+   Class getContainerClass() {
+   return containerClass;
+   }
+
+   C getContainer(final MessageHeaders header) {
+   final C container = extractor.apply(header);
+   Assert.assertNotNull("Implementation error: Extractor returned 
null.", container);
+   return container;
+   }
+
+   public CompatibilityCheckResult checkCompatibility(final Optional 
old, final Optional cur) {
+   if (!old.isPresent() && !cur.isPresent()) {
+   Assert.fail(String.format(
+   "Implementation error: Compatibility check 
container for routine %s for both old and new version is null.", key));
+   }
+   if (!old.isPresent()) {
+   // allow addition of new compatibility routines
+   return new CompatibilityCheckResult(COMPATIBLE);
+   }
+   if (!cur.isPresent()) {
+   // forbid removal of compatibility routines
+   return new CompatibilityCheckResult(
+   new AssertionError(String.format(
+   "Compatibility check container for 
routine %s not found in current version.", key)));
+   }
+
+   Compatibility backwardCompatibility;
+   AssertionError backwardIncompatibilityCause = null;
+   try {
+   assertion.accept(old.get(), cur.get());
+   backwardCompatibility = COMPATIBLE;
+   } catch (final Exception | AssertionError e) {
+   backwardCompatibility = INCOMPATIBLE;
+   backwardIncompatibilityCause = new AssertionError(key + 
": " + e.getMessage());
+   }
+
+   Compatibility forwardCompatibility;
+   try {
+   assertion.accept(cur.get(), old.get());
+   forwardCompatibility = COMPATIBLE;
+   } catch (final Exception | AssertionError e) {
 
 Review comment:
   If the logic in the 


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-10 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274033867
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
+import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Stability test and snapshot generator for the REST API.
+ */
+@RunWith(Parameterized.class)
+public final class RestAPIStabilityTest extends TestLogger {
+
+   private static final String REGENERATE_SNAPSHOT_PROPERTY = 
"generate-rest-snapshot";
+
+   private static final String SNAPSHOT_RESOURCE_PATTERN = 
"rest_api_%s.snapshot";
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   @Parameterized.Parameters(name = "version = {0}")
+   public static Iterable<RestAPIVersion> getStableVersions() {
+   return Arrays.stream(RestAPIVersion.values())
+   .filter(RestAPIVersion::isStableVersion)
+   .collect(Collectors.toList());
+   }
+
+   private final RestAPIVersion apiVersion;
+
+   public RestAPIStabilityTest(final RestAPIVersion apiVersion) {
+   this.apiVersion = apiVersion;
+   }
+
+   @Test
+   public void testDispatcherRestAPIStability() throws IOException {
+   final String versionedSnapshotFileName = 
String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
+
+   final RestAPISnapshot currentSnapshot = createSnapshot(new 
DocumentingDispatcherRestEndpoint());
+
+   if (System.getProperty(REGENERATE_SNAPSHOT_PROPERTY) != null) {
+   writeSnapshot(versionedSnapshotFileName, 
currentSnapshot);
+   }
+
+   final URL resource = 
RestAPIStabilityTest.class.getClassLoader().getResource(versionedSnapshotFileName);
+   if (resource == null) {
+   Assert.fail("Snapshot file does not exist. If you added 
a new version, re-run this test with" +
+   " -D" + REGENERATE_SNAPSHOT_PROPERTY + " being 
set.");
+   }
+   final RestAPISnapshot previousSnapshot = 
OBJECT_MAPPER.readValue(new File(resource.getFile()), RestAPISnapshot.class);
+
+   assertCompatible(previousSnapshot, currentSnapshot);
+   }
+
+   private static void writeSnapshot(final String 
versionedSnapshotFileName, final RestAPISnapshot snapshot) throws IOException {
+   OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+   .writeValue(
+   new File("src/test/resources/" + 
versionedSnapshotFileName),
+   snapshot);
+   System.out.println("REST API snapshot " + 
versionedSnapshotFileName + " was updated, please remember to commit the 
snapshot.");
+   }
+
+   private RestAPISnapshot createSnapshot(final DocumentingRestEndpoint 
dispatcherRestEndpoint) {
+   final List calls = 
dispatcherRestEndpoint.getSpecs().stream()

[GitHub] [flink] dawidwys commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-10 Thread GitBox
dawidwys commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273911156
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public interface CatalogDatabase {
 
 Review comment:
   What is a database?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-10 Thread GitBox
pnowojski commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r274017522
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // ------------------------------------------------------------------------
+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* 

[GitHub] [flink] zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
zentol commented on a change in pull request #8002: [FLINK-11923][metrics] 
MetricRegistryConfiguration provides MetricReporters Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r274014721
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config) {
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
} else {
-   // we have some reporters so
-   for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-   String namedReporter = reporterConfiguration.f0;
-   Configuration reporterConfig = 
reporterConfiguration.f1;
-
-   final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-   if (className == null) {
-   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-   continue;
-   }
+   for (ReporterSetup reporterSetup : 
reporterConfigurations) {
+   final String namedReporter = 
reporterSetup.getName();
+   final MetricConfig metricConfig = 
reporterSetup.getConfiguration();
 
try {
-   String configuredPeriod = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
null);
+   String configuredPeriod = 
metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
 
 Review comment:
   I may be biased here, but I don't believe we're hiding anything. 
`MetricConfig` is just a minor extension of `Properties` (which in hindsight 
shouldn't even exist) and I have no plans to change that. So long as that is 
the case there's no other place the value could come from but the configuration.
   
   As a more relatable example, would you say that 
`Configuration#getJobManagerHost()` hides in any way that the value is read 
from the configuration?
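
   To make that concrete, a minimal sketch of reading a reporter argument straight
   from a `MetricConfig` (the key and values here are made up for the example; the
   only import needed is org.apache.flink.metrics.MetricConfig):

   // MetricConfig is essentially a java.util.Properties with typed getters,
   // so the configuration itself is the only possible source of the value
   final MetricConfig metricConfig = new MetricConfig();
   metricConfig.setProperty("arg1", "value1");

   final String arg1 = metricConfig.getString("arg1", "fallback");   // "value1"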


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

2019-04-10 Thread GitBox
flinkbot commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster 
integration test under non-secure mode
URL: https://github.com/apache/flink/pull/8144#issuecomment-481732968
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12159) Enable YarnMiniCluster integration test under non-secure mode

2019-04-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12159:
---
Labels: pull-request-available  (was: )

> Enable YarnMiniCluster integration test under non-secure mode
> -
>
> Key: FLINK-12159
> URL: https://issues.apache.org/jira/browse/FLINK-12159
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if a third party wants to use Flink for integration testing under 
> YARN mode, it has to be in secure mode, which doesn't make sense. We should 
> also allow it under non-secure mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zjffdu opened a new pull request #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

2019-04-10 Thread GitBox
zjffdu opened a new pull request #8144: [FLINK-12159]. Enable YarnMiniCluster 
integration test under non-secure mode
URL: https://github.com/apache/flink/pull/8144
 
 
   ## What is the purpose of the change
   I can not run Flink in YarnMiniCluster for integration tests because 
yarn-site.xml is not shipped to the YARN container (krb5Config must be non-empty, 
otherwise yarn-site.xml won't be shipped). It doesn't make sense to only allow 
YARN integration under secure mode, so this PR enables the YarnMiniCluster 
integration test under non-secure mode as well. I tested this PR with the 
Zeppelin Flink integration test.
   
   
   ## Brief change log
   
   Straightforward code refactoring to separate `yarn-site.xml` and 
`krb5Config` file.
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   I verified this PR in zeppelin flink integration test.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
zentol commented on a change in pull request #8002: [FLINK-11923][metrics] 
MetricRegistryConfiguration provides MetricReporters Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r274009874
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config) {
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
} else {
-   // we have some reporters so
-   for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-   String namedReporter = reporterConfiguration.f0;
-   Configuration reporterConfig = 
reporterConfiguration.f1;
-
-   final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-   if (className == null) {
-   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-   continue;
-   }
+   for (ReporterSetup reporterSetup : 
reporterConfigurations) {
 
 Review comment:
   I assumed you meant that the reporter class would have to hard-code the 
name, but you're suggesting that the reporter parses the interval and name from 
the configuration I guess.
   
   So for one this wouldn't really work with the subsequent introduction of 
factories, which requires the MetricConfig to be assembled before the reporter 
is even instantiated. One could move the logic into the factory of course; my 
point is that this is not something we can do _now_.
   
   Reporters (or factories for that matter) shouldn't be aware that reporter 
names actually exist; this is an implementation detail of the metric system to 
allow for distinct configurations. Whether multiple reporters exist or not is 
not relevant to a given reporter, so they shouldn't have to deal with it. I 
also don't see how a reporter would be able to determine its own name 
from the given configuration. The metric-wide configuration does not contain 
this info in an obvious fashion, and the metric config does not contain it at 
all. I can only see this working if we (the MetricRegistry/-Configuration) 
explicitly write it into the MetricConfig, but this obviously doesn't make any 
sense.
   
   Admittedly, the interval makes more sense. It is true that the current 
reporter configuration is a mix between reporter-facing options (like 
reporter-specific arguments) and system-facing options (like the interval). The 
current approach however allows us to ensure that certain configuration options 
exist and are actually applied; in a design where the `Scheduled` reporter 
provides the interval you cannot guarantee that the interval is configurable 
for users, or that a configured interval is respected.
   The same applies to other system-facing reporter options, like delimiters. 
So long as reporters don't massively go out of their way to avoid it (i.e. not 
working at all against the MetricGroup interface (with some currently existing 
exceptions)) the configured delimiter _will_ be used.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aloyszhang commented on issue #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat

2019-04-10 Thread GitBox
aloyszhang commented on issue #8143: [hotfix][javadocs] remove redundant 'for 
example' for InputFormat
URL: https://github.com/apache/flink/pull/8143#issuecomment-481727297
 
 
   @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat

2019-04-10 Thread GitBox
flinkbot commented on issue #8143: [hotfix][javadocs] remove redundant 'for 
example' for InputFormat
URL: https://github.com/apache/flink/pull/8143#issuecomment-481726995
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aloyszhang opened a new pull request #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat

2019-04-10 Thread GitBox
aloyszhang opened a new pull request #8143: [hotfix][javadocs] remove redundant 
'for example' for InputFormat
URL: https://github.com/apache/flink/pull/8143
 
 
   
   
   ## What is the purpose of the change
   
   This pull request removes the redundant 'for example' in the javadoc of 
InputFormat.
   
   
   ## Brief change log
   
   change from ' such as for example a file path'  to ' such as a file path'
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12159) Enable YarnMiniCluster integration test under non-secure mode

2019-04-10 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-12159:
--

 Summary: Enable YarnMiniCluster integration test under non-secure 
mode
 Key: FLINK-12159
 URL: https://issues.apache.org/jira/browse/FLINK-12159
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.8.0
Reporter: Jeff Zhang


Currently, if a third party wants to use Flink for integration testing under YARN 
mode, it has to be in secure mode, which doesn't make sense. We should also allow 
it under non-secure mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12159) Enable YarnMiniCluster integration test under non-secure mode

2019-04-10 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang reassigned FLINK-12159:
--

Assignee: Jeff Zhang

> Enable YarnMiniCluster integration test under non-secure mode
> -
>
> Key: FLINK-12159
> URL: https://issues.apache.org/jira/browse/FLINK-12159
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> Currently, if a third party wants to use Flink for integration testing under 
> YARN mode, it has to be in secure mode, which doesn't make sense. We should 
> also allow it under non-secure mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r273998850
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
 ##
 @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link ReporterSetup}.
+ */
+public class ReporterSetupTest extends TestLogger {
+
+   /**
+* TestReporter1 class only for type differentiation.
+*/
+   static class TestReporter1 extends TestReporter {
+   }
+
+   /**
+* TestReporter2 class only for type differentiation.
+*/
+   static class TestReporter2 extends TestReporter {
+   }
+
+   /**
+* Verifies that a reporter can be configured with all its arguments being forwarded.
+*/
+   @Test
+   public void testReporterArgumentForwarding() {
+   final Configuration config = new Configuration();
+
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
ReporterSetupTest.TestReporter1.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"reporter.arg1", "value1");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"reporter.arg2", "value2");
+
+   final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+   Assert.assertEquals(1, reporterSetups.size());
+
+   final ReporterSetup reporterSetup = reporterSetups.get(0);
+   Assert.assertEquals("reporter", reporterSetup.getName());
+   Assert.assertEquals("value1", 
reporterSetup.getConfiguration().getString("arg1", null));
+   Assert.assertEquals("value2", 
reporterSetup.getConfiguration().getString("arg2", null));
+   
Assert.assertEquals(ReporterSetupTest.TestReporter1.class.getName(), 
reporterSetup.getConfiguration().getString("class", null));
+   }
+
+   /**
+* Verifies that multiple reporters can be configured with all their 
arguments being forwarded.
+*/
+   @Test
+   public void testSeveralReportersWithArgumentForwarding() {
 
 Review comment:
   Not to be pedantic, but then we are missing the `0` case. I guess it does not 
hurt to keep the test, but it adds a bit to our testing time.
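
   For completeness, the `0` case would presumably look something like the sketch
   below (assuming `fromConfiguration` simply returns an empty list when no
   reporters are configured):

   @Test
   public void testNoReportersConfigured() {
       // an empty configuration should yield no reporter setups
       final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(new Configuration());
       Assert.assertEquals(0, reporterSetups.size());
   }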


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r273995787
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config) {
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
} else {
-   // we have some reporters so
-   for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-   String namedReporter = reporterConfiguration.f0;
-   Configuration reporterConfig = 
reporterConfiguration.f1;
-
-   final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-   if (className == null) {
-   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-   continue;
-   }
+   for (ReporterSetup reporterSetup : 
reporterConfigurations) {
 
 Review comment:
   How so? The different instances of the 
`MetricReporterWithNameAndScopeDelimiter` could still return different values 
for `getName` and `getScopeDelimiter`. The motivating idea was to hide the fact 
that a reporter has a `MetricConfig` from which we obtain the values.
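
   In other words, something along the lines of the sketch below (the interface
   name comes from the comment above; the return type of `getScopeDelimiter` is a
   guess, not an existing Flink API):

   // hypothetical abstraction hiding the MetricConfig from the registry
   interface MetricReporterWithNameAndScopeDelimiter {
       String getName();
       char getScopeDelimiter();
   }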


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-10 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r273994424
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config) {
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
} else {
-   // we have some reporters so
-   for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-   String namedReporter = reporterConfiguration.f0;
-   Configuration reporterConfig = 
reporterConfiguration.f1;
-
-   final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-   if (className == null) {
-   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-   continue;
-   }
+   for (ReporterSetup reporterSetup : 
reporterConfigurations) {
+   final String namedReporter = 
reporterSetup.getName();
+   final MetricConfig metricConfig = 
reporterSetup.getConfiguration();
 
try {
-   String configuredPeriod = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
null);
+   String configuredPeriod = 
metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
 
 Review comment:
   The benefit would be that we hide the fact that we read the value from the 
underlying `Configuration`. It would thus better follow the law of Demeter.
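
   A sketch of what such an accessor could look like on `ReporterSetup` (the method
   name `getIntervalSetting` is hypothetical, purely to illustrate the idea):

   // hypothetical accessor on ReporterSetup that hides the MetricConfig lookup
   Optional<String> getIntervalSetting() {
       return Optional.ofNullable(
           getConfiguration().getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null));
   }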


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8116: [FLINK-11920][scala] Reuse java classpath

2019-04-10 Thread GitBox
tillrohrmann commented on a change in pull request #8116: [FLINK-11920][scala] 
Reuse java classpath
URL: https://github.com/apache/flink/pull/8116#discussion_r273992844
 
 

 ##
 File path: 
flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
 ##
 @@ -192,16 +192,7 @@ object EnumValueSerializerUpgradeTest {
 
 val settings = new GenericRunnerSettings(out.println _)
 
-val classLoader = Thread.currentThread().getContextClassLoader
-
-val urls = classLoader match {
-  case urlClassLoader: URLClassLoader =>
-urlClassLoader.getURLs
-  case x => throw new IllegalStateException(s"Not possible to extract URLs 
" +
-s"from class loader $x.")
-}
-
-settings.classpath.value = 
urls.map(_.toString).mkString(java.io.File.pathSeparator)
+settings.usejavacp.value = true
 
 Review comment:
   Maybe add a comment explaining why we do this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai removed a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector

2019-04-10 Thread GitBox
tzulitai removed a comment on issue #8140: [FLINK-12151][es1] Remove 
Elasticsearch 1 connector 
URL: https://github.com/apache/flink/pull/8140#issuecomment-481703278
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector

2019-04-10 Thread GitBox
flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove 
Elasticsearch 1 connector 
URL: https://github.com/apache/flink/pull/8140#issuecomment-481656785
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tzulitai [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @tzulitai [PMC], @zentol [PMC]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @tzulitai [PMC]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @tzulitai [PMC]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector

2019-04-10 Thread GitBox
tzulitai commented on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 
connector 
URL: https://github.com/apache/flink/pull/8140#issuecomment-481703278
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector

2019-04-10 Thread GitBox
tzulitai commented on a change in pull request #8140: [FLINK-12151][es1] Remove 
Elasticsearch 1 connector 
URL: https://github.com/apache/flink/pull/8140#discussion_r273973666
 
 

 ##
 File path: docs/dev/connectors/elasticsearch.md
 ##
 @@ -82,54 +77,6 @@ Elasticsearch cluster.
 The example below shows how to configure and create a sink:
 
 
-
 
 Review comment:
   This only removes the Java variant. The Scala one is a bit further down and 
should also be removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12158) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8

2019-04-10 Thread leesf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf closed FLINK-12158.
-
Resolution: Duplicate

> Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8
> -
>
> Key: FLINK-12158
> URL: https://issues.apache.org/jira/browse/FLINK-12158
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: leesf
>Assignee: leesf
>Priority: Major
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12158) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8

2019-04-10 Thread leesf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf updated FLINK-12158:
--
Summary: Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 
1.8  (was: Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 
1.7)

> Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8
> -
>
> Key: FLINK-12158
> URL: https://issues.apache.org/jira/browse/FLINK-12158
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: leesf
>Assignee: leesf
>Priority: Major
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12158) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7

2019-04-10 Thread leesf (JIRA)
leesf created FLINK-12158:
-

 Summary: Update Java / Scala 
StatefulJobWBroadcastStateMigrationITCase for 1.7
 Key: FLINK-12158
 URL: https://issues.apache.org/jira/browse/FLINK-12158
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: leesf
Assignee: leesf
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8142: [FLINK-12148][clients] Give precedence to specified local jar

2019-04-10 Thread GitBox
flinkbot commented on issue #8142: [FLINK-12148][clients] Give precedence to 
specified local jar
URL: https://github.com/apache/flink/pull/8142#issuecomment-481692125
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12148) Give precedence to specified local jar when same mainClass is included in both lib and specified jar

2019-04-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12148:
---
Labels: pull-request-available  (was: )

> Give precedence to specified local jar when same mainClass is included in 
> both lib and specified jar
> 
>
> Key: FLINK-12148
> URL: https://issues.apache.org/jira/browse/FLINK-12148
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.8.0
>Reporter: leesf
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> When submitting a Flink job with ./flink run -c mainClass localJar, if the 
> main class is included in both the $FLINK_HOME/lib directory and the specified 
> jar, then the mainClass from the $FLINK_HOME/lib directory is executed 
> instead of the one from the specified localJar.
> For example, there is flink-examples-streaming_2.11-1.9-SNAPSHOT.jar in the lib 
> directory that contains the 
> org.apache.flink.streaming.examples.wordcount.WordCount class, and 
> LocalWordCount.jar also contains the 
> org.apache.flink.streaming.examples.wordcount.WordCount class. When using 
> ./flink run -c org.apache.flink.streaming.examples.wordcount.WordCount 
> /tmp/LocalWordCount.jar to submit the job, the WordCount class in 
> flink-examples-streaming_2.11-1.9-SNAPSHOT.jar is executed instead of the one 
> in LocalWordCount.jar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] leesf opened a new pull request #8142: [FLINK-12148][clients] Give precedence to specified local jar

2019-04-10 Thread GitBox
leesf opened a new pull request #8142: [FLINK-12148][clients] Give precedence 
to specified local jar
URL: https://github.com/apache/flink/pull/8142
 
 
   ## What is the purpose of the change
   
   Give precedence to specified local jar when same mainClass is included in 
both lib and specified jar.
   
   ## Brief change log
   
   Change parentFirst to childFirst ClassLoader.
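
   Conceptually, child-first means classes from the user jar win over classes that
   are already on the parent classpath. A generic illustration of the delegation
   order (a sketch of the general technique, not Flink's actual implementation):

   class ChildFirstClassLoader extends java.net.URLClassLoader {
       ChildFirstClassLoader(java.net.URL[] urls, ClassLoader parent) {
           super(urls, parent);
       }

       @Override
       protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
           synchronized (getClassLoadingLock(name)) {
               Class<?> c = findLoadedClass(name);
               if (c == null) {
                   try {
                       // try the user jar first ...
                       c = findClass(name);
                   } catch (ClassNotFoundException e) {
                       // ... and only then fall back to the parent (e.g. flink/lib)
                       c = super.loadClass(name, false);
                   }
               }
               if (resolve) {
                   resolveClass(c);
               }
               return c;
           }
       }
   }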
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   
   cc @zentol 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on issue #8134: Update japicmp comparison version to latest Flink 1.8.0

2019-04-10 Thread GitBox
aljoscha commented on issue #8134: Update japicmp comparison version to latest 
Flink 1.8.0
URL: https://github.com/apache/flink/pull/8134#issuecomment-481677945
 
 
   merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha closed pull request #8134: Update japicmp comparison version to latest Flink 1.8.0

2019-04-10 Thread GitBox
aljoscha closed pull request #8134: Update japicmp comparison version to latest 
Flink 1.8.0
URL: https://github.com/apache/flink/pull/8134
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-04-10 Thread GitBox
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots 
which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273939589
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 ##
 @@ -707,6 +706,53 @@ public void testTaskManagerTimeout() throws Exception {
}
}
 
+   /**
+* Tests that an idle but not yet releasable task manager will not be 
released even if it has timed out before it can be released.
+*/
+   @Test
+   public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
+   final long tmTimeout = 10L;
+
+   final CompletableFuture releaseFuture = new 
CompletableFuture<>();
+   final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+   .setReleaseResourceConsumer((instanceID, e) -> 
releaseFuture.complete(instanceID))
+   .build();
+   final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+   final ResourceID resourceID = ResourceID.generate();
+
+   final AtomicBoolean canBeReleased = new AtomicBoolean(false);
+   final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+   .setCanBeReleasedSupplier(canBeReleased::get)
+   .createTestingTaskExecutorGateway();
+   final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+   final SlotID slotId = new SlotID(resourceID, 0);
+   final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+   final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+   final SlotReport slotReport = new SlotReport(slotStatus);
+
+   final Executor mainThreadExecutor = 
TestingUtils.defaultExecutor();
+
+   try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
+   .setTaskManagerTimeout(Time.milliseconds(tmTimeout))
+   .build()) {
+
+   slotManager.start(resourceManagerId, 
mainThreadExecutor, resourceManagerActions);
+
+   mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
+
+   // now it can not be released yet
+   canBeReleased.set(false);
+   
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
 
 Review comment:
   Shouldn't we wait for this to complete? As it is now, the assertion 
below might be a no-op (depending on the race condition).
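
   One possible way to make this deterministic, sketched against the fields in the 
   quoted test (the use of CompletableFuture.runAsync and the final assertion are 
   assumptions for illustration, not necessarily the fix that was applied):

```java
// Fragment of the test, reworked so the assertion cannot race with the check.
// Assumes java.util.concurrent.CompletableFuture and org.junit.Assert.assertFalse
// are imported, and that slotManager, mainThreadExecutor, canBeReleased and
// releaseFuture are the fields from the quoted diff.
canBeReleased.set(false);
CompletableFuture
        .runAsync(slotManager::checkTaskManagerTimeouts, mainThreadExecutor)
        .join(); // wait for the timeout check to actually finish

// the task manager must not have been released while canBeReleased was false
assertFalse(releaseFuture.isDone());
```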


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-04-10 Thread GitBox
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots 
which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273935915
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -124,7 +124,7 @@ private void enqueueAvailableReader(final 
NetworkSequenceViewReader reader) thro
return availableReaders;
}
 
-   public void notifyReaderCreated(final NetworkSequenceViewReader reader) 
{
+   void notifyReaderCreated(final NetworkSequenceViewReader reader) {
 
 Review comment:
   nit: are the changes in this file part of the previous commit, or a separate 
hotfix/refactor/cleanup?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-04-10 Thread GitBox
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots 
which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273936111
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 ##
 @@ -41,14 +41,17 @@
private final Time taskManagerRequestTimeout;
private final Time slotRequestTimeout;
private final Time taskManagerTimeout;
+   private final boolean waitResultConsumedBeforeRelease;
 
public SlotManagerConfiguration(
-   Time taskManagerRequestTimeout,
-   Time slotRequestTimeout,
-   Time taskManagerTimeout) {
+   Time taskManagerRequestTimeout,
 
 Review comment:
   formatting still?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-04-10 Thread GitBox
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots 
which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273935683
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -378,7 +378,10 @@ public void releaseMemory(int toRelease) throws 
IOException {
}
 
/**
-* Whether this partition is released, e.g. all subpartitions are 
consumed or task is cancelled.
+* Whether this partition is released.
 
 Review comment:
   nit: Shouldn't this be part of the previous commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12087) Introduce over window operators to blink batch

2019-04-10 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12087.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

fixed in 91221d6507655c6af366adb5aea075640a0a90a2

> Introduce over window operators to blink batch
> --
>
> Key: FLINK-12087
> URL: https://issues.apache.org/jira/browse/FLINK-12087
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Introduce NonBufferOverWindowOperator: some over windows do not need to 
> buffer data, e.g. rank, or "rows between unbounded preceding and 0". We 
> introduce NonBufferOverWindowOperator to avoid the overhead of copying data 
> into a buffer.
> Introduce BufferDataOverWindowOperator and OverWindowFrame: 1. Minimize 
> duplicate computation across the various OverWindowFrame implementations. 2. An 
> OverWindowOperator can compute multiple window frames.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung merged pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-10 Thread GitBox
KurtYoung merged pull request #8102: [FLINK-12087][table-runtime-blink] 
Introduce over window operators to blink batch
URL: https://github.com/apache/flink/pull/8102
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12117) CassandraConnectorITCase fails on Java 9

2019-04-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12117:
---
Labels: pull-request-available  (was: )

> CassandraConnectorITCase fails on Java 9
> 
>
> Key: FLINK-12117
> URL: https://issues.apache.org/jira/browse/FLINK-12117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Cassandra, Tests
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> From what I found, Cassandra never really supported Java 9, so we will likely 
> have to disable the tests in the jdk9 profile.
> {code}
> java.lang.ExceptionInInitializerError
>   at org.github.jamm.MemoryMeter.measure(MemoryMeter.java:178)
>   at org.apache.cassandra.utils.ObjectSizes.measure(ObjectSizes.java:162)
>   at org.apache.cassandra.utils.ObjectSizes.(ObjectSizes.java:39)
>   at 
> org.apache.cassandra.dht.RandomPartitioner.(RandomPartitioner.java:47)
>   at java.base/java.lang.Class.forName0(Native Method)
>   at java.base/java.lang.Class.forName(Class.java:292)
>   at 
> org.apache.cassandra.utils.FBUtilities.classForName(FBUtilities.java:434)
>   at 
> org.apache.cassandra.utils.FBUtilities.instanceOrConstruct(FBUtilities.java:450)
>   at 
> org.apache.cassandra.utils.FBUtilities.newPartitioner(FBUtilities.java:400)
>   at 
> org.apache.cassandra.config.DatabaseDescriptor.applyConfig(DatabaseDescriptor.java:353)
>   at 
> org.apache.cassandra.config.DatabaseDescriptor.(DatabaseDescriptor.java:119)
>   at 
> org.apache.cassandra.service.StartupChecks$4.execute(StartupChecks.java:167)
>   at 
> org.apache.cassandra.service.StartupChecks.verify(StartupChecks.java:107)
>   at 
> org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:162)
>   at 
> org.apache.cassandra.service.CassandraDaemon.init(CassandraDaemon.java:416)
>   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase$EmbeddedCassandraService.start(CassandraConnectorITCase.java:147)
>   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.startCassandra(CassandraConnectorITCase.java:186)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:564)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length 
> 5
>   at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3116)
>   at java.base/java.lang.String.substring(String.java:1885)
>   at 
> org.github.jamm.MemoryLayoutSpecification.getEffectiveMemoryLayoutSpecification(MemoryLayoutSpecification.java:190)
>   at 
> org.github.jamm.MemoryLayoutSpecification.(MemoryLayoutSpecification.java:31)
>   ... 34 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8141: [FLINK-12117][cassandra] Disable tests on Java 9

2019-04-10 Thread GitBox
flinkbot commented on issue #8141: [FLINK-12117][cassandra] Disable tests on 
Java 9
URL: https://github.com/apache/flink/pull/8141#issuecomment-481671825
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol opened a new pull request #8141: [FLINK-12117][cassandra] Disable tests on Java 9

2019-04-10 Thread GitBox
zentol opened a new pull request #8141: [FLINK-12117][cassandra] Disable tests 
on Java 9
URL: https://github.com/apache/flink/pull/8141
 
 
   ## What is the purpose of the change
   
   Disables Cassandra tests when run on Java 9, since Cassandra simply does not 
support it.
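
   For illustration only, one way such a skip could look with a JUnit 4 assumption 
   (a hypothetical example class; the real change may simply exclude the tests in 
   the jdk9 Maven profile instead):

```java
import static org.junit.Assume.assumeTrue;

import org.junit.BeforeClass;
import org.junit.Test;

// Hypothetical example, not the actual CassandraConnectorITCase change.
public class CassandraJava9SkipExample {

    @BeforeClass
    public static void skipOnJava9OrLater() {
        // "java.specification.version" is "1.8" on Java 8 and "9", "10", ... on later JDKs
        String version = System.getProperty("java.specification.version");
        assumeTrue("Cassandra does not support Java 9+", version.startsWith("1."));
    }

    @Test
    public void dummyTest() {
        // runs only on Java 8 or below because of the assumption above
    }
}
```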


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8138: [FLINK-12155][coordination] Remove legacy TaskManager

2019-04-10 Thread GitBox
flinkbot edited a comment on issue #8138: [FLINK-12155][coordination] Remove 
legacy TaskManager
URL: https://github.com/apache/flink/pull/8138#issuecomment-481652522
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @zentol [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @zentol [PMC]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @zentol [PMC]
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on issue #8138: [FLINK-12155][coordination] Remove legacy TaskManager

2019-04-10 Thread GitBox
zentol commented on issue #8138: [FLINK-12155][coordination] Remove legacy 
TaskManager
URL: https://github.com/apache/flink/pull/8138#issuecomment-481670047
 
 
   @flinkbot approve-until architecture


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12157) Update maven download links to point to mvn central

2019-04-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12157:
---
Labels: pull-request-available  (was: )

> Update maven download links to point to mvn central
> ---
>
> Key: FLINK-12157
> URL: https://issues.apache.org/jira/browse/FLINK-12157
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> On our download page we link to maven artifacts via {{repository.apache.org}}.
> Just today there was a discussion about this topic on the INFRA users mailing 
> list, the conclusion being that this is not allowed; we should point to mvn 
> central instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814396#comment-16814396
 ] 

Liya Fan commented on FLINK-3685:
-

Hi [~bowen.zheng], thank you for opening this issue. The NPE indicates a bug in 
the system. However, I do not think it is DateSerializer's responsibility, 
because it is the caller's responsibility to create the reuse object and make 
sure it is not null. The bug should be elsewhere.

Can you please provide the whole source code to reproduce the problem?

BTW, the latest code no longer uses -1 as an indicator for null. The latest 
code looks like this:

@Override
public Date deserialize(Date reuse, DataInputView source) throws IOException {
    final long v = source.readLong();
    if (v == Long.MIN_VALUE) {
        return null;
    }
    reuse.setTime(v);
    return reuse;
}
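
A minimal sketch of the failure mode described below, assuming a DateSerializer 
instance named 'serializer' and a DataInputView named 'source' (both names are 
placeholders used only for illustration):

{code}
Date reuse = new Date();
// first record is the null marker: deserialize() returns null and the caller
// typically stores that null back into its reuse field
reuse = serializer.deserialize(reuse, source);
// next record is non-null: with the old '-1' logic, reuse.setTime(v) is now
// called on a null reference and throws the NPE
reuse = serializer.deserialize(reuse, source);
{code}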

> Logical error in code for DateSerializer deserialize with reuse
> ---
>
> Key: FLINK-3685
> URL: https://issues.apache.org/jira/browse/FLINK-3685
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.0.0
>Reporter: ZhengBowen
>Priority: Major
>
> There is a logical error in the following function in DateSerializer.java 
> when the source reads '-1'.
> The function is:
> {code}
> public Date deserialize(Date reuse, DataInputView source) throws IOException {
>   long v = source.readLong();
>   if (v == -1L) {
>       return null;
>   }
>   reuse.setTime(v);
>   return reuse;
> }
> {code}
> When this function is called for the first time and returns null, the caller 
> will set 'reuse' to null;
> when it is called for the second time and 'v != -1L', reuse.setTime(v) will 
> throw an NPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12157) Update maven download links to point to mvn central

2019-04-10 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-12157:


 Summary: Update maven download links to point to mvn central
 Key: FLINK-12157
 URL: https://issues.apache.org/jira/browse/FLINK-12157
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


On our download page we link to maven artifacts via {{repository.apache.org}}.

Just today there was a discussion about this topic on the INFRA users mailing 
list, the conclusion being that this is not allowed; we should point to mvn 
central instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12123) Upgrade Jepsen to 0.1.13 in flink-jepsen

2019-04-10 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-12123.

Resolution: Fixed

1.9: c3bd1bd00961cd6165993f018ef59ae9379af482

> Upgrade Jepsen to 0.1.13 in flink-jepsen
> 
>
> Key: FLINK-12123
> URL: https://issues.apache.org/jira/browse/FLINK-12123
> Project: Flink
>  Issue Type: Improvement
>  Components: Test Infrastructure
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As Debian Jessie has reached EOL, we need to raise the version of the jepsen 
> dependency in {{flink-jepsen/project.clj}} to 0.1.13 to get support for 
> Debian Stretch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse

2019-04-10 Thread Ji Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814379#comment-16814379
 ] 

Ji Liu commented on FLINK-3685:
---

Since this problem was fixed in 
[FLINK-3856|https://issues.apache.org/jira/browse/FLINK-3856], this issue 
can be closed.

> Logical error in code for DateSerializer deserialize with reuse
> ---
>
> Key: FLINK-3685
> URL: https://issues.apache.org/jira/browse/FLINK-3685
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.0.0
>Reporter: ZhengBowen
>Priority: Major
>
> There is a logical error in the following function in DateSerializer.java 
> when the source reads '-1'.
> The function is:
> {code}
> public Date deserialize(Date reuse, DataInputView source) throws IOException {
>   long v = source.readLong();
>   if (v == -1L) {
>       return null;
>   }
>   reuse.setTime(v);
>   return reuse;
> }
> {code}
> When this function is called for the first time and returns null, the caller 
> will set 'reuse' to null;
> when it is called for the second time and 'v != -1L', reuse.setTime(v) will 
> throw an NPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector

2019-04-10 Thread GitBox
flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove 
Elasticsearch 1 connector 
URL: https://github.com/apache/flink/pull/8140#issuecomment-481656785
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @zentol [PMC]
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

