[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other … URL: https://github.com/apache/flink/pull/8007#discussion_r274267157 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.util.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A generic catalog implementation that holds all meta objects in memory. 
+ */ +public class GenericInMemoryCatalog implements ReadableWritableCatalog { + + public static final String DEFAULT_DB = "default"; + + private String currentDatabase = DEFAULT_DB; + + private final String catalogName; + private final Map databases; + private final Map tables; + + public GenericInMemoryCatalog(String name) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty"); + + this.catalogName = name; + this.databases = new LinkedHashMap<>(); + this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new HashMap<>())); + this.tables = new LinkedHashMap<>(); + } + + @Override + public String getCurrentDatabase() { + return currentDatabase; + } + + @Override + public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(catalogName, databaseName); + } + + currentDatabase = databaseName; + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + // -- databases -- + + @Override + public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists) + throws DatabaseAlreadyExistException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + checkArgument(db != null); + + if (databaseExists(databaseName)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(catalogName, databaseName); + } + } else { + databases.put(databaseName, db.copy()); + } + } + + @Override + public void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + + if (databases.containsKey(databaseName)) { + + // Make sure the database is empty + if (isDatabaseEmpty(databaseName)) { + databases.remove(databaseName); + } else { + throw new DatabaseNotEmptyException(catalogName, 
databaseName); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(catalogName, databaseName); + } + } + + private boolean isDatabaseEmpty(String databaseName) { +
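The create/drop semantics in the quoted diff can be sketched standalone. This is a hedged illustration of the same logic — the class name, the table-set representation, and the use of IllegalStateException are mine, not Flink's actual catalog API:

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for the quoted GenericInMemoryCatalog logic:
// each database maps to the set of table names it contains.
public class InMemoryCatalogSketch {

    private final Map<String, Set<String>> databases = new LinkedHashMap<>();

    /** Create a database; with ignoreIfExists set, an existing database is left untouched. */
    public void createDatabase(String name, boolean ignoreIfExists) {
        if (databases.containsKey(name)) {
            if (!ignoreIfExists) {
                throw new IllegalStateException("database already exists: " + name);
            }
        } else {
            databases.put(name, new LinkedHashSet<>());
        }
    }

    /** Drop a database; only an empty database may be dropped, mirroring the diff. */
    public void dropDatabase(String name, boolean ignoreIfNotExists) {
        Set<String> tables = databases.get(name);
        if (tables != null) {
            if (!tables.isEmpty()) {
                throw new IllegalStateException("database not empty: " + name);
            }
            databases.remove(name);
        } else if (!ignoreIfNotExists) {
            throw new IllegalStateException("database does not exist: " + name);
        }
    }

    public boolean databaseExists(String name) {
        return databases.containsKey(name);
    }
}
```

The two boolean flags make both operations usable idempotently from DDL paths ("CREATE DATABASE IF NOT EXISTS" / "DROP DATABASE IF EXISTS") without the caller having to probe first.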
[GitHub] [flink] hequn8128 commented on issue #8087: [FLINK-12029][table] Add column operations for TableApi
hequn8128 commented on issue #8087: [FLINK-12029][table] Add column operations for TableApi URL: https://github.com/apache/flink/pull/8087#issuecomment-481972396 @sunjincheng121 @dawidwys I have updated the PR. Would be great if you can take another look. :-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#issuecomment-481971372 @pnowojski , thanks for your review! I have rebased the master to solve the conflicts.
[GitHub] [flink] zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers URL: https://github.com/apache/flink/pull/8002#discussion_r274009874 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) { // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { - // we have some reporters so - for (Tuple2<String, Configuration> reporterConfiguration : reporterConfigurations) { - String namedReporter = reporterConfiguration.f0; - Configuration reporterConfig = reporterConfiguration.f1; - - final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null); - if (className == null) { - LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported."); - continue; - } + for (ReporterSetup reporterSetup : reporterConfigurations) { Review comment: I assumed you meant that the reporter class would have to hard-code the name, but you're suggesting that the reporter parses the interval and name from the configuration, I guess. ~~So for one this wouldn't really work with the subsequent introduction of factories, which requires the MetricConfig to be assembled before the reporter is even instantiated. One could move the logic into the factory of course; my point is that this is not something we can do _now_.~~ Reporters (or factories, for that matter) shouldn't be aware that reporter names exist at all; names are an implementation detail of the metric system that allows for distinct configurations. Whether multiple reporters exist is irrelevant to a given reporter, so reporters shouldn't have to deal with it. I also don't see how a reporter would be able to determine its own name from the given configuration.
The system-wide configuration does not contain this info in an obvious fashion, and the metric config does not contain it at all. I can only see this working if we (the MetricRegistry/-Configuration) explicitly write it into the MetricConfig, but that obviously doesn't make sense. Admittedly, the interval makes more sense. It is true that the current reporter configuration is a mix of reporter-facing options (like reporter-specific arguments) and system-facing options (like the interval). The current approach, however, allows us to ensure that certain configuration options exist and are actually applied; in a design where the `Scheduled` reporter provides the interval, you cannot guarantee that the interval is configurable for users, or that a configured interval is respected. The same applies to other system-facing reporter options, like delimiters. So long as reporters don't massively go out of their way to avoid it (i.e. not working against the MetricGroup interface at all, with some currently existing exceptions), the configured delimiter _will_ be used.
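The point that a reporter's name never reaches the reporter can be illustrated with a small sketch: the registry splits the flat `metrics.reporter.<name>.<option>` key space into one config map per reporter, and the name is consumed during that split. This follows Flink's documented key layout, but the class below is my own sketch, not Flink's actual ReporterSetup code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: split flat keys such as "metrics.reporter.jmx.class"
// into one map per reporter name, dropping the name from the option key.
public class ReporterConfigSplit {

    static final String PREFIX = "metrics.reporter.";

    public static Map<String, Map<String, String>> split(Map<String, String> flat) {
        Map<String, Map<String, String>> perReporter = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : flat.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                continue; // not a reporter option
            }
            String rest = e.getKey().substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue; // malformed key: no reporter name / no option part
            }
            String name = rest.substring(0, dot);
            String option = rest.substring(dot + 1);
            perReporter.computeIfAbsent(name, k -> new LinkedHashMap<>()).put(option, e.getValue());
        }
        return perReporter;
    }
}
```

The per-reporter map handed onward contains only `class`, `interval`, and so on, which is exactly why a reporter cannot recover its own name from its config.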
[jira] [Commented] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815073#comment-16815073 ] vinoyang commented on FLINK-10929: -- Hi [~fan_li_ya] If there is any design documentation, it would help the discussion, and it may be a good starting point. > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#issuecomment-481965728 @bowenli86 Thanks for your review and for providing valuable information on the catalog. - Re TableEnvironment. Yes, you are right. Currently, we must use a Java or Scala environment to register table or aggregate functions because TypeInformation is handled differently between Java and Scala. However, I think this would be solved by FLIP-37. - Re listTables. Nice spoiler alert. Looking forward to it. :-)
[jira] [Closed] (FLINK-12066) Remove StateSerializerProvider field in keyed state backend
[ https://issues.apache.org/jira/browse/FLINK-12066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-12066. --- Resolution: Fixed Merged for 1.9.0: 1132a52a58c3710a9a183111fda84cd1432605fa > Remove StateSerializerProvider field in keyed state backend > --- > > Key: FLINK-12066 > URL: https://issues.apache.org/jira/browse/FLINK-12066 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Yu Li >Assignee: Yu Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As mentioned in [PR review of > FLINK-10043|https://github.com/apache/flink/pull/7674#discussion_r257630962] > with Stefan and offline discussion with Gordon, after the refactoring work, > the serializer passed to the {{RocksDBKeyedStateBackend}} constructor is final, > thus the {{StateSerializerProvider}} field is no longer needed. > For {{HeapKeyedStateBackend}}, the only thing stopping us from passing a final > serializer is the circular dependency between the backend and > {{HeapRestoreOperation}}, and we aim to decouple them by introducing a new > {{HeapInternalKeyContext}} as the bridge. For more details, please refer to the > coming PR.
[GitHub] [flink] asfgit closed pull request #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends
asfgit closed pull request #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends URL: https://github.com/apache/flink/pull/8078
[GitHub] [flink] tzulitai commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends
tzulitai commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends URL: https://github.com/apache/flink/pull/8078#issuecomment-481962227 Thanks, merging ..
[GitHub] [flink] KurtYoung commented on issue #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream
KurtYoung commented on issue #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream URL: https://github.com/apache/flink/pull/8110#issuecomment-481956259 LGTM, +1 to merge
[jira] [Commented] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem
[ https://issues.apache.org/jira/browse/FLINK-11935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815059#comment-16815059 ] vinoyang commented on FLINK-11935: -- [~walterddr] It seems I found the key issue: we cannot delete the {{DateTimeUtils}} class file directly. I have reported the issue; see CALCITE-2989. > Remove DateTimeUtils pull-in and fix datetime casting problem > - > > Key: FLINK-11935 > URL: https://issues.apache.org/jira/browse/FLINK-11935 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Rong Rong >Assignee: vinoyang >Priority: Major > > This {{DateTimeUtils}} was pulled in by FLINK-7235. > Originally the time operation was not done correctly via the {{ymdToJulian}} > function before the date {{1970-01-01}}, thus we needed the fix, similar to > addressing this problem: > {code:java} > Optimized :1017-12-05 22:58:58.998 > Expected :1017-11-29 22:58:58.998 > Actual :1017-12-05 22:58:58.998 > {code} > > However, after pulling in avatica 1.13, I found out that the optimized plans > of the time operations are actually correct. It is in fact the casting part > that creates the problem: > For example, the following: > *{{(plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}* > results in a StringTestExpression of: > *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" > COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}* > but the testing results are: > {code:java} > Optimized :1017-11-29 22:58:58.998 > Expected :1017-11-29 22:58:58.998 > Actual :1017-11-23 22:58:58.998 > {code} >
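For reference, the month arithmetic from the example above can be checked against java.time, which uses the proleptic Gregorian (ISO) calendar throughout and so sidesteps the Julian-day conversion in Calcite's DateTimeUtils. (The several-day discrepancies in the issue look like a Julian/Gregorian calendar gap around year 1017, but that is my reading, not something stated in the issue.) A minimal sketch:

```java
import java.time.LocalDateTime;

public class MonthArithmeticCheck {
    // plus(-12000.months, ts) from the issue is equivalent to subtracting
    // 12000 months (= 1000 years). In the proleptic Gregorian calendar the
    // result keeps the same month and day: 1017-11-29T22:58:58.998.
    public static LocalDateTime minus12000Months(LocalDateTime ts) {
        return ts.minusMonths(12000);
    }
}
```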
[jira] [Updated] (FLINK-12140) Support e2e sort merge join operator in batch mode
[ https://issues.apache.org/jira/browse/FLINK-12140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-12140: - Summary: Support e2e sort merge join operator in batch mode (was: Support sort merge join it case run in batch mode) > Support e2e sort merge join operator in batch mode > -- > > Key: FLINK-12140 > URL: https://issues.apache.org/jira/browse/FLINK-12140 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Complete BatchExecSortMergeJoin and support join it cases. > Support queries like "select a, b, c, e from T1, T2 where T1.a = T2.d" run in > batch mode
[GitHub] [flink] zjffdu commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
zjffdu commented on issue #8144: [FLINK-12159] Enable YarnMiniCluster integration test under non-secure mode URL: https://github.com/apache/flink/pull/8144#issuecomment-481952307 @tillrohrmann Could you help review it? Thanks
[jira] [Comment Edited] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus
[ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815027#comment-16815027 ] Zhu Zhu edited comment on FLINK-11813 at 4/11/19 3:30 AM: -- Hi Till, for your 2 questions above: 1. If one job with jobID xxx terminates, and later another job with the same jobID is submitted, I think Flink can regard it as a valid submission. Currently in our production use, there is a way that the client re-submit previously generated JobGraph to speed up the job launching, when the previous job is FAILED. In this case, job with the same ID are seen as different attempts. We did not handle the unexpected duplicated submission if the second submission comes after the first one is completed. Not sure in what case this may happen? 2. The process would be like this # submitting job -> setting status in RunningJobsRegistry to be pending in *Dispatcher* (null/NONE -> PENDING) # creating and launching JobManagerRunner which will try to acquire the HA leadership # once a JobManager is granted leadership, it changes the job status in RunningJobsRegistry to RUNNING and starts the JobMaster(or creating a new JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING) # when this job terminates, the JobManager removes the job from the RunningJobsRegistry (RUNNING -> NONE) So if it is the first time to launch the JM, the job status is PENDING so the job will be started. If it is a second time leadership gaining, and the first is completed, the job status would be NONE. Besides, if JM failover happens during the PENDING/RUNNING status, the new leader will also restart the job. I totally agree that "the main problem is that the entries of the {{RunningJobsRegistry}} are bound to the lifecycle of the {{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job submission in the Dispatcher is the beginning of lifecycle. I agree with your proposal too, which can well handle the unexpected submission duplications. 
A few questions for the proposal: 1. as in statement 5 the job status is changed to be RUNNING already in job submission, in statement 3 should we restart the job only if it is RUNNING? 2. in statement 7, if a Dispatcher terminates as expected (user stopping it, or job finishes in MiniDispatcher), the Dispatcher can safely clean the RunningJobsRegistry. Otherwise, in unexpected shutdowns, as [~Tison] said, it may be hard to decide whether this Dispatcher is the last to shut down. Should we keep the RunningJobsRegistry to avoid affecting running jobs in such corner cases? 3. Seems with this change, the JobManagerLeaderElectionService is not needed any more? was (Author: zhuzh): Hi Till, for your 2 questions above: 1. If one job with jobID xxx terminates, and later another job with the same jobID is submitted, I think Flink can regard it as a valid submission. Currently in our production use, there is a way that the client re-submit previously generated JobGraph to speed up the job launching, when the previous job is FAILED. In this case, job with the same ID are seen as different attempts. We did not handle the unexpected duplicated submission if the second submission comes after the first one is completed. Not sure in what case this may happen? 2. The process would be like this # submitting job -> setting status in RunningJobsRegistry to be pending in *Dispatcher* (null/NONE -> PENDING) # creating and launching JobManagerRunner which will try to acquire the HA leadership # once a JobManager is granted leadership, it changes the job status in RunningJobsRegistry to RUNNING and starts the JobMaster(or creating a new JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING) # when this job terminates, the JobManager removes the job from the RunningJobsRegistry (RUNNING -> NONE) So if it is the first time to launch the JM, the job status is PENDING so the job will be started. 
If it is a second time leadership gaining, and the first is completed, the job status would be NONE. Besides, if JM failover happens during the PENDING/RUNNING status, the new leader will also restart the job. I totally agree that "the main problem is that the entries of the {{RunningJobsRegistry}} are bound to the lifecycle of the {{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job submission in the Dispatcher is the beginning of lifecycle. I agree with your proposal too, which can well handle the unexpected submission duplications. One thing to confirm is that, as in stage 5 the job status is changed to be RUNNING already in job submission, in stage 3 should we restart the job only if it is RUNNING? > Standby per job mode Dispatchers don't know job's JobSchedulingStatus > - > >
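The NONE -> PENDING -> RUNNING -> NONE lifecycle described in the comments above can be written down as a tiny state machine. This is my reading of the proposal with illustrative names, not Flink's actual RunningJobsRegistry:

```java
// Illustrative sketch of the job scheduling status transitions discussed above.
public class RunningJobsRegistrySketch {

    public enum Status { NONE, PENDING, RUNNING }

    private Status status = Status.NONE;

    /** Dispatcher accepts a submission: NONE -> PENDING. */
    public void onSubmission() {
        if (status != Status.NONE) {
            throw new IllegalStateException("duplicate submission while " + status);
        }
        status = Status.PENDING;
    }

    /**
     * JobMaster gains leadership: PENDING/RUNNING -> RUNNING. A NONE status
     * means the job already terminated, so a standby leader must not restart it.
     */
    public boolean onLeadershipGranted() {
        if (status == Status.NONE) {
            return false; // do not (re)start a finished job
        }
        status = Status.RUNNING;
        return true; // start or recover the job
    }

    /** Job reaches a terminal state: RUNNING -> NONE. */
    public void onTermination() {
        status = Status.NONE;
    }

    public Status status() {
        return status;
    }
}
```

The standby-Dispatcher bug in FLINK-11813 corresponds to a leader ignoring the NONE case and restarting the job anyway.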
[jira] [Commented] (FLINK-12140) Support sort merge join it case run in batch mode
[ https://issues.apache.org/jira/browse/FLINK-12140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815052#comment-16815052 ] Kurt Young commented on FLINK-12140: you could just say support e2e join operator > Support sort merge join it case run in batch mode > - > > Key: FLINK-12140 > URL: https://issues.apache.org/jira/browse/FLINK-12140 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Complete BatchExecSortMergeJoin and support join it cases. > Support queries like "select a, b, c, e from T1, T2 where T1.a = T2.d" run in > batch mode
[GitHub] [flink] hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add column operations for TableApi
hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add column operations for TableApi URL: https://github.com/apache/flink/pull/8087#discussion_r274246117 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -630,18 +672,25 @@ class OverWindowedTableImpl( overWindows) } + private def expandColumnsInOverWindow(overWindows: Seq[OverWindow]): Seq[OverWindow] = { +overWindows.map { e => + val expanedPartitioning = tableImpl.resolveCalls(e.getPartitioning) Review comment: Remove this method due to #8062
[GitHub] [flink] hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add column operations for TableApi
hequn8128 commented on a change in pull request #8087: [FLINK-12029][table] Add column operations for TableApi URL: https://github.com/apache/flink/pull/8087#discussion_r274245850 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -477,6 +484,18 @@ class TableImpl( private def wrap(operation: TableOperation): Table = { new TableImpl(tableEnv, operation) } + + private[flink] def getOutputFieldReferences: Seq[UnresolvedFieldReferenceExpression] = { +operationTree.asInstanceOf[LogicalNode] + .output.map(a => new UnresolvedFieldReferenceExpression(a.name)) + } + + private[flink] def resolveCalls(fields: Seq[Expression]): Seq[Expression] = { +val outputFieldReferences = operationTree.asInstanceOf[LogicalNode] + .output.map(a => new UnresolvedFieldReferenceExpression(a.name)) +val expander = new ColumnsOperationExpander(outputFieldReferences) Review comment: Due to the refactor of #8062, the `getOutputFieldReferences()` method will not be used by other places, so I merged `getOutputFieldReferences()` and `resolveCalls()` into one.
[GitHub] [flink] zhijiangW commented on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate
zhijiangW commented on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate URL: https://github.com/apache/flink/pull/8136#issuecomment-481943959 Thanks for your review @azagrebin . I checked the travis result and the failure is not related to my changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus
[ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815027#comment-16815027 ] Zhu Zhu commented on FLINK-11813: - Hi Till, regarding your two questions above: 1. If one job with jobID xxx terminates, and later another job with the same jobID is submitted, I think Flink can regard it as a valid submission. Currently in our production use, the client re-submits a previously generated JobGraph to speed up job launching when the previous job has FAILED. In this case, jobs with the same ID are seen as different attempts. We did not handle an unexpected duplicated submission that arrives after the first one has completed; I am not sure in what case this may happen. 2. The process would be like this:
# submitting job -> the Dispatcher sets the status in the RunningJobsRegistry to PENDING (null/NONE -> PENDING)
# creating and launching the JobManagerRunner, which will try to acquire the HA leadership
# once a JobManager is granted leadership, it changes the job status in the RunningJobsRegistry to RUNNING and starts the JobMaster (or creates a new JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
# when the job terminates, the JobManager removes it from the RunningJobsRegistry (RUNNING -> NONE)
So if it is the first time the JM is launched, the job status is PENDING and the job will be started. If leadership is gained a second time after the first run has completed, the job status will be NONE. Besides, if a JM failover happens while the status is PENDING/RUNNING, the new leader will also restart the job. I totally agree that "the main problem is that the entries of the {{RunningJobsRegistry}} are bound to the lifecycle of the {{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher}}". I think the job submission in the Dispatcher is the beginning of that lifecycle. I think your proposal is fine, and it can handle the unexpected submission duplications. 
One thing to confirm is that, as in stage 5 the job status is changed to be RUNNING already in job submission, in stage 3 should we restart the job only if it is RUNNING? > Standby per job mode Dispatchers don't know job's JobSchedulingStatus > - > > Key: FLINK-11813 > URL: https://issues.apache.org/jira/browse/FLINK-11813 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Till Rohrmann >Priority: Major > > At the moment, it can happen that standby {{Dispatchers}} in per job mode > will restart a terminated job after they gained leadership. The problem is > that we currently clear the {{RunningJobsRegistry}} once a job has reached a > globally terminal state. After the leading {{Dispatcher}} terminates, a > standby {{Dispatcher}} will gain leadership. Without having the information > from the {{RunningJobsRegistry}} it cannot tell whether the job has been > executed or whether the {{Dispatcher}} needs to re-execute the job. At the > moment, the {{Dispatcher}} will assume that there was a fault and hence > re-execute the job. This can lead to duplicate results. > I think we need some way to tell standby {{Dispatchers}} that a certain job > has been successfully executed. One trivial solution could be to not clean up > the {{RunningJobsRegistry}} but then we will clutter ZooKeeper. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
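The PENDING/RUNNING lifecycle walked through in the comments above can be sketched as a tiny state machine. This is a simplified, hypothetical illustration of the described transitions (NONE -> PENDING -> RUNNING -> NONE), not Flink's actual RunningJobsRegistry API:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified in-memory stand-in for the registry described above,
 *  illustrating the NONE -> PENDING -> RUNNING -> NONE lifecycle. */
class SimpleRunningJobsRegistry {
    enum Status { NONE, PENDING, RUNNING }

    private final Map<String, Status> jobs = new HashMap<>();

    Status getStatus(String jobId) {
        return jobs.getOrDefault(jobId, Status.NONE);
    }

    /** Dispatcher marks the job on submission: NONE -> PENDING. */
    void markPending(String jobId) {
        jobs.put(jobId, Status.PENDING);
    }

    /** JobManager marks the job when it gains leadership and starts it: PENDING -> RUNNING. */
    void markRunning(String jobId) {
        jobs.put(jobId, Status.RUNNING);
    }

    /** JobManager clears the entry once the job reaches a terminal state: RUNNING -> NONE. */
    void clear(String jobId) {
        jobs.remove(jobId);
    }

    /** A new leader restarts the job only if an entry still exists (PENDING or RUNNING);
     *  NONE means the job already completed and must not be re-executed. */
    boolean shouldStartOnLeadership(String jobId) {
        return getStatus(jobId) != Status.NONE;
    }
}
```

Under this sketch, a standby Dispatcher that gains leadership after the job terminated sees NONE and skips the restart, which is the behavior the thread is after.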
[GitHub] [flink] carp84 commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends
carp84 commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends URL: https://github.com/apache/flink/pull/8078#issuecomment-481940313 Checking the travis log, the error (shown below) is unrelated to the change here; I will rebase on the latest master and force-push to trigger another check. > gzip: stdin: not in gzip format tar: Child returned status 1 tar: Error is not recoverable: exiting now [FAIL] Test script contains errors.
[GitHub] [flink] jonpollock edited a comment on issue #7458: [FLINK-11302] FlinkS3FileSystem uses an incorrect path for temporary files.
jonpollock edited a comment on issue #7458: [FLINK-11302] FlinkS3FileSystem uses an incorrect path for temporary files. URL: https://github.com/apache/flink/pull/7458#issuecomment-481939883 I know this is already closed and merged, but I'd just like to point out that the constructor of FlinkS3FileSystem calls RefCountedTmpFileCreator.inDirectories, passing it a single File object. This method takes multiple files, and each will be used as a way of spreading the temp files over multiple directories. The change you applied limits this to just the first directory in LOCAL_DIRS. I understand that you probably didn't want to change the constructor signature, but I think it is only used in AbstractS3FileSystemFactory. In any case, thank you for your work; this solves a problem I've been having for a few hours now.
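The directory-spreading behavior described in this comment can be illustrated with a minimal round-robin sketch. The class and method names below are hypothetical, not Flink's actual RefCountedTmpFileCreator; the point is only that handing the creator a single directory (as the merged change does) pins every temp file to one location:

```java
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of spreading temp files over several configured
 *  directories in round-robin order. With only one directory supplied,
 *  every temp file lands in that single directory. */
class TempDirSpreader {
    private final File[] directories;
    private final AtomicInteger next = new AtomicInteger(0);

    TempDirSpreader(File... directories) {
        if (directories.length == 0) {
            throw new IllegalArgumentException("at least one directory required");
        }
        this.directories = directories;
    }

    /** Picks the directory for the next temp file in round-robin order. */
    File nextDirectory() {
        int index = Math.floorMod(next.getAndIncrement(), directories.length);
        return directories[index];
    }
}
```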
[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem URL: https://github.com/apache/flink/pull/8038#discussion_r274236430 ## File path: flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.util.function.FunctionUtils; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.stream.Collectors; + +/** + * This class is used to create a collection of {@link PluginDescriptor} based on directory structure for a given plugin + * root folder. + * + * The expected structure is as follows: the given plugins root folder, containing the plugins folder. One plugin folder + * contains all resources (jar files) belonging to a plugin. The name of the plugin folder becomes the plugin id. 
+ * plugins-root-folder/
+ * |- plugin-a/ (folder of plugin a)
+ * |  |- plugin-a-1.jar (the jars containing the classes of plugin a)
+ * |  |- plugin-a-2.jar
+ * |  |- ...
+ * |
+ * |- plugin-b/
+ * |  |- plugin-b-1.jar
+ * |  |- ...
Review comment: Another concern is whether this plugin finder mechanism works in the JM/TM?
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r274236223 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); + + return new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + initialRequestBackoff, + maxRequestBackoff, + buffersPerChannel, + extraBuffersPerGate, + isCreditBased, + nettyConfig); + } + + /** +*
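The quoted fromConfiguration logic derives the buffer count by dividing the configured network memory by the page size and rejecting results that exceed Integer.MAX_VALUE. A minimal, stand-alone sketch of just that computation (extracted for illustration, not the full Flink method):

```java
/** Minimal sketch of the buffer-count computation in the quoted configuration
 *  code: network memory is divided into pages, and the result must fit in an int. */
class NetworkBufferMath {
    static int numberOfNetworkBuffers(long networkMemorySize, int pageSize) {
        long numNetworkBuffersLong = networkMemorySize / pageSize;
        if (numNetworkBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes ("
                + networkMemorySize + ") corresponds to more than MAX_INT pages.");
        }
        // Any remainder smaller than one page is tolerated "offcut" memory
        // that stays available to user space, as noted in the quoted comment.
        return (int) numNetworkBuffersLong;
    }
}
```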
[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem URL: https://github.com/apache/flink/pull/8038#discussion_r274235386 ## File path: flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; + +/** + * Utility functions for the plugin mechanism. + */ +public final class PluginUtils { Review comment: I also agree with @pnowojski to merge it into `PluginManager`. From the point of view of a plugin user, I would expect only one entry point for using plugins. So basically, I suppose it involves the following 2 steps: 1. init PluginManager 2. create a Plugin via the PluginManager > some code could be interested in having multiple plugin manager or explicitly creating and passing in plugin managers. I am not sure why a user would do that. 
The only scenario I can imagine is a user wanting to create a customized PluginManager explicitly for a unit test. And I think the only thing to customize in a PluginManager is its plugin root folder, which could be set in many ways, e.g. via a Java property or a system environment variable, so the PluginManager itself could do that; we don't need PluginUtils for this. Maybe my understanding is wrong, just my 2 cents.
[jira] [Commented] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815009#comment-16815009 ] Liya Fan commented on FLINK-10929: -- We have imported Arrow in our efforts to vectorize Blink batch jobs. It is an incremental change, which can be easily turned off with a single flag, and it does not affect other parts of the code base. [~fhueske], do you think it is a good time to make some initial attempts to incorporate such changes now? :) > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it.
[jira] [Issue Comment Deleted] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengnan YU updated FLINK-11654: Comment: was deleted (was: Hi, I got the same problem, any updated solution here?) > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815006#comment-16815006 ] Shengnan YU commented on FLINK-11654: - Hi, I got the same problem, any updated solution here? > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
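The quoted patch comment suggests including the job id (and maybe a cluster id) in the transactional-id prefix, so that identically named sinks in different jobs or clusters cannot clash. A hedged sketch of that idea; the class and method names below are illustrative, not Flink's actual TransactionalIdsGenerator API:

```java
/** Illustrative sketch (not Flink's actual API): a transactional-id prefix that
 *  includes cluster and job identifiers in addition to the task name and operator
 *  uid, so producers from different jobs cannot generate clashing ids. */
class TransactionalIdPrefix {
    static String prefixFor(String clusterId, String jobId, String taskName, String operatorUid) {
        // Task name + operator uid alone (the current behavior described above)
        // is only unique within one job; clusterId and jobId widen the scope.
        return clusterId + "-" + jobId + "-" + taskName + "-" + operatorUid;
    }

    static String transactionalId(String prefix, int subtaskIndex, long counter) {
        // Per-subtask index and counter keep ids unique within the prefix's scope.
        return prefix + "-" + subtaskIndex + "-" + counter;
    }
}
```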
[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem URL: https://github.com/apache/flink/pull/8038#discussion_r274232242 ## File path: flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.util.function.FunctionUtils; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.stream.Collectors; + +/** + * This class is used to create a collection of {@link PluginDescriptor} based on directory structure for a given plugin + * root folder. + * + * The expected structure is as follows: the given plugins root folder, containing the plugins folder. One plugin folder + * contains all resources (jar files) belonging to a plugin. The name of the plugin folder becomes the plugin id. 
+ * plugins-root-folder/
+ * |- plugin-a/ (folder of plugin a)
+ * |  |- plugin-a-1.jar (the jars containing the classes of plugin a)
+ * |  |- plugin-a-2.jar
+ * |  |- ...
+ * |
+ * |- plugin-b/
+ * |  |- plugin-b-1.jar
+ * |  |- ...
Review comment: Would it be better to add another layer for the plugin type? Something like this:
plugins/FileSystem/HadoopFileSystem
plugins/FileSystem/S3FileSystem
plugins/Reporter/Reporter1
plugins/Reporter/Reporter2
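The one-folder-per-plugin layout discussed in this thread can be sketched as a simple directory scan that maps each sub-folder name (the plugin id) to the jars inside it. This is an illustration under those assumptions only, not Flink's actual DirectoryBasedPluginFinder:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hedged sketch of the layout described above: each sub-folder of the plugins
 *  root is one plugin (folder name = plugin id), and every jar inside it belongs
 *  to that plugin. Not Flink's actual implementation. */
class PluginDirectoryScanner {
    /** Maps plugin id -> jar files found in that plugin's folder. */
    static Map<String, List<File>> scan(File pluginsRoot) {
        Map<String, List<File>> plugins = new HashMap<>();
        File[] pluginDirs = pluginsRoot.listFiles(File::isDirectory);
        if (pluginDirs == null) {
            return plugins; // root missing or not a directory
        }
        for (File pluginDir : pluginDirs) {
            List<File> jars = new ArrayList<>();
            File[] files = pluginDir.listFiles((dir, name) -> name.endsWith(".jar"));
            if (files != null) {
                for (File f : files) {
                    jars.add(f);
                }
            }
            plugins.put(pluginDir.getName(), jars);
        }
        return plugins;
    }
}
```

The two-level layout proposed in the review comment would add one more directory walk for the plugin type before this scan.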
[jira] [Updated] (FLINK-12160) Support for transferring user config files on kubernetes
[ https://issues.apache.org/jira/browse/FLINK-12160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang updated FLINK-12160: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-9953 > Support for transferring user config files on kubernetes > > > Key: FLINK-12160 > URL: https://issues.apache.org/jira/browse/FLINK-12160 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Yang Wang >Priority: Major > > When submitting a Flink job on Kubernetes, the jars are transferred through > docker images or the Flink blob server. We also need a way to transfer config > files, such as hdfs-site.xml/core-site.xml. They could be saved in a config map > in etcd and then mounted into the jobmanager and taskmanager pods.
[jira] [Created] (FLINK-12160) Support for transferring user config files on kubernetes
Yang Wang created FLINK-12160: - Summary: Support for transferring user config files on kubernetes Key: FLINK-12160 URL: https://issues.apache.org/jira/browse/FLINK-12160 Project: Flink Issue Type: New Feature Components: Runtime / Coordination Reporter: Yang Wang When submitting a Flink job on Kubernetes, the jars are transferred through docker images or the Flink blob server. We also need a way to transfer config files, such as hdfs-site.xml/core-site.xml. They could be saved in a config map in etcd and then mounted into the jobmanager and taskmanager pods.
[jira] [Assigned] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-11799: Assignee: Liya Fan > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Assignee: Liya Fan >Priority: Major > > I was encountering a problem with NullPointerExceptions with the deserialized > object reaching my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning to be aware that the kryo copy() is failing. I understand > that the code is there to handle the lack of a copy implementation but due to > the potential inefficiency of having to write and read the object instead of > copying it, this would seem useful information to share at the least. It is > also important to have a warning in case the cause of the copy error is > something that needs to be fixed. > 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
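The two issues reported above (a silently swallowed copy failure and an unchecked null result from the fallback) can be illustrated with a small helper that warns once on fallback and rejects null records instead of passing them downstream. This is a hypothetical sketch, not Flink's KryoSerializer or OperatorChain:

```java
import java.util.function.Supplier;

/** Illustrative sketch (not Flink's KryoSerializer) of the two points raised in
 *  the issue: (1) warn at least once when the direct copy fails instead of
 *  silently falling back, and (2) treat a null result as an error rather than
 *  handing a null record to downstream operators. */
class LoggingCopyHelper<T> {
    private boolean copyFailureLogged = false;

    T copyWithFallback(Supplier<T> directCopy, Supplier<T> serializeRoundTrip) {
        T result;
        try {
            result = directCopy.get();
        } catch (RuntimeException e) {
            if (!copyFailureLogged) {
                copyFailureLogged = true; // report the copy failure once, as the issue suggests
                System.err.println("WARN: direct copy failed, falling back to "
                        + "serialize/deserialize round-trip: " + e);
            }
            result = serializeRoundTrip.get();
        }
        if (result == null) {
            // Surface the problem here instead of producing a NullPointerException
            // later in the user's ProcessFunction.
            throw new IllegalStateException("copy produced null record");
        }
        return result;
    }
}
```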
[GitHub] [flink] flinkbot commented on issue #8145: Inst 1.8.0
flinkbot commented on issue #8145: Inst 1.8.0 URL: https://github.com/apache/flink/pull/8145#issuecomment-481884659 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] DarqueBox opened a new pull request #8145: Inst 1.8.0
DarqueBox opened a new pull request #8145: Inst 1.8.0 URL: https://github.com/apache/flink/pull/8145 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new 
feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate
flinkbot edited a comment on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate URL: https://github.com/apache/flink/pull/8136#issuecomment-481642495 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @azagrebin * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @azagrebin * ❗ 3. Needs [attention] from @pnowojski [committer] * ✅ 4. The change fits into the overall [architecture]. - Approved by @azagrebin * ✅ 5. Overall code [quality] is good. - Approved by @azagrebin Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] azagrebin commented on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate
azagrebin commented on issue #8136: [FLINK-12154][network] Remove legacy fields for SingleInputGate URL: https://github.com/apache/flink/pull/8136#issuecomment-481822687 @flinkbot approve all @flinkbot attention @pnowojski
[jira] [Commented] (FLINK-10049) Unify the processing logic for NULL arguments in SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814703#comment-16814703 ] Hequn Cheng commented on FLINK-10049: - Yes, I meant to unify the logic for the two problems I listed. Whether to return null or throw an NPE depends on the detailed logic of the UDFs and may vary. What we have to do is make sure the semantics are right and try to avoid exceptions if null is OK. :-) +1 to rename the title. Best, Hequn > Unify the processing logic for NULL arguments in SQL built-in functions > --- > > Key: FLINK-10049 > URL: https://issues.apache.org/jira/browse/FLINK-10049 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Xingcan Cui >Assignee: vinoyang >Priority: Major > > Currently, the built-in functions treat NULL arguments in different ways. > E.g., ABS(NULL) returns NULL, while LOG10(NULL) throws an NPE. The general > SQL-way of handling NULL values should be that if one argument is NULL the > result is NULL. We should unify the processing logic for that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
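The unified NULL handling the issue asks for can be illustrated with a small, self-contained sketch (not Flink code): a generic null-propagating wrapper that gives any scalar function the SQL-style semantics, where a NULL argument yields a NULL result instead of an NPE.

```java
import java.util.function.Function;

public class NullSafeEval {
    // SQL-style semantics: if the argument is NULL, the result is NULL,
    // instead of letting the wrapped function throw an NPE (as LOG10(NULL) did).
    static <T, R> R nullSafe(T arg, Function<T, R> f) {
        return arg == null ? null : f.apply(arg);
    }

    public static void main(String[] args) {
        System.out.println(nullSafe((Double) null, Math::abs)); // NULL in, NULL out
        System.out.println(nullSafe(100.0, Math::log10));       // non-NULL behaves as usual
    }
}
```

In a real code generator the wrapper would be inlined per built-in function, but the null-check-first pattern is the same.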
[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other … URL: https://github.com/apache/flink/pull/8007#discussion_r274073139 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +/** + * An interface responsible for manipulating catalog metadata. + */ +public interface ReadableWritableCatalog extends ReadableCatalog { + + // -- databases -- + + /** +* Create a database. +* +* @param name Name of the database to be created +* @param database The database definition +* @param ignoreIfExists Flag to specify behavior when a database with the given name already exists: +* if set to false, throw a DatabaseAlreadyExistException, +* if set to true, do nothing. 
+* @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists is false +*/ + void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException; + + /** +* Drop a database. +* +* @param name Name of the database to be dropped. +* @param ignoreIfNotExists Flag to specify behavior when the database does not exist: +* if set to false, throw an exception, +* if set to true, do nothing. +* @throws DatabaseNotExistException if the given database does not exist +*/ + void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException; + + /** +* Modify an existing database. +* +* @param name Name of the database to be modified +* @param newDatabase The new database definition +* @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: +* if set to false, throw an exception, +* if set to true, do nothing. +* @throws DatabaseNotExistException if the given database does not exist +*/ + void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException; + + // -- tables and views -- + + /** +* Drop a table or view. +* +* @param tablePath Path of the table or view to be dropped +* @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: +* if set to false, throw an exception, +* if set to true, do nothing. +* @throws TableNotExistException if the table or view does not exist +*/ + void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException; + + /** +* Rename an existing table or view. +* +* @param tablePath Path of the table or view to rename +* @param newTableName the new name of the table or view +* @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: +* if set to false, throw an exception, +* if set to true, do nothing.
+* @throws TableNotExistException if the table does not exist +* @throws DatabaseNotExistException if the database in tablePath does not exist +*/ + void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) +
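The ignoreIfExists contract described in these Javadocs can be sketched with a minimal in-memory stand-in. The real `CatalogDatabase` and exception types live in flink-table-common; here they are stubbed so the sketch is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

public class MiniCatalog {
    // Stand-ins for the real flink-table-common types.
    static class CatalogDatabase {}
    static class DatabaseAlreadyExistException extends Exception {
        DatabaseAlreadyExistException(String name) {
            super("Database " + name + " already exists.");
        }
    }

    private final Map<String, CatalogDatabase> databases = new HashMap<>();

    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException {
        if (databases.containsKey(name)) {
            if (!ignoreIfExists) {
                // ignoreIfExists == false: signal the conflict to the caller
                throw new DatabaseAlreadyExistException(name);
            }
            return; // ignoreIfExists == true: do nothing
        }
        databases.put(name, database);
    }

    public boolean databaseExists(String name) {
        return databases.containsKey(name);
    }
}
```

The same flag pattern (throw on conflict unless told to ignore) repeats for dropDatabase, alterDatabase, and the table operations.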
[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#discussion_r274070476 ## File path: flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Abstract factory for AzureFS. Subclasses override to specify + * the correct scheme (wasb / wasbs). + */ +public abstract class AbstractAzureFSFactory implements FileSystemFactory { + private static final Logger LOG = LoggerFactory.getLogger(AzureFSFactory.class); + + private Configuration flinkConfig; + + @Override + public void configure(Configuration config) { + flinkConfig = config; Review comment: Yeah I was able to test this out and verify it works ok. 
[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers URL: https://github.com/apache/flink/pull/8002#discussion_r274057620 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) { // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { - // we have some reporters so - for (Tuple2 reporterConfiguration: reporterConfigurations) { - String namedReporter = reporterConfiguration.f0; - Configuration reporterConfig = reporterConfiguration.f1; - - final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null); - if (className == null) { - LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported."); - continue; - } + for (ReporterSetup reporterSetup : reporterConfigurations) { + final String namedReporter = reporterSetup.getName(); + final MetricConfig metricConfig = reporterSetup.getConfiguration(); try { - String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null); + String configuredPeriod = metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null); Review comment: I think in general it is better to offer a restricted API because it usually simplifies things. Here we have to provide the correct key in order to find the right value. From a user's perspective, someone who does not know the details of how things are stored in `MetricConfig` would find it easier to simply call `MetricConfig#getReporterInterval`. It would be easier still to call `ScheduledMetricReporter#getReporterInterval`, to relate this discussion to my other point.
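A restricted accessor along the lines Till suggests could look roughly like the following sketch. Note that `getReporterInterval`, the `"interval"` key, and the duration parsing are assumptions for illustration, not Flink's actual constants or API; the real `MetricConfig` extends `java.util.Properties`, which is what stands in for it here.

```java
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class ReporterConfigSketch {
    // Assumed key name for illustration; not Flink's real METRICS_REPORTER_INTERVAL_SUFFIX.
    static final String INTERVAL_KEY = "interval";

    // The restricted API: callers ask for "the reporter interval" instead of
    // knowing which raw key it is stored under and how to parse it.
    static Duration getReporterInterval(Properties metricConfig, Duration defaultInterval) {
        String raw = metricConfig.getProperty(INTERVAL_KEY);
        if (raw == null) {
            return defaultInterval;
        }
        // Expected shape: "<amount> <TimeUnit>", e.g. "10 SECONDS".
        String[] parts = raw.split(" ");
        long amount = Long.parseLong(parts[0]);
        TimeUnit unit = TimeUnit.valueOf(parts[1]);
        return Duration.ofSeconds(unit.toSeconds(amount));
    }
}
```

The point of the design is that key lookup and parsing live in one place, so callers cannot get the key or the format wrong.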
[GitHub] [flink] Myasuka commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
Myasuka commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#discussion_r274054284 ## File path: flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Abstract factory for AzureFS. Subclasses override to specify + * the correct scheme (wasb / wasbs). 
+ */ +public abstract class AbstractAzureFSFactory implements FileSystemFactory { + private static final Logger LOG = LoggerFactory.getLogger(AzureFSFactory.class); + + private Configuration flinkConfig; + + @Override + public void configure(Configuration config) { + flinkConfig = config; Review comment: hmm, previously I thought you did not take the `hadoopConfig` into account, but then I found you just follow the `MapRFsFactory`'s way of instantiating the Hadoop configuration when creating the file system, instead of instantiating it within `FileSystemFactory#create(URI fsUri)` before creating the file system as other file system factories do. If this works fine, it would be okay to do so.
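The pattern under discussion — remember the Flink configuration in configure() and materialize the Hadoop configuration lazily when a file system is actually created — can be sketched with simplified stand-in types (none of these are the real Flink or Hadoop classes):

```java
import java.net.URI;

public class LazyConfigFactorySketch {
    // Simplified stand-ins for org.apache.flink.configuration.Configuration
    // and a Hadoop configuration derived from it.
    static class Configuration {}
    static class HadoopConfig {
        final Configuration source;
        HadoopConfig(Configuration source) { this.source = source; }
    }

    private Configuration flinkConfig;

    // configure() only stores the Flink config; no Hadoop classes are touched yet.
    public void configure(Configuration config) {
        this.flinkConfig = config;
    }

    // The Hadoop configuration is built only when a file system is requested,
    // mirroring the MapRFsFactory-style approach the reviewer describes.
    public HadoopConfig create(URI fsUri) {
        return new HadoopConfig(flinkConfig);
    }
}
```

Deferring the Hadoop-side work keeps factory loading cheap and avoids touching Hadoop classes on classpaths where they may be absent.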
[jira] [Closed] (FLINK-12153) Error when submitting a Flink job to the Flink environment
[ https://issues.apache.org/jira/browse/FLINK-12153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-12153. Resolution: Invalid Please reopen with a translated title or reach out to the mailing list > Error when submitting a Flink job to the Flink environment > -- > > Key: FLINK-12153 > URL: https://issues.apache.org/jira/browse/FLINK-12153 > Project: Flink > Issue Type: Bug >Affects Versions: 1.7.2 > Environment: flink maven > > org.apache.flink > flink-streaming-java_2.12 > 1.7.1 > > > > org.apache.flink > flink-connector-kafka-0.11_2.12 > 1.7.1 > > > org.apache.hadoop > hadoop-hdfs > 2.7.2 > > > xml-apis > xml-apis > > > > > org.apache.hadoop > hadoop-common > 2.7.2 > > > > org.apache.flink > flink-hadoop-compatibility_2.12 > 1.7.1 > > > > org.apache.flink > flink-connector-filesystem_2.12 > 1.7.1 > > > > org.apache.flink > flink-connector-elasticsearch5_2.12 > 1.7.1 > > > > Hadoop environment version 2.7.7 > >Reporter: gaojunjie >Priority: Major > > java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are > only supported for HDFS and for Hadoop version 2.7 or newer > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.&lt;init&gt;(HadoopRecoverableWriter.java:57) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.&lt;init&gt;(Buckets.java:112) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at >
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748)
[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test URL: https://github.com/apache/flink/pull/7986#discussion_r274039553 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityCheckResult.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The (potentially aggregated) result of a compatibility check that may also contain a list of {@link AssertionError} + * for each found incompatibility. 
+ */ +final class CompatibilityCheckResult { + + private final Compatibility backwardCompatibility; + private final int backwardCompatibilityGrade; + private final Collection backwardCompatibilityErrors; + + CompatibilityCheckResult(final Compatibility backwardCompatibility) { + this(backwardCompatibility, 1, Collections.emptyList()); + if (backwardCompatibility == Compatibility.INCOMPATIBLE) { + throw new RuntimeException("This constructor must not be used for incompatible results."); + } + } + + CompatibilityCheckResult(final AssertionError backwardCompatibilityError) { + this(Compatibility.INCOMPATIBLE, 0, Collections.singletonList(backwardCompatibilityError)); + } + + private CompatibilityCheckResult(final Compatibility backwardCompatibility, final int backwardCompatibilityGrade, final Collection backwardCompatibilityErrors) { + this.backwardCompatibility = backwardCompatibility; + this.backwardCompatibilityErrors = backwardCompatibilityErrors; Review comment: nit: change order of assignment (`backwardCompatibilityGrade` before `backwardCompatibilityErrors`)
[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test URL: https://github.com/apache/flink/pull/7986#discussion_r274041784 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityCheckResult.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The (potentially aggregated) result of a compatibility check that may also contain a list of {@link AssertionError} + * for each found incompatibility. 
+ */ +final class CompatibilityCheckResult { + + private final Compatibility backwardCompatibility; + private final int backwardCompatibilityGrade; + private final Collection backwardCompatibilityErrors; + + CompatibilityCheckResult(final Compatibility backwardCompatibility) { + this(backwardCompatibility, 1, Collections.emptyList()); + if (backwardCompatibility == Compatibility.INCOMPATIBLE) { Review comment: nit: Maybe static methods such as `CompatibilityCheckResult.incompatible()`, `CompatibilityCheckResult.identical()`, or `CompatibilityCheckResult.compatible(final Compatibility ...)` would make it more obvious how to use this API.
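GJL's naming suggestion could shape up roughly like this pared-down sketch (the fields and the Compatibility enum are reduced to what the example needs; this is not the actual PR code):

```java
public class CheckResultSketch {
    enum Compatibility { IDENTICAL, COMPATIBLE, INCOMPATIBLE }

    private final Compatibility compatibility;
    private final AssertionError cause;

    // Private constructor forces callers through the named factories below,
    // which makes invalid combinations (e.g. INCOMPATIBLE without a cause)
    // unrepresentable at the call site.
    private CheckResultSketch(Compatibility compatibility, AssertionError cause) {
        this.compatibility = compatibility;
        this.cause = cause;
    }

    static CheckResultSketch identical() {
        return new CheckResultSketch(Compatibility.IDENTICAL, null);
    }

    static CheckResultSketch compatible() {
        return new CheckResultSketch(Compatibility.COMPATIBLE, null);
    }

    static CheckResultSketch incompatible(AssertionError cause) {
        return new CheckResultSketch(Compatibility.INCOMPATIBLE, cause);
    }

    Compatibility getCompatibility() { return compatibility; }
    AssertionError getCause() { return cause; }
}
```

Named factories also remove the need for the runtime check the original constructor performs after delegation.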
[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test URL: https://github.com/apache/flink/pull/7986#discussion_r274040526 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityCheckResult.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The (potentially aggregated) result of a compatibility check that may also contain a list of {@link AssertionError} + * for each found incompatibility. 
+ */ +final class CompatibilityCheckResult { + + private final Compatibility backwardCompatibility; + private final int backwardCompatibilityGrade; + private final Collection backwardCompatibilityErrors; + + CompatibilityCheckResult(final Compatibility backwardCompatibility) { + this(backwardCompatibility, 1, Collections.emptyList()); + if (backwardCompatibility == Compatibility.INCOMPATIBLE) { + throw new RuntimeException("This constructor must not be used for incompatible results."); + } + } + + CompatibilityCheckResult(final AssertionError backwardCompatibilityError) { + this(Compatibility.INCOMPATIBLE, 0, Collections.singletonList(backwardCompatibilityError)); + } + + private CompatibilityCheckResult(final Compatibility backwardCompatibility, final int backwardCompatibilityGrade, final Collection backwardCompatibilityErrors) { + this.backwardCompatibility = backwardCompatibility; + this.backwardCompatibilityErrors = backwardCompatibilityErrors; + this.backwardCompatibilityGrade = backwardCompatibilityGrade; Review comment: nit: `backwardCompatibilityErrors` can be mutable (l. 70) and the getter makes this object potentially mutable
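One conventional way to address the mutability nit is a defensive copy plus an unmodifiable view, so that neither the list passed to the constructor nor the collection returned by the getter can change the object after construction. A generic sketch, not the PR's code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class DefensiveErrorsSketch {
    private final Collection<AssertionError> errors;

    public DefensiveErrorsSketch(Collection<AssertionError> errors) {
        // Copy first (the caller can no longer mutate our state through its list),
        // then wrap (our own getter cannot leak a mutable view either).
        this.errors = Collections.unmodifiableCollection(new ArrayList<>(errors));
    }

    public Collection<AssertionError> getErrors() {
        return errors;
    }
}
```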
[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test URL: https://github.com/apache/flink/pull/7986#discussion_r274044893 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutine.java ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.compatibility; + +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.junit.Assert; + +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import static org.apache.flink.runtime.rest.compatibility.Compatibility.COMPATIBLE; +import static org.apache.flink.runtime.rest.compatibility.Compatibility.IDENTICAL; +import static org.apache.flink.runtime.rest.compatibility.Compatibility.INCOMPATIBLE; + +/** + * Routine for checking the compatibility of a {@link MessageHeaders} pair. + * + * The 'extractor' {@link Function} generates a 'container', a jackson-compatible object containing the data that + * the routine bases it's compatibility-evaluation on. + * The 'assertion' {@link BiConsumer} accepts a pair of containers and asserts the compatibility. 
Incompatibilities are + * signaled by throwing an {@link AssertionError}. This implies that the method body will typically contain jUnit + * assertions. + */ +final class CompatibilityRoutine { + + private final String key; + private final Class containerClass; + private final Function, C> extractor; + private final BiConsumer assertion; + + CompatibilityRoutine( + final String key, + final Class containerClass, + final Function, C> extractor, + final BiConsumer assertion) { + this.key = key; + this.containerClass = containerClass; + this.extractor = extractor; + this.assertion = assertion; + } + + String getKey() { + return key; + } + + Class getContainerClass() { + return containerClass; + } + + C getContainer(final MessageHeaders header) { + final C container = extractor.apply(header); + Assert.assertNotNull("Implementation error: Extractor returned null.", container); + return container; + } + + public CompatibilityCheckResult checkCompatibility(final Optional old, final Optional cur) { + if (!old.isPresent() && !cur.isPresent()) { + Assert.fail(String.format( + "Implementation error: Compatibility check container for routine %s for both old and new version is null.", key)); + } + if (!old.isPresent()) { + // allow addition of new compatibility routines + return new CompatibilityCheckResult(COMPATIBLE); + } + if (!cur.isPresent()) { + // forbid removal of compatibility routines + return new CompatibilityCheckResult( + new AssertionError(String.format( + "Compatibility check container for routine %s not found in current version.", key))); + } + + Compatibility backwardCompatibility; + AssertionError backwardIncompatibilityCause = null; + try { + assertion.accept(old.get(), cur.get()); + backwardCompatibility = COMPATIBLE; + } catch (final Exception | AssertionError e) { + backwardCompatibility = INCOMPATIBLE; + backwardIncompatibilityCause = new AssertionError(key + ": " + e.getMessage()); + } + + Compatibility forwardCompatibility; + try { + 
assertion.accept(cur.get(), old.get()); + forwardCompatibility = COMPATIBLE; + } catch (final Exception | AssertionError e) { +
[GitHub] [flink] an0 commented on a change in pull request #8106: [FLINK-12092] [docs]Clarify when `onTimer(...)` is called
an0 commented on a change in pull request #8106: [FLINK-12092] [docs]Clarify when `onTimer(...)` is called URL: https://github.com/apache/flink/pull/8106#discussion_r274040206 ## File path: docs/dev/stream/operators/process_function.md ## @@ -44,8 +44,8 @@ For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed s The timers allow applications to react to changes in processing time and in [event time]({{ site.baseurl }}/dev/event_time.html). Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future -event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is -called. During that call, all states are again scoped to the key with which the timer was created, allowing +event-/processing-time instants. The `onTimer(...)` method is +called when such an event-time is first caught up by a watermark or such a processing-time is reached. During that call, all states are again scoped to the key with which the timer was created, allowing Review comment: Sure. Done.
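The sentence being refined describes when `onTimer(...)` fires. A toy model (not Flink's implementation) of the event-time half of that rule: a registered timer fires only once a watermark reaches or passes its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class EventTimeTimerModel {
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // A watermark advancing to time w fires every pending timer with
    // timestamp <= w, in timestamp order; later timers stay pending.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

So a timer for time 10 stays silent until some watermark >= 10 arrives, which is the "first caught up by a watermark" wording in the doc change.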
[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814589#comment-16814589 ] Jason Kania commented on FLINK-11799: - Hi Liya, Your approach seems reasonable. Thanks, Jason > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Priority: Major > > I was encountering a problem with NullPointerExceptions with the deserialized > object reaching my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning to be aware that the kryo copy() is failing. I understand > that the code is there to handle the lack of a copy implementation but due to > the potential inefficiency of having to write and read the object instead of > copying it, this would seem useful information to share at the least. It is > also important to have a warning in case the cause of the copy error is > something that needs to be fixed. > 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
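The two fixes the issue asks for — warn when the fast copy fails instead of silently swallowing the exception, and reject a null result instead of passing it downstream — can be sketched independently of Kryo. The class and method names below are hypothetical, not Flink's:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;

/** Hypothetical sketch: fall back to a slower copy path, but warn (once) instead of
 *  swallowing the failure, and fail fast on a null result. */
class LoggingCopyHelper<T> {
	private final AtomicBoolean warned = new AtomicBoolean();

	T copy(T from, UnaryOperator<T> fastCopy, UnaryOperator<T> fallbackCopy) {
		T result;
		try {
			result = fastCopy.apply(from);
		} catch (RuntimeException e) {
			// report the failure at least once, as the issue suggests
			if (warned.compareAndSet(false, true)) {
				System.err.println("WARN: fast copy failed, using fallback: " + e);
			}
			result = fallbackCopy.apply(from);
		}
		if (result == null) {
			// surface the null here instead of letting a downstream operator hit an NPE
			throw new IllegalStateException("copy produced null for " + from.getClass());
		}
		return result;
	}
}
```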
[GitHub] [flink] dawidwys commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
dawidwys commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-481752758

Hi @hequn8128, I had another look and some discussion with @twalthr about this PR, and we came up with a few ideas on how to proceed:

1. The `Stream/BatchTableDescriptor`, `Stream/BatchTableSource`, `Stream/BatchTableSink`, `Stream/BatchTableSourceFactory`, `Stream/BatchTableSinkFactory` should end up in the bridging module. In the long run we need to rework them anyway. The new versions will end up in api-java. This might solve our problems with the reflection lookup for factories.
2. Could you please split the PR into smaller commits that each move a smaller "hierarchy" of classes at a time? The last commit would then just extract the TableEnvironment once all the needed classes are already moved. This would help us review this PR tremendously.
[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274038147

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutine.java

@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.junit.Assert;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static org.apache.flink.runtime.rest.compatibility.Compatibility.COMPATIBLE;
+import static org.apache.flink.runtime.rest.compatibility.Compatibility.IDENTICAL;
+import static org.apache.flink.runtime.rest.compatibility.Compatibility.INCOMPATIBLE;
+
+/**
+ * Routine for checking the compatibility of a {@link MessageHeaders} pair.
+ *
+ * <p>The 'extractor' {@link Function} generates a 'container', a jackson-compatible object containing the data that
+ * the routine bases its compatibility-evaluation on.
+ * The 'assertion' {@link BiConsumer} accepts a pair of containers and asserts the compatibility. Incompatibilities are
+ * signaled by throwing an {@link AssertionError}. This implies that the method body will typically contain JUnit
+ * assertions.
+ */
+final class CompatibilityRoutine<C> {
+
+	private final String key;
+	private final Class<C> containerClass;
+	private final Function<MessageHeaders<?, ?, ?>, C> extractor;
+	private final BiConsumer<C, C> assertion;
+
+	CompatibilityRoutine(
+			final String key,
+			final Class<C> containerClass,
+			final Function<MessageHeaders<?, ?, ?>, C> extractor,
+			final BiConsumer<C, C> assertion) {
+		this.key = key;
+		this.containerClass = containerClass;
+		this.extractor = extractor;
+		this.assertion = assertion;
+	}
+
+	String getKey() {
+		return key;
+	}
+
+	Class<C> getContainerClass() {
+		return containerClass;
+	}
+
+	C getContainer(final MessageHeaders<?, ?, ?> header) {
+		final C container = extractor.apply(header);
+		Assert.assertNotNull("Implementation error: Extractor returned null.", container);
+		return container;
+	}
+
+	public CompatibilityCheckResult checkCompatibility(final Optional<C> old, final Optional<C> cur) {
+		if (!old.isPresent() && !cur.isPresent()) {
+			Assert.fail(String.format(
+				"Implementation error: Compatibility check container for routine %s for both old and new version is null.", key));
+		}
+		if (!old.isPresent()) {
+			// allow addition of new compatibility routines
+			return new CompatibilityCheckResult(COMPATIBLE);
+		}
+		if (!cur.isPresent()) {
+			// forbid removal of compatibility routines
+			return new CompatibilityCheckResult(
+				new AssertionError(String.format(
+					"Compatibility check container for routine %s not found in current version.", key)));
+		}
+
+		Compatibility backwardCompatibility;
+		AssertionError backwardIncompatibilityCause = null;
+		try {
+			assertion.accept(old.get(), cur.get());
+			backwardCompatibility = COMPATIBLE;
+		} catch (final Exception | AssertionError e) {
+			backwardCompatibility = INCOMPATIBLE;
+			backwardIncompatibilityCause = new AssertionError(key + ": " + e.getMessage());
+		}
+
+		Compatibility forwardCompatibility;
+		try {
+			assertion.accept(cur.get(), old.get());
+			forwardCompatibility = COMPATIBLE;
+		} catch (final Exception | AssertionError e) {

Review comment: If the logic in the
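The quoted checkCompatibility runs the same assertion twice with the arguments swapped, yielding backward and forward compatibility independently. A minimal standalone illustration of that pattern, using a made-up "container" (a set of field names, where compatibility means no field is lost in the given direction):

```java
import java.util.Set;
import java.util.function.BiConsumer;

/** Toy illustration (not the Flink class itself) of the dual check:
 *  the same assertion, applied in both directions, gives two independent results. */
class DirectionalCheck {
	/** Asserts that every field of {@code from} still exists in {@code to}. */
	static final BiConsumer<Set<String>, Set<String>> FIELDS_KEPT = (from, to) -> {
		if (!to.containsAll(from)) {
			throw new AssertionError("missing fields: " + from);
		}
	};

	static boolean compatible(Set<String> from, Set<String> to) {
		try {
			FIELDS_KEPT.accept(from, to);
			return true;
		} catch (AssertionError e) {
			return false;
		}
	}
}
```

Adding a field is then backward compatible (old containers still satisfied by the new version) but not forward compatible (the new container checked against the old version fails), mirroring how the routine can report the two directions separately.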
[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274033867

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java

@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
+import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Stability test and snapshot generator for the REST API.
+ */
+@RunWith(Parameterized.class)
+public final class RestAPIStabilityTest extends TestLogger {
+
+	private static final String REGENERATE_SNAPSHOT_PROPERTY = "generate-rest-snapshot";
+
+	private static final String SNAPSHOT_RESOURCE_PATTERN = "rest_api_%s.snapshot";
+
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	@Parameterized.Parameters(name = "version = {0}")
+	public static Iterable<RestAPIVersion> getStableVersions() {
+		return Arrays.stream(RestAPIVersion.values())
+			.filter(RestAPIVersion::isStableVersion)
+			.collect(Collectors.toList());
+	}
+
+	private final RestAPIVersion apiVersion;
+
+	public RestAPIStabilityTest(final RestAPIVersion apiVersion) {
+		this.apiVersion = apiVersion;
+	}
+
+	@Test
+	public void testDispatcherRestAPIStability() throws IOException {
+		final String versionedSnapshotFileName = String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
+
+		final RestAPISnapshot currentSnapshot = createSnapshot(new DocumentingDispatcherRestEndpoint());
+
+		if (System.getProperty(REGENERATE_SNAPSHOT_PROPERTY) != null) {
+			writeSnapshot(versionedSnapshotFileName, currentSnapshot);
+		}
+
+		final URL resource = RestAPIStabilityTest.class.getClassLoader().getResource(versionedSnapshotFileName);
+		if (resource == null) {
+			Assert.fail("Snapshot file does not exist. If you added a new version, re-run this test with" +
+				" -D" + REGENERATE_SNAPSHOT_PROPERTY + " being set.");
+		}
+		final RestAPISnapshot previousSnapshot = OBJECT_MAPPER.readValue(new File(resource.getFile()), RestAPISnapshot.class);
+
+		assertCompatible(previousSnapshot, currentSnapshot);
+	}
+
+	private static void writeSnapshot(final String versionedSnapshotFileName, final RestAPISnapshot snapshot) throws IOException {
+		OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+			.writeValue(
+				new File("src/test/resources/" + versionedSnapshotFileName),
+				snapshot);
+		System.out.println("REST API snapshot " + versionedSnapshotFileName + " was updated, please remember to commit the snapshot.");
+	}
+
+	private RestAPISnapshot createSnapshot(final DocumentingRestEndpoint dispatcherRestEndpoint) {
+		final List calls = dispatcherRestEndpoint.getSpecs().stream()
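The test above follows a common snapshot-test pattern: a system property regenerates the stored snapshot file, otherwise the current state is compared against it. A standalone sketch of that control flow (file name and JSON content below are made up, and the real test does a structural comparison rather than string equality):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch of the snapshot pattern: regenerate on demand (or when missing), otherwise compare. */
class SnapshotCheck {
	static String check(Path snapshotFile, String current, boolean regenerate) {
		try {
			if (regenerate || !Files.exists(snapshotFile)) {
				Files.writeString(snapshotFile, current); // remember to commit the file
				return "regenerated";
			}
			String previous = Files.readString(snapshotFile);
			return previous.equals(current) ? "compatible" : "changed";
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}
}
```

The design choice worth noting is that the regenerated file is written into the source tree, so a change to the API only becomes permanent when a developer consciously commits the new snapshot.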
[GitHub] [flink] dawidwys commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
dawidwys commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273911156

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public interface CatalogDatabase {

Review comment: What is a database?
[GitHub] [flink] pnowojski commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
pnowojski commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r274017522

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

@@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
 		return nettyConfig;
 	}
 
+	public boolean isCreditBased() {
+		return isCreditBased;
+	}
+
+	//
+
+	/**
+	 * Utility method to extract network related parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration configuration object
+	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param localTaskManagerCommunication true, to skip initializing the network stack
+	 * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+	 * @return NetworkEnvironmentConfiguration
+	 */
+	@Deprecated
+	public static NetworkEnvironmentConfiguration fromConfiguration(
+			Configuration configuration,
+			long maxJvmHeapMemory,
+			boolean localTaskManagerCommunication,
+			InetAddress taskManagerAddress) {
+
+		// > hosts / ports for communication and data exchange
+
+		final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
+		ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+		final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
+
+		final int numNetworkBuffers;
+		if (!hasNewNetworkConfig(configuration)) {
+			// fallback: number of network buffers
+			numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+			checkOldNetworkConfig(numNetworkBuffers);
+		} else {
+			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+					TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+			}
+
+			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+			// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+			long numNetworkBuffersLong = networkMemorySize / pageSize;
+			if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+				throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
+					+ ") corresponds to more than MAX_INT pages.");
+			}
+			numNetworkBuffers = (int) numNetworkBuffersLong;
+		}
+
+		final NettyConfig nettyConfig;
+		if (!localTaskManagerCommunication) {
+			final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+			nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
+				pageSize, ConfigurationParserUtils.getSlot(configuration), configuration);
+		} else {
+			nettyConfig = null;
+		}
+
+		int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+		int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+		boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+		return new NetworkEnvironmentConfiguration(
+			numNetworkBuffers,
+			pageSize,
+			initialRequestBackoff,
+			maxRequestBackoff,
+			buffersPerChannel,
+			extraBuffersPerGate,
+			isCreditBased,
+			nettyConfig);
+	}
+
+	/**
+	 *
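The buffer-count derivation in the quoted hunk — divide the memory budget by the page size, tolerate the sub-page offcut, and reject counts above `Integer.MAX_VALUE` — in isolation (the class and method name here are made up):

```java
/** Sketch of the calculation above: number of network buffers from a memory budget and page size. */
class NetworkBufferMath {
	static int numBuffers(long networkMemorySize, int pageSize) {
		// integer division tolerates an offcut of less than one page
		long n = networkMemorySize / pageSize;
		if (n > Integer.MAX_VALUE) {
			throw new IllegalArgumentException(
				"The given number of memory bytes (" + networkMemorySize
					+ ") corresponds to more than MAX_INT pages.");
		}
		return (int) n;
	}
}
```

For example, a 64 MiB budget with 32 KiB pages yields 2048 buffers; the overflow guard matters because the budget is a `long` while the buffer count is an `int`.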
[GitHub] [flink] zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r274014721

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java

@@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) {
 			// by default, don't report anything
 			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
 		} else {
-			// we have some reporters so
-			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-				String namedReporter = reporterConfiguration.f0;
-				Configuration reporterConfig = reporterConfiguration.f1;
-
-				final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-				if (className == null) {
-					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-					continue;
-				}
+			for (ReporterSetup reporterSetup : reporterConfigurations) {
+				final String namedReporter = reporterSetup.getName();
+				final MetricConfig metricConfig = reporterSetup.getConfiguration();

 				try {
-					String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+					String configuredPeriod = metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);

Review comment: I may be biased here, but I don't believe we're hiding anything. `MetricConfig` is just a minor extension of `Properties` (which in hindsight shouldn't even exist) and I have no plans to change that. So long as that is the case, there's no other place the value could come from but the configuration. As a more relatable example, would you say that `Configuration#getJobManagerHost()` hides in any way that the value is read from the configuration?
[GitHub] [flink] flinkbot commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
flinkbot commented on issue #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode URL: https://github.com/apache/flink/pull/8144#issuecomment-481732968 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12159) Enable YarnMiniCluster integration test under non-secure mode
[ https://issues.apache.org/jira/browse/FLINK-12159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12159: --- Labels: pull-request-available (was: ) > Enable YarnMiniCluster integration test under non-secure mode > - > > Key: FLINK-12159 > URL: https://issues.apache.org/jira/browse/FLINK-12159 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > Currently if third party want to use flink for integration under yarn mode, > it has to be secure mode, it doesn't make sense. We should also allow it > under non-secure mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zjffdu opened a new pull request #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
zjffdu opened a new pull request #8144: [FLINK-12159] Enable YarnMiniCluster integration test under non-secure mode
URL: https://github.com/apache/flink/pull/8144

## What is the purpose of the change

I cannot run Flink in YarnMiniCluster for integration tests because yarn-site.xml is not shipped to the YARN container (krb5Config must be non-empty, otherwise yarn-site.xml won't be shipped). It doesn't make sense to only allow YARN integration under secure mode, so this PR enables the YarnMiniCluster integration test under non-secure mode as well. I tested this PR with the Zeppelin Flink integration test.

## Brief change log

Straightforward code refactoring to separate the `yarn-site.xml` and `krb5Config` files.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage. I verified this PR in the Zeppelin Flink integration test.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] [flink] zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
zentol commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers URL: https://github.com/apache/flink/pull/8002#discussion_r274009874 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) { // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { - // we have some reporters so - for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) { - String namedReporter = reporterConfiguration.f0; - Configuration reporterConfig = reporterConfiguration.f1; - - final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null); - if (className == null) { - LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported."); - continue; - } + for (ReporterSetup reporterSetup : reporterConfigurations) { Review comment: I assumed you meant that the reporter class would have to hard-code the name, but I guess you're suggesting that the reporter parses the interval and name from the configuration. For one, this wouldn't really work with the subsequent introduction of factories, which requires the MetricConfig to be assembled before the reporter is even instantiated. One could move the logic into the factory of course; my point is that this is not something we can do _now_. Reporters (or factories for that matter) shouldn't be aware that reporter names actually exist; this is an implementation detail of the metric system to allow for distinct configurations. Whether multiple reporters exist or not is irrelevant to a given reporter, so they shouldn't have to deal with it. I also don't see how a reporter would be able to determine its own name from the given configuration. 
The metric-wide configuration does not contain this info in an obvious fashion, and the metric config does not contain it at all. I can only see this working if we (the MetricRegistry/-Configuration) explicitly write it into the MetricConfig, but this obviously doesn't make any sense. Admittedly, the interval makes more sense. It is true that the current reporter configuration is a mix between reporter-facing options (like reporter-specific arguments) and system-facing options (like the interval). The current approach however allows us to ensure that certain configuration options exist and are actually applied; in a design where the `Scheduled` reporter provides the interval you cannot guarantee that the interval is configurable for users, or that a configured interval is respected. The same applies to other system-facing reporter options, like delimiters. So long as reporters don't massively go out of their way to avoid it (i.e. not working at all against the MetricGroup interface (with some currently existing exceptions)) the configured delimiter _will_ be used.
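The factory ordering described above can be sketched as follows. All class and method names here are illustrative assumptions, not Flink's actual interfaces: the point is only that the metric system assembles the per-reporter config (stripping the `metrics.reporter.<name>.` prefix) before anything is instantiated, so neither reporter nor factory ever needs to know its own name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the configuration-assembly order zentol describes.
public class ReporterFactorySketch {

    // Stand-in for a reporter: it only sees its own, already-scoped options.
    interface Reporter {
        String get(String key);
    }

    // The "factory" receives a fully assembled config; the reporter name has
    // already been stripped by the registry, so the reporter cannot (and need
    // not) recover it.
    static Reporter create(Map<String, String> assembledConfig) {
        return assembledConfig::get;
    }

    // The registry, not the reporter, extracts the options scoped under
    // "metrics.reporter.<name>." from the global configuration.
    static Map<String, String> assemble(Map<String, String> global, String name) {
        String prefix = "metrics.reporter." + name + ".";
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : global.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }
}
```

Because assembly happens first, the MetricConfig exists before the reporter does, which is exactly why pushing name/interval parsing into the reporter itself would not fit this design.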
[GitHub] [flink] aloyszhang commented on issue #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat
aloyszhang commented on issue #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat URL: https://github.com/apache/flink/pull/8143#issuecomment-481727297 @zentol
[GitHub] [flink] flinkbot commented on issue #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat
flinkbot commented on issue #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat URL: https://github.com/apache/flink/pull/8143#issuecomment-481726995 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] aloyszhang opened a new pull request #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat
aloyszhang opened a new pull request #8143: [hotfix][javadocs] remove redundant 'for example' for InputFormat URL: https://github.com/apache/flink/pull/8143 ## What is the purpose of the change This pull request removes the redundant 'for example' in the javadoc of InputFormat. ## Brief change log change from ' such as for example a file path' to ' such as a file path' ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know)
[jira] [Created] (FLINK-12159) Enable YarnMiniCluster integration test under non-secure mode
Jeff Zhang created FLINK-12159: -- Summary: Enable YarnMiniCluster integration test under non-secure mode Key: FLINK-12159 URL: https://issues.apache.org/jira/browse/FLINK-12159 Project: Flink Issue Type: Improvement Components: Deployment / YARN Affects Versions: 1.8.0 Reporter: Jeff Zhang Currently, if a third party wants to use Flink for integration testing under YARN mode, it has to be in secure mode, which doesn't make sense. We should also allow it under non-secure mode.
[jira] [Assigned] (FLINK-12159) Enable YarnMiniCluster integration test under non-secure mode
[ https://issues.apache.org/jira/browse/FLINK-12159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang reassigned FLINK-12159: -- Assignee: Jeff Zhang > Enable YarnMiniCluster integration test under non-secure mode > - > > Key: FLINK-12159 > URL: https://issues.apache.org/jira/browse/FLINK-12159 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > > Currently, if a third party wants to use Flink for integration testing under YARN mode, > it has to be in secure mode, which doesn't make sense. We should also allow it > under non-secure mode.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers URL: https://github.com/apache/flink/pull/8002#discussion_r273998850 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; + +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link ReporterSetup}. + */ +public class ReporterSetupTest extends TestLogger { + + /** +* TestReporter1 class only for type differentiation. 
+*/ + static class TestReporter1 extends TestReporter { + } + + /** +* TestReporter2 class only for type differentiation. +*/ + static class TestReporter2 extends TestReporter { + } + + /** +* Verifies that a reporter can be configured with all its arguments being forwarded. +*/ + @Test + public void testReporterArgumentForwarding() { + final Configuration config = new Configuration(); + + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, ReporterSetupTest.TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter.arg1", "value1"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter.arg2", "value2"); + + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + + Assert.assertEquals(1, reporterSetups.size()); + + final ReporterSetup reporterSetup = reporterSetups.get(0); + Assert.assertEquals("reporter", reporterSetup.getName()); + Assert.assertEquals("value1", reporterSetup.getConfiguration().getString("arg1", null)); + Assert.assertEquals("value2", reporterSetup.getConfiguration().getString("arg2", null)); + Assert.assertEquals(ReporterSetupTest.TestReporter1.class.getName(), reporterSetup.getConfiguration().getString("class", null)); + } + + /** +* Verifies that multiple reporters can be configured with all their arguments being forwarded. +*/ + @Test + public void testSeveralReportersWithArgumentForwarding() { Review comment: Without wanting to be pedantic here: then we are missing the `0` case. I guess it does not hurt to keep the test, but it adds a bit to our testing time.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers URL: https://github.com/apache/flink/pull/8002#discussion_r273995787 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) { // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { - // we have some reporters so - for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) { - String namedReporter = reporterConfiguration.f0; - Configuration reporterConfig = reporterConfiguration.f1; - - final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null); - if (className == null) { - LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported."); - continue; - } + for (ReporterSetup reporterSetup : reporterConfigurations) { Review comment: How so? The different instances of the `MetricReporterWithNameAndScopeDelimiter` could still return different values for `getName` and `getScopeDelimiter`. The motivating idea was to hide the fact that a reporter has a `MetricConfig` from which we obtain the values.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers
tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers URL: https://github.com/apache/flink/pull/8002#discussion_r273994424 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) { // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { - // we have some reporters so - for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) { - String namedReporter = reporterConfiguration.f0; - Configuration reporterConfig = reporterConfiguration.f1; - - final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null); - if (className == null) { - LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported."); - continue; - } + for (ReporterSetup reporterSetup : reporterConfigurations) { + final String namedReporter = reporterSetup.getName(); + final MetricConfig metricConfig = reporterSetup.getConfiguration(); try { - String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null); + String configuredPeriod = metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null); Review comment: The benefit would be that we hide the fact that we read the value from the underlying `Configuration`. It would thus better follow the Law of Demeter.
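The Law-of-Demeter style accessor suggested here might look something like the sketch below. The class and method names (`getConfiguredInterval`, the `"interval"` key, the `"10 SECONDS"` value format) are illustrative assumptions, not the actual Flink API; the point is only that callers ask the setup object for the parsed value instead of digging through its underlying config.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

// Hedged sketch: a ReporterSetup-like holder that hides the raw key lookup
// behind a typed accessor, so callers never touch the Configuration directly.
public class ReporterSetupSketch {
    private final Map<String, String> config;

    public ReporterSetupSketch(Map<String, String> config) {
        this.config = config;
    }

    // Callers get a parsed Duration (or empty), not a raw string they must
    // parse themselves. Assumes values like "10 SECONDS" for illustration.
    public Optional<Duration> getConfiguredInterval() {
        String raw = config.get("interval");
        if (raw == null) {
            return Optional.empty();
        }
        long amount = Long.parseLong(raw.split(" ")[0]);
        return Optional.of(Duration.ofSeconds(amount));
    }
}
```

With such an accessor, `MetricRegistryImpl` would no longer need to know which suffix the interval is stored under or how it is encoded.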
[GitHub] [flink] tillrohrmann commented on a change in pull request #8116: [FLINK-11920][scala] Reuse java classpath
tillrohrmann commented on a change in pull request #8116: [FLINK-11920][scala] Reuse java classpath URL: https://github.com/apache/flink/pull/8116#discussion_r273992844 ## File path: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala ## @@ -192,16 +192,7 @@ object EnumValueSerializerUpgradeTest { val settings = new GenericRunnerSettings(out.println _) -val classLoader = Thread.currentThread().getContextClassLoader - -val urls = classLoader match { - case urlClassLoader: URLClassLoader => -urlClassLoader.getURLs - case x => throw new IllegalStateException(s"Not possible to extract URLs " + -s"from class loader $x.") -} - -settings.classpath.value = urls.map(_.toString).mkString(java.io.File.pathSeparator) +settings.usejavacp.value = true Review comment: Maybe add a comment why we do this.
[GitHub] [flink] tzulitai removed a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector
tzulitai removed a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector URL: https://github.com/apache/flink/pull/8140#issuecomment-481703278 @flinkbot approve all
[GitHub] [flink] flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector
flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector URL: https://github.com/apache/flink/pull/8140#issuecomment-481656785 ## Review Progress * ✅ 1. The [description] looks good. - Approved by @tzulitai [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @tzulitai [PMC], @zentol [PMC] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @tzulitai [PMC] * ✅ 5. Overall code [quality] is good. - Approved by @tzulitai [PMC]
[GitHub] [flink] tzulitai commented on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector
tzulitai commented on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector URL: https://github.com/apache/flink/pull/8140#issuecomment-481703278 @flinkbot approve all
[GitHub] [flink] tzulitai commented on a change in pull request #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector
tzulitai commented on a change in pull request #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector URL: https://github.com/apache/flink/pull/8140#discussion_r273973666 ## File path: docs/dev/connectors/elasticsearch.md ## @@ -82,54 +77,6 @@ Elasticsearch cluster. The example below shows how to configure and create a sink: - Review comment: This only removes the Java variant. The Scala one is a bit further down and should also be removed.
[jira] [Closed] (FLINK-12158) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8
[ https://issues.apache.org/jira/browse/FLINK-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf closed FLINK-12158. - Resolution: Duplicate > Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8 > - > > Key: FLINK-12158 > URL: https://issues.apache.org/jira/browse/FLINK-12158 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: leesf >Assignee: leesf >Priority: Major > Fix For: 1.9.0 > >
[jira] [Updated] (FLINK-12158) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8
[ https://issues.apache.org/jira/browse/FLINK-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf updated FLINK-12158: -- Summary: Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8 (was: Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7) > Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8 > - > > Key: FLINK-12158 > URL: https://issues.apache.org/jira/browse/FLINK-12158 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: leesf >Assignee: leesf >Priority: Major > Fix For: 1.9.0 > >
[jira] [Created] (FLINK-12158) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7
leesf created FLINK-12158: - Summary: Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7 Key: FLINK-12158 URL: https://issues.apache.org/jira/browse/FLINK-12158 Project: Flink Issue Type: Sub-task Components: Tests Reporter: leesf Assignee: leesf Fix For: 1.9.0
[GitHub] [flink] flinkbot commented on issue #8142: [FLINK-12148][clients] Give precedence to specified local jar
flinkbot commented on issue #8142: [FLINK-12148][clients] Give precedence to specified local jar URL: https://github.com/apache/flink/pull/8142#issuecomment-481692125 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[jira] [Updated] (FLINK-12148) Give precedence to specified local jar when same mainClass is included in both lib and specified jar
[ https://issues.apache.org/jira/browse/FLINK-12148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12148: --- Labels: pull-request-available (was: ) > Give precedence to specified local jar when same mainClass is included in > both lib and specified jar > > > Key: FLINK-12148 > URL: https://issues.apache.org/jira/browse/FLINK-12148 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.8.0 >Reporter: leesf >Assignee: leesf >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > When submitting a Flink job with ./flink run -c mainClass localJar, if the > main class is included in both the $FLINK_HOME/lib directory and the specified jar, > then the mainClass from the $FLINK_HOME/lib directory is executed > instead of the one from the specified localJar. > For example, there is flink-examples-streaming_2.11-1.9-SNAPSHOT.jar in the lib > directory that contains the > org.apache.flink.streaming.examples.wordcount.WordCount class, and > LocalWordCount.jar also contains the > org.apache.flink.streaming.examples.wordcount.WordCount class. If you then use > ./flink run -c org.apache.flink.streaming.examples.wordcount.WordCount > /tmp/LocalWordCount.jar to submit the job, the WordCount class in > flink-examples-streaming_2.11-1.9-SNAPSHOT.jar is executed instead of the one in > LocalWordCount.jar.
[GitHub] [flink] leesf opened a new pull request #8142: [FLINK-12148][clients] Give precedence to specified local jar
leesf opened a new pull request #8142: [FLINK-12148][clients] Give precedence to specified local jar URL: https://github.com/apache/flink/pull/8142 ## What is the purpose of the change Give precedence to specified local jar when same mainClass is included in both lib and specified jar. ## Brief change log Change parentFirst to childFirst ClassLoader. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) cc @zentol
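The parent-first vs. child-first distinction the change log refers to can be sketched with a plain `URLClassLoader` override. This is an illustrative pattern under stated assumptions, not Flink's actual `ChildFirstClassLoader`: a child-first loader tries to resolve a class from the user jar's own URLs before falling back to the standard parent delegation, which is how the specified local jar wins over an identically named class in `lib/`.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hedged sketch of child-first class loading; not Flink's implementation.
public class ChildFirstSketch extends URLClassLoader {

    public ChildFirstSketch(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // Look in this loader's own URLs (the user jar) first.
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not in the user jar: fall back to normal parent-first delegation.
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

Production child-first loaders additionally keep an always-parent-first list (e.g. `java.*` and framework API classes), since core classes must never be redefined by a user jar; that refinement is omitted here for brevity.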
[GitHub] [flink] aljoscha commented on issue #8134: Update japicmp comparison version to latest Flink 1.8.0
aljoscha commented on issue #8134: Update japicmp comparison version to latest Flink 1.8.0 URL: https://github.com/apache/flink/pull/8134#issuecomment-481677945 merged
[GitHub] [flink] aljoscha closed pull request #8134: Update japicmp comparison version to latest Flink 1.8.0
aljoscha closed pull request #8134: Update japicmp comparison version to latest Flink 1.8.0 URL: https://github.com/apache/flink/pull/8134
[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#discussion_r273939589 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ## @@ -707,6 +706,53 @@ public void testTaskManagerTimeout() throws Exception { } } + /** +* Tests that an idle but not yet releasable task manager will not be released even if it has timed out. +*/ + @Test + public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception { + final long tmTimeout = 10L; + + final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID)) + .build(); + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final ResourceID resourceID = ResourceID.generate(); + + final AtomicBoolean canBeReleased = new AtomicBoolean(false); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setCanBeReleasedSupplier(canBeReleased::get) + .createTestingTaskExecutorGateway(); + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); + + final SlotID slotId = new SlotID(resourceID, 0); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); + final SlotReport slotReport = new SlotReport(slotStatus); + + final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); + + try (SlotManager slotManager = SlotManagerBuilder.newBuilder() + .setTaskManagerTimeout(Time.milliseconds(tmTimeout)) + .build()) { + + slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions); + + mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport)); + + // now it can not be released yet + canBeReleased.set(false); + mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); Review comment: shouldn't we wait for this to complete, because as it is now the assertion below might be a no-op (depending on the race condition)
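The race the reviewer points out comes from `Executor.execute` being fire-and-forget: the test's assertion can run before the submitted task does. A common fix, sketched below under the assumption that the executor accepts arbitrary tasks (names here are illustrative, not from the PR), is to wrap the call in a `CompletableFuture` and block until it completes before asserting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hedged sketch: run a task on the test's main-thread executor and wait for
// it to finish, so assertions afterwards cannot race with the task.
public class ExecutorWaitSketch {

    static void runAndWait(Executor executor, Runnable task) {
        // runAsync schedules the task on the given executor; join() blocks
        // the calling (test) thread until the task has completed.
        CompletableFuture.runAsync(task, executor).join();
    }
}
```

With `runAndWait(mainThreadExecutor, slotManager::checkTaskManagerTimeouts)` the subsequent assertion is guaranteed to observe the effects of the timeout check.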
[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#discussion_r273935915 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -124,7 +124,7 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro return availableReaders; } - public void notifyReaderCreated(final NetworkSequenceViewReader reader) { + void notifyReaderCreated(final NetworkSequenceViewReader reader) { Review comment: nit: are the changes in this file part of previous commit or a separate hotfix/refactor/clean up?
[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#discussion_r273936111 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java ## @@ -41,14 +41,17 @@ private final Time taskManagerRequestTimeout; private final Time slotRequestTimeout; private final Time taskManagerTimeout; + private final boolean waitResultConsumedBeforeRelease; public SlotManagerConfiguration( - Time taskManagerRequestTimeout, - Time slotRequestTimeout, - Time taskManagerTimeout) { + Time taskManagerRequestTimeout, Review comment: formatting still?
[GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#discussion_r273935683 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -378,7 +378,10 @@ public void releaseMemory(int toRelease) throws IOException { } /** -* Whether this partition is released, e.g. all subpartitions are consumed or task is cancelled. +* Whether this partition is released. Review comment: nit: Shouldn't this be a part of previous commit?
[jira] [Closed] (FLINK-12087) Introduce over window operators to blink batch
[ https://issues.apache.org/jira/browse/FLINK-12087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12087. -- Resolution: Fixed Fix Version/s: 1.9.0 fixed in 91221d6507655c6af366adb5aea075640a0a90a2 > Introduce over window operators to blink batch > -- > > Key: FLINK-12087 > URL: https://issues.apache.org/jira/browse/FLINK-12087 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Introduce NonBufferOverWindowOperator: Some over windows do not need to > buffer data, such as rank, rows between unbounded preceding and 0, etc. We > introduce NonBufferOverWindowOperator to reduce the overhead of data copy in > buffer. > Introduce BufferDataOverWindowOperator and OverWindowFrame: 1. Minimize > duplicate computation in various OverWindowFrame implementations. 2. An > OverWindowOperator can compute multiple window frames. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung merged pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
KurtYoung merged pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102
[jira] [Updated] (FLINK-12117) CassandraConnectorITCase fails on Java 9
[ https://issues.apache.org/jira/browse/FLINK-12117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12117: --- Labels: pull-request-available (was: ) > CassandraConnectorITCase fails on Java 9 > > > Key: FLINK-12117 > URL: https://issues.apache.org/jira/browse/FLINK-12117 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Cassandra, Tests >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > From what I found cassandra never really supported Java 9, so we will likely > have to disable the tests in the jdk9 profile. > {code} > java.lang.ExceptionInInitializerError > at org.github.jamm.MemoryMeter.measure(MemoryMeter.java:178) > at org.apache.cassandra.utils.ObjectSizes.measure(ObjectSizes.java:162) > at org.apache.cassandra.utils.ObjectSizes.(ObjectSizes.java:39) > at > org.apache.cassandra.dht.RandomPartitioner.(RandomPartitioner.java:47) > at java.base/java.lang.Class.forName0(Native Method) > at java.base/java.lang.Class.forName(Class.java:292) > at > org.apache.cassandra.utils.FBUtilities.classForName(FBUtilities.java:434) > at > org.apache.cassandra.utils.FBUtilities.instanceOrConstruct(FBUtilities.java:450) > at > org.apache.cassandra.utils.FBUtilities.newPartitioner(FBUtilities.java:400) > at > org.apache.cassandra.config.DatabaseDescriptor.applyConfig(DatabaseDescriptor.java:353) > at > org.apache.cassandra.config.DatabaseDescriptor.(DatabaseDescriptor.java:119) > at > org.apache.cassandra.service.StartupChecks$4.execute(StartupChecks.java:167) > at > org.apache.cassandra.service.StartupChecks.verify(StartupChecks.java:107) > at > org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:162) > at > org.apache.cassandra.service.CassandraDaemon.init(CassandraDaemon.java:416) > at > 
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase$EmbeddedCassandraService.start(CassandraConnectorITCase.java:147) > at > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.startCassandra(CassandraConnectorITCase.java:186) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length > 5 > at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3116) > at java.base/java.lang.String.substring(String.java:1885) > at > 
org.github.jamm.MemoryLayoutSpecification.getEffectiveMemoryLayoutSpecification(MemoryLayoutSpecification.java:190) > at > org.github.jamm.MemoryLayoutSpecification.(MemoryLayoutSpecification.java:31) > ... 34 more > {code}
[GitHub] [flink] flinkbot commented on issue #8141: [FLINK-12117][cassandra] Disable tests on Java 9
flinkbot commented on issue #8141: [FLINK-12117][cassandra] Disable tests on Java 9 URL: https://github.com/apache/flink/pull/8141#issuecomment-481671825 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] zentol opened a new pull request #8141: [FLINK-12117][cassandra] Disable tests on Java 9
zentol opened a new pull request #8141: [FLINK-12117][cassandra] Disable tests on Java 9 URL: https://github.com/apache/flink/pull/8141 ## What is the purpose of the change Disables cassandra tests when run on Java 9 since cassandra just doesn't support it.
[GitHub] [flink] flinkbot edited a comment on issue #8138: [FLINK-12155][coordination] Remove legacy TaskManager
flinkbot edited a comment on issue #8138: [FLINK-12155][coordination] Remove legacy TaskManager URL: https://github.com/apache/flink/pull/8138#issuecomment-481652522 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @zentol [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @zentol [PMC] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @zentol [PMC] * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] zentol commented on issue #8138: [FLINK-12155][coordination] Remove legacy TaskManager
zentol commented on issue #8138: [FLINK-12155][coordination] Remove legacy TaskManager URL: https://github.com/apache/flink/pull/8138#issuecomment-481670047 @flinkbot approve-until architecture
[jira] [Updated] (FLINK-12157) Update maven download links to point to mvn central
[ https://issues.apache.org/jira/browse/FLINK-12157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12157: --- Labels: pull-request-available (was: ) > Update maven download links to point to mvn central > --- > > Key: FLINK-12157 > URL: https://issues.apache.org/jira/browse/FLINK-12157 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > On our download page we link to maven artifacts via {{repository.apache.org}}. > Just today there was a discussion about this topic on the INFRA users mailing > list, the conclusion being that this is not allowed; we should point to mvn > central instead.
[jira] [Commented] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse
[ https://issues.apache.org/jira/browse/FLINK-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814396#comment-16814396 ] Liya Fan commented on FLINK-3685: - Hi [~bowen.zheng], thank you for opening this issue. The NPE indicates bugs in the system. However, I do not think it is DateSerializer's responsibility, because it is the caller's responsibility to create a reuse object, and make sure it is not null. The bug should be elsewhere. Can you please provide the whole source code to reproduce the problem? BTW, the latest code no longer uses -1 as an indicator for null. The latest code looks like this: @Override public Date deserialize(Date reuse, DataInputView source) throws IOException { final long v = source.readLong(); if (v == Long.MIN_VALUE) { return null; } reuse.setTime(v); return reuse; } > Logical error in code for DateSerializer deserialize with reuse > --- > > Key: FLINK-3685 > URL: https://issues.apache.org/jira/browse/FLINK-3685 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.0.0 >Reporter: ZhengBowen >Priority: Major > > There is a logical error in the following function in DateSerializer.java > when source read '-1' > function is: > {code} > public Date deserialize(Date reuse, DataInputView source) throws IOException { > long v = source.readLong(); > if(v == -1L) { > return null; > } > reuse.setTime(v); > return reuse; > } > {code} > when call this function for first time, if return null, then 'reuse' will be > set null by caller; > when call this function for second time,if 'v!=-1' ,reuse.setTime(v) will > throw NPE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
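The reuse contract discussed above can be seen in a minimal, self-contained sketch. This is a hypothetical stand-in for `DateSerializer#deserialize` (the `DataInputView` is replaced by a raw `long` for illustration): the first call reads the null sentinel and returns `null`, the caller stores that as its new reuse object, and the second call would otherwise hit `reuse.setTime(v)` on a null receiver. The defensive null check here is illustrative, not Flink's actual code:

```java
import java.util.Date;

public class DateDeserializeDemo {
    // Simplified stand-in for DateSerializer#deserialize; `source` is
    // replaced by a raw long value for illustration.
    static Date deserialize(Date reuse, long v) {
        if (v == Long.MIN_VALUE) { // sentinel for null (the old code used -1L)
            return null;
        }
        if (reuse == null) {
            // Defensive: the caller may pass back the null returned earlier.
            return new Date(v);
        }
        reuse.setTime(v);
        return reuse;
    }

    public static void main(String[] args) {
        // First call: the sentinel is read, null is returned, and the
        // caller's reuse object becomes null.
        Date reuse = deserialize(new Date(0L), Long.MIN_VALUE);
        // Second call: without the null check above, reuse.setTime(v)
        // would throw a NullPointerException here.
        Date second = deserialize(reuse, 42L);
        System.out.println(second.getTime()); // prints 42
    }
}
```

Note also why the sentinel changed: `-1L` is a legitimate timestamp (one millisecond before the epoch), so the old code could not round-trip it, while `Long.MIN_VALUE` is far outside any practical `Date` value.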
[jira] [Created] (FLINK-12157) Update maven download links to point to mvn central
Chesnay Schepler created FLINK-12157: Summary: Update maven download links to point to mvn central Key: FLINK-12157 URL: https://issues.apache.org/jira/browse/FLINK-12157 Project: Flink Issue Type: Improvement Components: Project Website Reporter: Chesnay Schepler Assignee: Chesnay Schepler On our download page we link to maven artifacts via {{repository.apache.org}}. Just today there was a discussion about this topic on the INFRA users mailing list, the conclusion being that this is not allowed; we should point to mvn central instead.
[jira] [Closed] (FLINK-12123) Upgrade Jepsen to 0.1.13 in flink-jepsen
[ https://issues.apache.org/jira/browse/FLINK-12123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-12123. Resolution: Fixed 1.9: c3bd1bd00961cd6165993f018ef59ae9379af482 > Upgrade Jepsen to 0.1.13 in flink-jepsen > > > Key: FLINK-12123 > URL: https://issues.apache.org/jira/browse/FLINK-12123 > Project: Flink > Issue Type: Improvement > Components: Test Infrastructure >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > As Debian Jessie has reached EOL, we need to raise version of the jepsen > dependency in {{flink-jepsen/project.clj}} to 0.1.13 to get support for > Debian Stretch.
[jira] [Commented] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse
[ https://issues.apache.org/jira/browse/FLINK-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814379#comment-16814379 ] Ji Liu commented on FLINK-3685: --- Since this problem was fixed in [FLINK-3856|https://issues.apache.org/jira/browse/FLINK-3856], this issue can be closed. > Logical error in code for DateSerializer deserialize with reuse > --- > > Key: FLINK-3685 > URL: https://issues.apache.org/jira/browse/FLINK-3685 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.0.0 >Reporter: ZhengBowen >Priority: Major > > There is a logical error in the following function in DateSerializer.java > when source read '-1' > function is: > {code} > public Date deserialize(Date reuse, DataInputView source) throws IOException { > long v = source.readLong(); > if(v == -1L) { > return null; > } > reuse.setTime(v); > return reuse; > } > {code} > when call this function for first time, if return null, then 'reuse' will be > set null by caller; > when call this function for second time,if 'v!=-1' ,reuse.setTime(v) will > throw NPE.
[GitHub] [flink] flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector
flinkbot edited a comment on issue #8140: [FLINK-12151][es1] Remove Elasticsearch 1 connector URL: https://github.com/apache/flink/pull/8140#issuecomment-481656785 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @zentol [PMC] * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier