[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284092827 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java ## @@ -783,6 +840,30 @@ private static long firstMondayOfFirstWeek(int year) { return janFirst + (11 - janFirstDow) % 7 - 3; } + /** Returns the ISO-8601 week number based on year, month, day. +* Per ISO-8601 it is the Monday of the week that contains Jan 4, +* or equivalently, it is a Monday between Dec 29 and Jan 4. +* Sometimes it is in the year before the given year, sometimes after. */ + private static int getIso8601WeekNumber(int julian, int year, int month, int day) { + long fmofw = firstMondayOfFirstWeek(year); + if (month == 12 && day > 28) { + if (31 - day + 4 > 7 - ((int) floorMod(julian, 7) + 1) Review comment: Although the Calcite bug may be fixed in the future, let's add UTs for these transformation functions for our current code base. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
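The ISO-8601 rule the truncated snippet above encodes (week 1 starts on the Monday of the week containing Jan 4, which may fall in the previous calendar year) can be sketched in Python. This is a hypothetical re-implementation checked against `datetime.date.isocalendar`, not the Calcite code itself:

```python
from datetime import date, timedelta

def first_monday_of_first_week(year):
    # ISO-8601: week 1 is the week containing Jan 4; it starts on the
    # Monday on or before Jan 4, i.e. somewhere between Dec 29 and Jan 4.
    jan4 = date(year, 1, 4)
    return jan4 - timedelta(days=jan4.weekday())

def iso8601_week(d):
    # A date near a year boundary may belong to week 1 of the next ISO
    # year or to the last week of the previous one, so probe all three.
    for iso_year in (d.year + 1, d.year, d.year - 1):
        start = first_monday_of_first_week(iso_year)
        if d >= start:
            return (d - start).days // 7 + 1

# Cross-check against the standard library on tricky boundary dates.
for d in (date(2019, 12, 30), date(2016, 1, 1), date(2019, 5, 14)):
    assert iso8601_week(d) == d.isocalendar()[1]
```

For example, 2019-12-30 is week 1 of ISO year 2020, and 2016-01-01 is week 53 of ISO year 2015, which is exactly the "sometimes before, sometimes after" case the javadoc mentions.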
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284092533 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java ## @@ -53,25 +53,6 @@ public Impl(SqlFunction f) { public RexNode convert(RexBuilder rexBuilder, RexNode groupCall, RexNode e) { return rexBuilder.makeCall(this.f, e); - // FLINK QUICK FIX - // we do not use this logic right now -// switch (f.getKind()) { -// case TUMBLE_START: -// case HOP_START: -// case SESSION_START: -// case SESSION_END: // TODO: ? -//return e; -// case TUMBLE_END: -//return rexBuilder.makeCall( -//SqlStdOperatorTable.PLUS, e, -//((RexCall) groupCall).operands.get(1)); -// case HOP_END: -//return rexBuilder.makeCall( -//SqlStdOperatorTable.PLUS, e, -//((RexCall) groupCall).operands.get(2)); -// default: Review comment: Let's just keep these comments.
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284092470 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java ## @@ -1011,6 +1103,16 @@ public static Calendar calendar() { return Calendar.getInstance(UTC_ZONE, Locale.ROOT); } + /** Returns whether a value is an {@code OffsetDateTime}. */ + public static boolean isOffsetDateTime(Object o) { + return OFFSET_DATE_TIME_HANDLER.isOffsetDateTime(o); + } + + /** Returns the value of a {@code OffsetDateTime} as a string. */ + public static String offsetDateTimeValue(Object o) { + return OFFSET_DATE_TIME_HANDLER.stringValue(o); + } Review comment: Does this mean that if we do not have the class "java.time.OffsetDateTime", we would always get an exception?
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284092067 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java ## @@ -1039,6 +1141,46 @@ public String getFraction() { return fraction; } } + + /** Deals with values of {@code java.time.OffsetDateTime} without introducing +* a compile-time dependency (because {@code OffsetDateTime} is only JDK 8 and +* higher). */ + private interface OffsetDateTimeHandler { + boolean isOffsetDateTime(Object o); + String stringValue(Object o); Review comment: Do we really need the interface method `stringValue`? Of the two implementations, one is static and one throws an exception; I don't see why we should keep it.
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284091738 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java ## @@ -882,6 +968,12 @@ public static int ymdToJulian(int year, int month, int day) { int a = (14 - month) / 12; int y = year + 4800 - a; int m = month + 12 * a - 3; +// return day + (153 * m + 2) / 5 +// + 365 * y +// + y / 4 +// - y / 100 +// + y / 400 Review comment: Useless comments
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284091655 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java ## @@ -719,30 +757,49 @@ public static long timestampStringToUnixDate(String s) { } public static long unixDateExtract(TimeUnitRange range, long date) { - return julianExtract(range, (int) date + EPOCH_JULIAN); + switch (range) { + case EPOCH: + // no need to extract year/month/day, just multiply + return date * SECONDS_PER_DAY; + default: + return julianExtract(range, (int) date + EPOCH_JULIAN); + } } private static int julianExtract(TimeUnitRange range, int julian) { - // Algorithm the book "Astronomical Algorithms" by Jean Meeus, 1998 - int b, c; - if (julian > 2299160) { - int a = julian + 32044; - b = (4 * a + 3) / 146097; - c = a - b *146097 / 4; - } else { - b = 0; - c = julian + 32082; - } - int d = (4 * c + 3) / 1461; - int e = c - (1461 * d) / 4; - int m = (5 * e + 2) / 153; - int day = e - (153 * m + 2) / 5 + 1; - int month = m + 3 - 12 * (m / 10); - int year = b * 100 + d - 4800 + (m / 10); - + // this shifts the epoch back to astronomical year -4800 instead of the + // start of the Christian era in year AD 1 of the proleptic Gregorian + // calendar. 
+ int j = julian + 32044; + int g = j / 146097; + int dg = j % 146097; + int c = (dg / 36524 + 1) * 3 / 4; + int dc = dg - c * 36524; + int b = dc / 1461; + int db = dc % 1461; + int a = (db / 365 + 1) * 3 / 4; + int da = db - a * 365; + + // integer number of full years elapsed since March 1, 4801 BC + int y = g * 400 + c * 100 + b * 4 + a; + // integer number of full months elapsed since the last March 1 + int m = (da * 5 + 308) / 153 - 2; + // number of days elapsed since day 1 of the month + int d = da - (m + 4) * 153 / 5 + 122; + int year = y - 4800 + (m + 2) / 12; Review comment: I have no idea whether this logic is right or not; let's just add enough unit tests.
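The kind of unit test the reviewer asks for is easy to sketch: the integer arithmetic in the patch can be cross-checked in Python against `datetime`'s proleptic Gregorian ordinal, using the fixed offset between Julian day numbers and ordinals (2440588 is the Julian day of 1970-01-01, Calcite's `EPOCH_JULIAN`):

```python
from datetime import date

EPOCH_JULIAN = 2440588  # Julian day number of 1970-01-01

def julian_to_ymd(julian):
    # Transcription of the integer arithmetic from the patch above.
    j = julian + 32044
    g, dg = divmod(j, 146097)
    c = (dg // 36524 + 1) * 3 // 4
    dc = dg - c * 36524
    b, db = divmod(dc, 1461)
    a = (db // 365 + 1) * 3 // 4
    da = db - a * 365
    y = g * 400 + c * 100 + b * 4 + a      # full years since March 1, 4801 BC
    m = (da * 5 + 308) // 153 - 2          # full months since the last March 1
    d = da - (m + 4) * 153 // 5 + 122      # days since day 1 of the month
    year = y - 4800 + (m + 2) // 12
    month = (m + 2) % 12 + 1
    day = d + 1
    return year, month, day

# date.toordinal() counts from 0001-01-01; the offset to Julian day is fixed.
OFFSET = EPOCH_JULIAN - date(1970, 1, 1).toordinal()  # 1721425
for o in range(date(1900, 1, 1).toordinal(), date(2100, 1, 1).toordinal(), 97):
    d = date.fromordinal(o)
    assert julian_to_ymd(o + OFFSET) == (d.year, d.month, d.day)
```

For instance, `julian_to_ymd(2440588)` yields `(1970, 1, 1)` and `julian_to_ymd(2451545)` yields `(2000, 1, 1)`, matching the known J2000 epoch.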
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284091504 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java ## @@ -379,6 +394,29 @@ private static void julianToString(StringBuilder buf, int julian) { int month = m + 3 - 12 * (m / 10); int year = b * 100 + d - 4800 + (m / 10); +// // this shifts the epoch back to astronomical year -4800 instead of the +// // start of the Christian era in year AD 1 of the proleptic Gregorian +// // calendar. +// int j = julian + 32044; +// int g = j / 146097; +// int dg = j % 146097; +// int c = (dg / 36524 + 1) * 3 / 4; +// int dc = dg - c * 36524; +// int b = dc / 1461; +// int db = dc % 1461; +// int a = (db / 365 + 1) * 3 / 4; +// int da = db - a * 365; +// +// // integer number of full years elapsed since March 1, 4801 BC +// int y = g * 400 + c * 100 + b * 4 + a; +// // integer number of full months elapsed since the last March 1 +// int m = (da * 5 + 308) / 153 - 2; +// // number of days elapsed since day 1 of the month +// int d = da - (m + 4) * 153 / 5 + 122; +// int year = y - 4800 + (m + 2) / 12; +// int month = (m + 2) % 12 + 1; +// int day = d + 1; Review comment: Useless comments
[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#discussion_r284091403 ## File path: flink-table/flink-table-planner/pom.xml ## @@ -136,17 +136,17 @@ under the License. org.apache.calcite calcite-core - 1.18.0 + 1.19.0
[jira] [Assigned] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Ziqiang reassigned FLINK-11607: - Assignee: Zhang Ziqiang (was: Benchao Li) > Translate the "DataStream API Tutorial" page into Chinese > - > > Key: FLINK-11607 > URL: https://issues.apache.org/jira/browse/FLINK-11607 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Zhang Ziqiang >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html > The markdown file is located in flink/docs/tutorials/datastream_api.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284080627 ## File path: flink-python/pyflink/table/tests/test_table_completeness.py ## @@ -0,0 +1,62 @@ +# ### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase +from pyflink.table import Table + + +class TableAPICompletenessTests(PythonAPICompletenessTestCase): +""" +Tests whether the Python :class:`Table` is consistent with +Java `org.apache.flink.table.api.Table`. +""" + +@classmethod +def python_class(cls): +return Table + +@classmethod +def java_class(cls): +return "org.apache.flink.table.api.Table" + +@classmethod +def excluded_methods(cls): +# row-based operators should be supported when UDFs supported in python. +return {'map', 'flatMap', 'flatAggregate', 'aggregate'} + +@classmethod +def java_method_name(cls, python_method_name): +""" +Due to 'as' is python keyword, so we use 'alias' +in Python API corresponding 'as' in Java API. 
+ +:param python_method_name: +:return: +""" +return {'alias': 'as'}.get(python_method_name, python_method_name) + + +if __name__ == '__main__': +import unittest + +try: +import xmlrunner +testRunner = xmlrunner.XMLTestRunner(output='target/test-reports') +except ImportError: +testRunner = None +unittest.main(testRunner=testRunner, verbosity=2) Review comment: Add one empty line at the end of the file.
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284080392 ## File path: flink-python/pyflink/table/tests/test_environment_completeness.py ## @@ -0,0 +1,53 @@ +# ### Review comment: Remove the empty space here?
[GitHub] [flink] sunjincheng121 commented on issue #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
sunjincheng121 commented on issue #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#issuecomment-492495197 Thanks for the quick review! @dianfu I have updated the PR according to your comments! Best, Jincheng
[GitHub] [flink] zhijiangW edited a comment on issue #8362: [FLINK-11391] Introduce shuffle master interface
zhijiangW edited a comment on issue #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#issuecomment-492491082 Thanks for the confirmation @azagrebin . 1. Yes, I have not thought through the changes caused by the single channel in `PartitionInfo` and all channels in `IGDD`. Just from the aspect of the rpc call `taskManagerGateway.updatePartitions(partitionInfos)`, the parameter is a collection of `PartitionInfo`, which is the same as the array of channels in `IGDD`. Maybe the `IGDD` should support caching `ICDD` internally and replace the array with a collection. It might involve more refactoring and I would further consider it. 2. From a functional aspect the current way is no problem. But I was once advised in my PR not to use `instanceof` and to introduce the interface method `ChannelSelector#isBroadcast` instead, because `instanceof` sounds like a hack, not a proper solution. I am not sure whether it is discouraged in common practice, or maybe it is just a personal preference. I think you could confirm this approach with other guys. :) BTW, I have not finished the whole review yet. I would continue on it later.
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284063932 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): Review comment: get_java_class_method -> get_java_class_methods
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284065527 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): +gateway = get_gateway() +s = set() +method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods() +for i in range(0, len(method_arr)): +s.add(method_arr[i].getName()) +return s + +@classmethod +def check_methods(cls): +java_methods = PythonAPICompletenessTestCase.get_java_class_method(cls.java_class()) +python_methods = cls.get_python_class_methods(cls.python_class()) +missing_methods = java_methods - python_methods - cls.exclude_methods() +if len(missing_methods) > 0: +print(missing_methods) +print('The Exception should be raised after FLINK-12407 is merged.') +# raise Exception('Methods: %s in Java class %s have not been added in Python class %s.' +#% (missing_methods, java_class, python_class)) Review comment: cls.java_class(), cls.python_class()
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284064889 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): +gateway = get_gateway() +s = set() +method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods() +for i in range(0, len(method_arr)): +s.add(method_arr[i].getName()) +return s + +@classmethod +def check_methods(cls): +java_methods = PythonAPICompletenessTestCase.get_java_class_method(cls.java_class()) +python_methods = cls.get_python_class_methods(cls.python_class()) +missing_methods = java_methods - python_methods - cls.exclude_methods() +if len(missing_methods) > 0: +print(missing_methods) +print('The Exception should be raised after FLINK-12407 is merged.') +# raise Exception('Methods: %s in Java class %s have not been added in Python class %s.' 
+#% (missing_methods, java_class, python_class)) + +@classmethod +def java_method_name(cls, python_method_name): +""" +This method should be overWrite when the method name of the Python API cannot be +consistent with the Java API method name. we need to get the corresponding +Java API method name based on the Python method name. e.g.: 'as' is python +keyword, so we use 'alias' in Python API corresponding 'as' in Java API. + +:param python_method_name: Method name of Python API. +:return: Method name corresponding to Java API. +""" +return python_method_name + +@classmethod +@abstractmethod +def python_class(cls): +""" +Return the Python class that needs to be compared. such as :class:`Table`. +""" +pass + +@classmethod +@abstractmethod +def java_class(cls): +""" +Return the Java class that needs to be compared. such as `org.apache.flink.table.api.Table`. +""" +pass + +@classmethod +def exclude_methods(cls): +""" +Exclude method names that do not need to be checked. When adding excluded methods to the lists +you should give a good reason in a comment. +:return: +""" +return {} + +def test_completeness(self): +if self.python_class() is not None: Review comment: if self.python_class() is not None and self.java_class() is not None
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284062971 ## File path: flink-python/pyflink/table/tests/test_environment_completeness.py ## @@ -0,0 +1,53 @@ +# ### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase +from pyflink.table import TableEnvironment + + +class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase): +""" +Test whether the Python :class:`TableEnvironment` is consistent with Review comment: Test -> Tests
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284064639 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): +gateway = get_gateway() +s = set() +method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods() +for i in range(0, len(method_arr)): +s.add(method_arr[i].getName()) +return s + +@classmethod +def check_methods(cls): +java_methods = PythonAPICompletenessTestCase.get_java_class_method(cls.java_class()) +python_methods = cls.get_python_class_methods(cls.python_class()) +missing_methods = java_methods - python_methods - cls.exclude_methods() +if len(missing_methods) > 0: +print(missing_methods) +print('The Exception should be raised after FLINK-12407 is merged.') +# raise Exception('Methods: %s in Java class %s have not been added in Python class %s.' 
+#% (missing_methods, java_class, python_class)) + +@classmethod +def java_method_name(cls, python_method_name): +""" +This method should be overWrite when the method name of the Python API cannot be +consistent with the Java API method name. we need to get the corresponding +Java API method name based on the Python method name. e.g.: 'as' is python +keyword, so we use 'alias' in Python API corresponding 'as' in Java API. + +:param python_method_name: Method name of Python API. +:return: Method name corresponding to Java API. Review comment: The corresponding method name of Java API.
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284064278 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): +gateway = get_gateway() +s = set() +method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods() +for i in range(0, len(method_arr)): +s.add(method_arr[i].getName()) +return s + +@classmethod +def check_methods(cls): +java_methods = PythonAPICompletenessTestCase.get_java_class_method(cls.java_class()) +python_methods = cls.get_python_class_methods(cls.python_class()) +missing_methods = java_methods - python_methods - cls.exclude_methods() +if len(missing_methods) > 0: +print(missing_methods) +print('The Exception should be raised after FLINK-12407 is merged.') +# raise Exception('Methods: %s in Java class %s have not been added in Python class %s.' 
+#% (missing_methods, java_class, python_class)) + +@classmethod +def java_method_name(cls, python_method_name): +""" +This method should be overWrite when the method name of the Python API cannot be Review comment: overWrite -> overwritten This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284064351 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): +gateway = get_gateway() +s = set() +method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods() +for i in range(0, len(method_arr)): +s.add(method_arr[i].getName()) +return s + +@classmethod +def check_methods(cls): +java_methods = PythonAPICompletenessTestCase.get_java_class_method(cls.java_class()) +python_methods = cls.get_python_class_methods(cls.python_class()) +missing_methods = java_methods - python_methods - cls.exclude_methods() +if len(missing_methods) > 0: +print(missing_methods) +print('The Exception should be raised after FLINK-12407 is merged.') +# raise Exception('Methods: %s in Java class %s have not been added in Python class %s.' 
+#% (missing_methods, java_class, python_class)) + +@classmethod +def java_method_name(cls, python_method_name): +""" +This method should be overWrite when the method name of the Python API cannot be +consistent with the Java API method name. we need to get the corresponding Review comment: What about removing this line? `we need to get the corresponding Java API method name based on the Python method name.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on issue #8441: [FLINK-12486][doc]clarify the storage of operator state and broadcast state individually
Aitozi commented on issue #8441: [FLINK-12486][doc]clarify the storage of operator state and broadcast state individually URL: https://github.com/apache/flink/pull/8441#issuecomment-492476714 Hmm, I think I have also updated `state_backends.zh.md`; what are you referring to, @klion26?
[jira] [Commented] (FLINK-12202) Consider introducing batch metric register in NetworkEnviroment
[ https://issues.apache.org/jira/browse/FLINK-12202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839963#comment-16839963 ] zhijiang commented on FLINK-12202: -- Hey [~azagrebin], as we confirmed before, this ticket is not needed because we would register metrics while creating batch of partitions/gates. So should we close this jira? > Consider introducing batch metric register in NetworkEnviroment > --- > > Key: FLINK-12202 > URL: https://issues.apache.org/jira/browse/FLINK-12202 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics, Runtime / Network >Reporter: Andrey Zagrebin >Assignee: zhijiang >Priority: Major > > As we have some network specific metrics registered in TaskIOMetricGroup > (In/OutputBuffersGauge, In/OutputBufferPoolUsageGauge), we can introduce > batch metric registering in > NetworkEnviroment.registerMetrics(ProxyMetricGroup, partitions, gates), where > task passes its TaskIOMetricGroup into ProxyMetricGroup. This way we could > break a tie between task and NetworkEnviroment. > TaskIOMetricGroup.initializeBufferMetrics, In/OutputBuffersGauge, > In/OutputBufferPoolUsageGauge could be moved into > NetworkEnviroment.registerMetrics and network code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] c4emmmm commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
c4e commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r284071237 ## File path: flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.api.core; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.ml.util.persist.MLStageFactory; +import org.apache.flink.table.api.Table; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import java.util.ArrayList; +import java.util.List; + +/** + * A pipeline is a linear workflow which chains {@link Estimator}s and {@link Transformer}s to + * execute an algorithm. + * + * A pipeline itself can either act as an Estimator or a Transformer, depending on the stages it + * includes. More specifically: + * + * + * If a Pipeline has an {@link Estimator}, one needs to call {@link Pipeline#fit(Table)} before use + * the pipeline as a {@link Transformer}. 
In this case the Pipeline is an {@link Estimator} and can + * produce a Pipeline as a {@link Model}. + * + * + * If a Pipeline has no {@link Estimator}, it is a {@link Transformer} and can be applied to a Table + * directly. In this case, {@link Pipeline#fit(Table)} will simply return the pipeline itself. + * + * + * + * In addition, a pipeline can also be used as a {@link PipelineStage} in another pipeline, just + * like an ordinary {@link Estimator} or {@link Transformer} as describe above. + */ +@PublicEvolving +public final class Pipeline implements Estimator, Transformer, + Model { + private static final long serialVersionUID = 1L; + private List stages; + private Params params; + + public Pipeline() { + this(new ArrayList<>()); + } + + public Pipeline(List stages) { + this.stages = stages; + this.params = new Params(); + } + + private static boolean isStageNeedFit(PipelineStage stage) { + return (stage instanceof Pipeline && ((Pipeline) stage).needFit()) || + (!(stage instanceof Pipeline) && stage instanceof Estimator); + } + + /** +* Appends a PipelineStage to the tail of this pipeline. +* +* @param stage the stage to be appended +*/ + public Pipeline appendStage(PipelineStage stage) { + stages.add(stage); + return this; + } + + /** +* Returns a list of all stages in this pipeline in order. +* +* @return a list of all stages in this pipeline in order. +*/ + public List getStages() { + return stages; + } + + /** +* Check whether the pipeline acts as an {@link Estimator} or not. When the return value is +* true, that means this pipeline contains an {@link Estimator} and thus users must invoke +* {@link #fit(Table)} before they can use this pipeline as a {@link Transformer}. Otherwise, +* the pipeline can be used as a {@link Transformer} directly. 
+* +* @return {@code true} if this pipeline has an Estimator, {@code false} otherwise +*/ + public boolean needFit() { + return this.getIndexOfLastEstimator() >= 0; + } + + public Params getParams() { + return params; + } + + //find the last Estimator or Pipeline that needs fit in stages, -1 stand for no Estimator in Pipeline + private int getIndexOfLastEstimator() { Review comment: I have renewed the Pipeline add left a comment. It's outside the code page so maybe you didn't see it. Sorry that i'm not familiar with github, maybe there's a better way to let you know. The new Pipeline has only final fields, except lastEstimatorIndex which is an internal cache. Every stage must be added via appendStage() method including in constructor or fromJson() method. The return of getStages() will be an
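The needFit() contract discussed in this review thread boils down to one rule: a pipeline needs fitting iff it contains an Estimator, either directly or via a nested pipeline that itself needs fitting. A hedged Python sketch of that rule (the class names are stand-ins for illustration, not the actual flink-ml-api Java types):

```python
class Transformer:
    """Stand-in for the Transformer interface."""

class Estimator:
    """Stand-in for the Estimator interface."""

class Pipeline(Transformer, Estimator):
    def __init__(self, stages=None):
        self.stages = list(stages or [])

    def append_stage(self, stage):
        self.stages.append(stage)
        return self

    def index_of_last_estimator(self):
        # -1 means no Estimator: the pipeline acts as a plain Transformer
        # and fit() would simply return the pipeline itself.
        index = -1
        for i, stage in enumerate(self.stages):
            nested_needs_fit = isinstance(stage, Pipeline) and stage.need_fit()
            plain_estimator = (not isinstance(stage, Pipeline)
                               and isinstance(stage, Estimator))
            if nested_needs_fit or plain_estimator:
                index = i
        return index

    def need_fit(self):
        return self.index_of_last_estimator() >= 0

print(Pipeline([Transformer()]).need_fit())               # False
print(Pipeline([Transformer(), Estimator()]).need_fit())  # True
```

Note the special-casing of nested pipelines: a sub-pipeline is only treated as an Estimator when it needs fitting itself, mirroring the quoted isStageNeedFit helper.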
[jira] [Updated] (FLINK-12127) Move network related options to NetworkEnvironmentOptions
[ https://issues.apache.org/jira/browse/FLINK-12127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12127: --- Labels: pull-request-available (was: ) > Move network related options to NetworkEnvironmentOptions > - > > Key: FLINK-12127 > URL: https://issues.apache.org/jira/browse/FLINK-12127 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > Some network related options in TaskManagerOptions could be moved into new > introduced `NetworkEnvironmentOptions` which would be used for different > shuffle services. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8445: [FLINK-12127][network, config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions
flinkbot commented on issue #8445: [FLINK-12127][network,config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions URL: https://github.com/apache/flink/pull/8445#issuecomment-492483428 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r284070033 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: > BTW, I have one more important additional point, which is the `numUnsuccessfulCheckpointsTriggers` in checkpoint coordinator, which absolutely sounds like something that should now be moved into the failure manager, wdyt? After we provided `CheckpointFailureManager`, IMO the `numUnsuccessfulCheckpointsTriggers` is not valuable. Currently, it is incremented in the trigger lock of method `triggerCheckpoint`. The domain of trigger lock is the subset of the trigger phase which been `CheckpointFailureManager` understood. So the counting is not correct. And it is just for logging purpose. So I suggest we could remove it in this PR or in the third step (next step). If the logging is really necessary, we could do it again after we implemented the new counting logic based on checkpoint id sequence. What's your opinion? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on issue #8445: [FLINK-12127][network, config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions
zhijiangW commented on issue #8445: [FLINK-12127][network,config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions URL: https://github.com/apache/flink/pull/8445#issuecomment-492483315 @flinkbot attention @azagrebin
[GitHub] [flink] zhijiangW opened a new pull request #8445: [FLINK-12127][network, config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions
zhijiangW opened a new pull request #8445: [FLINK-12127][network,config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions URL: https://github.com/apache/flink/pull/8445 ## What is the purpose of the change *The introduced `NetworkEnvironmentOptions` could be used for different shuffle services in future, not only for current task manager. So it is better to extract network related parameters as independent options.* ## Brief change log - *Extract network related options from `TaskManagerOptions` into `NetworkEnvironmentOptions* - *Migrate network related options from `NettyConfig` into `NetworkEnvironmentOptions`* - *Modify the `config.md` for generating new `network_configuration.html`* - *Adjust the related tests* ## Verifying this change This change is a trivial rework without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839939#comment-16839939 ] Jasleen Kaur commented on FLINK-5243: - Thanks [~ivan.mushketyk]. Could I implement bipartite matching in the example? > Implement an example for BipartiteGraph > --- > > Key: FLINK-5243 > URL: https://issues.apache.org/jira/browse/FLINK-5243 > Project: Flink > Issue Type: Sub-task > Components: Library / Graph Processing (Gelly) >Reporter: Ivan Mushketyk >Priority: Major > Labels: beginner > > Should implement example for BipartiteGraph in gelly-examples project > similarly to examples for Graph class. > Depends on this: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yanghua removed a comment on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua removed a comment on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-492471326 Unrelated test failure; I have created an issue to report it and will trigger a rebuild.
[GitHub] [flink] danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-492478037 > @danny0405 Could you help to review this PR? Sure, I will try to finish reviewing it as soon as I can.
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284063074 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., Review comment: python -> Python api -> API This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro… URL: https://github.com/apache/flink/pull/8439#discussion_r284063742 ## File path: flink-python/pyflink/testing/test_case_utils.py ## @@ -90,3 +92,82 @@ def setUp(self): super(PyFlinkBatchTableTestCase, self).setUp() self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build() self.t_env = TableEnvironment.get_table_environment(self.t_config) + + +class PythonAPICompletenessTestCase(unittest.TestCase): +""" +Base class for python api completeness tests, i.e., +python api should be aligned with the Java API as much as possible. +""" + +@classmethod +def get_python_class_methods(cls, python_class): +return {cls.snake_to_camel(cls.java_method_name(method_name)) +for method_name in dir(python_class) if not method_name.startswith('_')} + +@staticmethod +def snake_to_camel(method_name): +output = ''.join(x.capitalize() or '_' for x in method_name.split('_')) +return output[0].lower() + output[1:] + +@staticmethod +def get_java_class_method(java_class): +gateway = get_gateway() +s = set() +method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods() +for i in range(0, len(method_arr)): +s.add(method_arr[i].getName()) +return s + +@classmethod +def check_methods(cls): +java_methods = PythonAPICompletenessTestCase.get_java_class_method(cls.java_class()) +python_methods = cls.get_python_class_methods(cls.python_class()) +missing_methods = java_methods - python_methods - cls.exclude_methods() +if len(missing_methods) > 0: +print(missing_methods) +print('The Exception should be raised after FLINK-12407 is merged.') +# raise Exception('Methods: %s in Java class %s have not been added in Python class %s.' 
+#% (missing_methods, java_class, python_class)) + +@classmethod +def java_method_name(cls, python_method_name): +""" +This method should be overWrite when the method name of the Python API cannot be +consistent with the Java API method name. we need to get the corresponding +Java API method name based on the Python method name. e.g.: 'as' is python +keyword, so we use 'alias' in Python API corresponding 'as' in Java API. + +:param python_method_name: Method name of Python API. +:return: Method name corresponding to Java API. +""" +return python_method_name + +@classmethod +@abstractmethod +def python_class(cls): +""" +Return the Python class that needs to be compared. such as :class:`Table`. +""" +pass + +@classmethod +@abstractmethod +def java_class(cls): +""" +Return the Java class that needs to be compared. such as `org.apache.flink.table.api.Table`. +""" +pass + +@classmethod +def exclude_methods(cls): Review comment: exclude_methods -> excluded_methods This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
KurtYoung commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-492471785 @danny0405 Could you help to review this PR?
[jira] [Commented] (FLINK-11612) Translate the "Project Template for Java" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839915#comment-16839915 ] LakeShen commented on FLINK-11612: -- I will do my best to translate this page > Translate the "Project Template for Java" page into Chinese > --- > > Key: FLINK-11612 > URL: https://issues.apache.org/jira/browse/FLINK-11612 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: LakeShen >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/java_api_quickstart.html > The markdown file is located in > flink/docs/dev/projectsetup/java_api_quickstart.zh.md > The markdown file will be created once FLINK-11529 is merged. > You can reference the translation from : > https://github.com/flink-china/1.6.0/blob/master/quickstart/java_api_quickstart.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11612) Translate the "Project Template for Java" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LakeShen reassigned FLINK-11612: Assignee: LakeShen > Translate the "Project Template for Java" page into Chinese > --- > > Key: FLINK-11612 > URL: https://issues.apache.org/jira/browse/FLINK-11612 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: LakeShen >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/java_api_quickstart.html > The markdown file is located in > flink/docs/dev/projectsetup/java_api_quickstart.zh.md > The markdown file will be created once FLINK-11529 is merged. > You can reference the translation from : > https://github.com/flink-china/1.6.0/blob/master/quickstart/java_api_quickstart.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-492471326 Unrelated test failure; I have created an issue to report it and will trigger a rebuild.
[jira] [Closed] (FLINK-12370) Integrated Travis for Python Table API
[ https://issues.apache.org/jira/browse/FLINK-12370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12370. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: d95d395d92acca210e107663ed0e96c3285f0cf5 > Integrated Travis for Python Table API > -- > > Key: FLINK-12370 > URL: https://issues.apache.org/jira/browse/FLINK-12370 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Integrated Travis for Python Table API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12512) TableSourceTest#testNestedProject test failed
vinoyang created FLINK-12512: Summary: TableSourceTest#testNestedProject test failed Key: FLINK-12512 URL: https://issues.apache.org/jira/browse/FLINK-12512 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: vinoyang {code:java} 20:41:59.128 [ERROR] testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest) Time elapsed: 0.047 s <<< FAILURE! org.junit.ComparisonFailure: null expected:<...deepNested.nested2.f[lag AS nestedFlag, deepNested.nested2.num AS nestedNum]) StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], source=[TestSource(read nested fields: id.*, deepNested.nested2.num, deepNested.nested2.flag], deepNested.nested1...> but was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS nestedNum]) StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, deepNested.nested2.f0], deepNested.nested1...> at org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375) {code} log details : [https://api.travis-ci.org/v3/job/532319575/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.
asfgit closed pull request #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API. URL: https://github.com/apache/flink/pull/8392
[jira] [Closed] (FLINK-11840) Introduce FlinkLogicalSnapshot
[ https://issues.apache.org/jira/browse/FLINK-11840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-11840. -- Resolution: Duplicate Fix Version/s: 1.9.0 > Introduce FlinkLogicalSnapshot > -- > > Key: FLINK-11840 > URL: https://issues.apache.org/jira/browse/FLINK-11840 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Fix For: 1.9.0 > > > {{Snapshot}} hasn't been introduced into Calcite yet. It's a whole new > feature. Related changes include parser, sql validator, relational expression > and so on. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284057878 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. 
+""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) + +def minus(self, right): +""" +Minus of two :class:`Table`s with duplicate records removed. +Similar to a SQL EXCEPT clause. Minus returns records from the left table
[GitHub] [flink] klion26 commented on issue #8432: Release 1.8
klion26 commented on issue #8432: Release 1.8 URL: https://github.com/apache/flink/pull/8432#issuecomment-492467084 @sxganapa could you please close this pr which wants to merge release-1.8 to master
[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API. URL: https://github.com/apache/flink/pull/8392#issuecomment-492466610 Thanks for the fix @WeiZhong94 LGTM. +1 to merged @flinkbot approve all
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284055809 ## File path: flink-python/pyflink/table/tests/test_batch_table_api.py ## @@ -0,0 +1,418 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from pyflink.table.types import DataTypes +from pyflink.table.window import Tumble, Slide, Session + +from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase + + +class BatchTableTests(PyFlinkBatchTableTestCase): Review comment: Thanks for your reply @dianfu! Thinking about it again, you are right: a base test class that can run both batch and stream is pretty good. But from the point of view of CI time cost, the Python API only needs to check that the `batchEvn` works, plus the few operators that exist only for batch; testing the remaining common operators against stream is enough, because the Python API only proxies to the Java API, and the Java API's tests already cover both batch and stream. Does this make sense to you? @WeiZhong94 @dianfu
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284054367 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. 
+ +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. 
+""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) + +def minus(self, right): +""" +Minus of two :class:`Table`s with duplicate records removed. +Similar to a SQL EXCEPT clause. Minus returns records from the left
[GitHub] [flink] asfgit closed pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
asfgit closed pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434
[jira] [Closed] (FLINK-12234) Support view related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-12234. Resolution: Fixed merged in 1.9.0: 544903d5cc77d6f90e0d77cb21ebf008b435e444 > Support view related operations in HiveCatalog > -- > > Key: FLINK-12234 > URL: https://issues.apache.org/jira/browse/FLINK-12234 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Support view related operations in HiveCatalog, which implements > ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12505) Unify database operations to HiveCatalogBase from its subclasses
[ https://issues.apache.org/jira/browse/FLINK-12505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-12505. Resolution: Fixed merged in 1.9.0: f7ecfdbcdf0f1ab17097ebbe1d170b984961bf60 > Unify database operations to HiveCatalogBase from its subclasses > > > Key: FLINK-12505 > URL: https://issues.apache.org/jira/browse/FLINK-12505 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently each subclass of HiveCatalogBase has its own impl for catalog > related database APIs. We should unify them as much as possible -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses
asfgit closed pull request #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses URL: https://github.com/apache/flink/pull/8433 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#issuecomment-492395614 merging...
[GitHub] [flink] bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses
bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses URL: https://github.com/apache/flink/pull/8433#issuecomment-492395677 merging...
[GitHub] [flink] bowenli86 commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#discussion_r283981020 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.util.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Hive catalog view implementation. + */ +public class HiveCatalogView implements CatalogView { + // Original text of the view definition. + private final String originalQuery; + + // Expanded text of the original view definition + // This is needed because the context such as current DB is + // lost after the session, in which view is defined, is gone. 
+ // Expanded query text takes care of the this, as an example. + private final String expandedQuery; + + // Schema of the view (column names and types) + private final TableSchema tableSchema; + // Properties of the view + private final Map properties; + // Comment of the view + private String comment = "This is a hive catalog view."; Review comment: agree. This inherited behavior has been bothering me for a while. Opened FLINK-12511 to address this in all catalog metadata classes
[jira] [Created] (FLINK-12511) make variable "comment" in all catalog metadata classes final
Bowen Li created FLINK-12511: Summary: make variable "comment" in all catalog metadata classes final Key: FLINK-12511 URL: https://issues.apache.org/jira/browse/FLINK-12511 Project: Flink Issue Type: Improvement Affects Versions: 1.9.0 Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Because of historical reasons, currently the variable "comment" in all catalog metadata classes are not final yet, it has a default value and can be overwritten in constructor. It creates problems like overloaded constructors are built in a wrong way. We should remove the default value of "comment" and make it final that can only be assigned value upon construction. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
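The fix FLINK-12511 describes is straightforward: drop the default value and assign `comment` exactly once at construction time. Since this digest's hands-on examples are Python, here is a rough Python analogue (class and parameter names hypothetical) using a read-only property to get final-like behaviour:

```python
class CatalogViewMetadata:
    """Hypothetical metadata class with a final-style comment field."""

    def __init__(self, comment):
        # No default value: the comment must be supplied at construction
        # and is never reassigned, mirroring a final field in Java.
        self._comment = comment

    @property
    def comment(self):
        # Read-only accessor; no setter is defined, so any attempt to
        # assign view.comment later raises AttributeError.
        return self._comment


view = CatalogViewMetadata("a view for testing")
print(view.comment)  # a view for testing
```

Removing the default also forces every constructor overload to state the comment explicitly, which is exactly the wrong-overload problem the ticket mentions.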
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation URL: https://github.com/apache/flink/pull/8437#discussion_r283954075 ## File path: docs/dev/stream/testing.md ## @@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License. --> -This page briefly discusses how to test a Flink application in your IDE or a local environment. +Testing is an integral part of every software development process. As such Apache Flink comes with tooling to test your Apache Flink application code on different levels of the testing pyramid. * This will be replaced by the TOC {:toc} -## Unit testing +## Testing User-Defined Functions -Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible. +Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test these classes that contain the main business logic with unit tests as much as possible. -For example if one implements the following `ReduceFunction`: +### Unit Testing Stateless, Timeless UDFs + + +For example, let's take the following stateless `MapFunction`. 
{% highlight java %} -public class SumReduce implements ReduceFunction { +public class IncrementMapFunction implements MapFunction { @Override -public Long reduce(Long value1, Long value2) throws Exception { -return value1 + value2; +public Long map(Long record) throws Exception { +return record +1 ; } } {% endhighlight %} {% highlight scala %} -class SumReduce extends ReduceFunction[Long] { +class IncrementMapFunction extends MapFunction[Long, Long] { -override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = { -value1 + value2 +override def map(record: java.lang.Long): java.lang.Long = { +record + 1 } } {% endhighlight %} -It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output: +It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output. {% highlight java %} -public class SumReduceTest { +public class IncrementMapFunctionTest { @Test -public void testSum() throws Exception { +public void testIncrement() throws Exception { // instantiate your function -SumReduce sumReduce = new SumReduce(); +IncrementMapFunction incrementer = new IncrementMapFunction(); // call the methods that you have implemented -assertEquals(42L, sumReduce.reduce(40L, 2L)); +assertEquals(3L, incrementer.map(2L)); } } {% endhighlight %} {% highlight scala %} -class SumReduceTest extends FlatSpec with Matchers { +class IncrementMapFunctionTest extends FlatSpec with Matchers { + +"IncrementMapFunction" should "increment values" in { +// instantiate your function +val incrementer: IncrementMapFunction = new IncrementMapFunction() + +// call the methods that you have implemented +incremeter.map(2) should be (3) +} +} +{% endhighlight %} + + + +Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g. 
a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows. + + + +{% highlight java %} +public class IncrementFlatMapFunctionTest { -"SumReduce" should "add values" in { +@Test +public void testIncrement() throws Exception { // instantiate your function -val sumReduce: SumReduce = new SumReduce() +IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction(); + +Collector collector = mock(Collector.class); // call the methods that you have implemented -sumReduce.reduce(40L, 2L) should be (42L) +incrementer.flatMap(2L, collector) + +//verify collector was called with the right output +Mockito.verify(collector, times(1)).collect(3L); } } {% endhighlight %} + + +{% highlight scala %} +class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory { + +"IncrementFlatMapFunction" should "increment values" in { + // instantiate your function + val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction() + + val collector = mock[Collector[Integer]] + + //verify collector was called with the right output + (collector.collect _).expects(3) + + // call the methods that you have implemented + flattenFunction.flatMap(2, collector) + } +} +{% endhighlight %} +
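The mock-collector approach shown in the Java and Scala snippets above carries over to Python almost verbatim with `unittest.mock`; a sketch under the assumption of a toy `flat_map` with the same record-plus-one behaviour (names hypothetical):

```python
from unittest.mock import MagicMock


class IncrementFlatMapFunction:
    """Toy flat-map that emits record + 1 into the supplied collector."""

    def flat_map(self, record, collector):
        collector.collect(record + 1)


# A MagicMock plays the role of org.apache.flink.util.Collector.
collector = MagicMock()
IncrementFlatMapFunction().flat_map(2, collector)

# Verify the collector was called exactly once with the incremented value.
collector.collect.assert_called_once_with(3)
```

As in the Mockito version, the function under test never knows it was handed a mock, so no Flink runtime is needed for the unit test.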
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation URL: https://github.com/apache/flink/pull/8437#discussion_r283959368 ## File path: docs/dev/stream/testing.md ## @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase { {% highlight scala %} -class ExampleIntegrationTest extends AbstractTestBase { +class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter { -@Test -def testMultiply(): Unit = { -val env = StreamExecutionEnvironment.getExecutionEnvironment + val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() +.setNumberSlotsPerTaskManager(1) +.setNumberTaskManagers(1) +.build) -// configure your test environment -env.setParallelism(1) + before { +flinkCluster.before() + } -// values are collected in a static variable -CollectSink.values.clear() + after { +flinkCluster.after() + } -// create a stream of custom elements and apply transformations -env -.fromElements(1L, 21L, 22L) -.map(new MultiplyByTwo()) -.addSink(new CollectSink()) -// execute -env.execute() + "IncrementFlatMapFunction pipeline" should "incrementValues" in { -// verify your results -assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values) -} -} +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// configure your test environment +env.setParallelism(2) +// values are collected in a static variable +CollectSink.values.clear() + +// create a stream of custom elements and apply transformations +env.fromElements(1, 21, 22) + .map(new IncrementMapFunction()) + .addSink(new CollectSink()) + +// execute +env.execute() + +// verify your results +CollectSink.values should contain allOf (1,22,23) +} +} // create a testing sink class CollectSink extends SinkFunction[Long] { -override def invoke(value: java.lang.Long): Unit = { -synchronized { -values.add(value) -} + override def invoke(value: Long): Unit = { +synchronized { + 
CollectSink.values.add(value) } + } } object CollectSink { - // must be static -val values: List[Long] = new ArrayList() +val values: util.List[Long] = new util.ArrayList() } {% endhighlight %} -The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster. -Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. -Alternatively, you could for example write the data to files in a temporary directory with your test sink. -You can also implement your own custom sources for emitting watermarks. - -## Testing checkpointing and state handling +A few remarks on integration testing with `MiniClusterWithClientResource`: -One way to test state handling is to enable checkpointing in integration tests. +* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests. -You can do that by configuring your `StreamExecutionEnvironment` in the test: - - - -{% highlight java %} -env.enableCheckpointing(500); -env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)); -{% endhighlight %} - - - -{% highlight scala %} -env.enableCheckpointing(500) -env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)) -{% endhighlight %} - - +* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster. +Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. +Alternatively, you could for example write the data to files in a temporary directory with your test sink. -And for example adding to your Flink application an identity mapper operator that will throw an exception -once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions. 
+* You can also implement your own custom *parallel* source function for emitting watermarks. Review comment: I feel like "also" implies the previous sentence was also about managing watermarks. ```suggestion * You can implement a custom *parallel* source function for emitting watermarks if your job uses event time timers. ```
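The static-variable trick behind `CollectSink` discussed above can be sketched in plain Python: a class-level list is shared across all instances, just as the companion-object list in the Scala version survives operator serialization (a simplified sketch, no Flink involved):

```python
class CollectSink:
    # Class-level list shared by every instance, playing the role of the
    # static values list in the docs' Java/Scala CollectSink.
    values = []

    def invoke(self, value):
        CollectSink.values.append(value)


# Several "deserialized copies" of the sink still write to the same list,
# which is why the test can read the results after the job finishes.
for sink, value in ((CollectSink(), 1), (CollectSink(), 22), (CollectSink(), 23)):
    sink.invoke(value)
print(CollectSink.values)  # [1, 22, 23]
```

This also shows why the docs insist on clearing the list before each test run: the shared state otherwise leaks between test cases.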
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283957833

## File path: docs/dev/stream/testing.md

@@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a local environment.
+Testing is an integral part of every software development process. As such Apache Flink comes with tooling to test your Apache Flink application code on different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test these classes that contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 {% highlight java %}
-public class SumReduce implements ReduceFunction {
+public class IncrementMapFunction implements MapFunction {
 
     @Override
-    public Long reduce(Long value1, Long value2) throws Exception {
-        return value1 + value2;
+    public Long map(Long record) throws Exception {
+        return record +1 ;
     }
 }
 {% endhighlight %}
 
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
-        value1 + value2
+    override def map(record: java.lang.Long): java.lang.Long = {
+        record + 1
     }
 }
 {% endhighlight %}
 
-It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
 
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
     @Test
-    public void testSum() throws Exception {
+    public void testIncrement() throws Exception {
         // instantiate your function
-        SumReduce sumReduce = new SumReduce();
+        IncrementMapFunction incrementer = new IncrementMapFunction();
 
         // call the methods that you have implemented
-        assertEquals(42L, sumReduce.reduce(40L, 2L));
+        assertEquals(3L, incrementer.map(2L));
     }
 }
 {% endhighlight %}
 
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+    "IncrementMapFunction" should "increment values" in {
+        // instantiate your function
+        val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+        // call the methods that you have implemented
+        incremeter.map(2) should be (3)
+    }
+}
+{% endhighlight %}
+
+
+
+Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g.
+a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.
+
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
 
-    "SumReduce" should "add values" in {
+    @Test
+    public void testIncrement() throws Exception {
         // instantiate your function
-        val sumReduce: SumReduce = new SumReduce()
+        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+        Collector collector = mock(Collector.class);
 
         // call the methods that you have implemented
-        sumReduce.reduce(40L, 2L) should be (42L)
+        incrementer.flatMap(2L, collector)
+
+        //verify collector was called with the right output
+        Mockito.verify(collector, times(1)).collect(3L);
     }
 }
 {% endhighlight %}
 
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+    "IncrementFlatMapFunction" should "increment values" in {
+        // instantiate your function
+        val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+        val collector = mock[Collector[Integer]]
+
+        //verify collector was called with the right output
+        (collector.collect _).expects(3)
+
+        // call the methods that you have implemented
+        flattenFunction.flatMap(2, collector)
+    }
+}
+{% endhighlight %}
+
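The pattern in the hunk above — instantiate the function, call its method directly, assert on the result — needs no Flink runtime at all. Below is a minimal, framework-free Java sketch of the same idea; the `MapFunction` interface is a local stand-in for Flink's `org.apache.flink.api.common.functions.MapFunction`, and a plain `main` with a thrown `AssertionError` stands in for a JUnit `@Test` with `assertEquals`, since neither library is assumed here.

```java
// Local stand-in for Flink's MapFunction interface (same shape, no dependency).
interface MapFunction<T, O> {
    O map(T value) throws Exception;
}

public class IncrementMapFunctionSketch implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }

    // In a real project this would be a JUnit @Test using assertEquals;
    // a plain main() keeps the sketch self-contained.
    public static void main(String[] args) throws Exception {
        IncrementMapFunctionSketch incrementer = new IncrementMapFunctionSketch();
        if (incrementer.map(2L) != 3L) {
            throw new AssertionError("expected 3");
        }
        System.out.println("ok");
    }
}
```

The point of the sketch is that a stateless, timeless UDF is just an ordinary class, so its business logic is testable by direct invocation with no cluster, harness, or mock.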
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283954443

## File path: docs/dev/stream/testing.md

@@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License.
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283958605

## File path: docs/dev/stream/testing.md

@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
 
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
 
-    @Test
-    def testMultiply(): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+        .setNumberSlotsPerTaskManager(1)
+        .setNumberTaskManagers(1)
+        .build)
 
-        // configure your test environment
-        env.setParallelism(1)
+    before {
+        flinkCluster.before()
+    }
 
-        // values are collected in a static variable
-        CollectSink.values.clear()
+    after {
+        flinkCluster.after()
+    }
 
-        // create a stream of custom elements and apply transformations
-        env
-            .fromElements(1L, 21L, 22L)
-            .map(new MultiplyByTwo())
-            .addSink(new CollectSink())
 
-        // execute
-        env.execute()
+    "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-        // verify your results
-        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-    }
-}
+        val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+        // configure your test environment
+        env.setParallelism(2)
+        // values are collected in a static variable
+        CollectSink.values.clear()
+
+        // create a stream of custom elements and apply transformations
+        env.fromElements(1, 21, 22)
+            .map(new IncrementMapFunction())
+            .addSink(new CollectSink())
+
+        // execute
+        env.execute()
+
+        // verify your results
+        CollectSink.values should contain allOf (1,22,23)
+    }
+}
 
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-    override def invoke(value: java.lang.Long): Unit = {
-        synchronized {
-            values.add(value)
-        }
+    override def invoke(value: Long): Unit = {
+        synchronized {
+            CollectSink.values.add(value)
         }
+    }
 }
 
 object CollectSink {
-    // must be static
-    val values: List[Long] = new ArrayList()
+    val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 
-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration tests.
+* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-
-
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-
-
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary directory with your test sink.

Review comment:
Unnecessary
```suggestion
Alternatively, you could write the data to files in a temporary directory with your test sink.
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
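The "must be static" remark in the hunk above can be demonstrated without Flink: serialize the sink, deserialize a copy (which is roughly what happens when an operator is deployed), and observe that only a static field is shared between the two. This is a minimal sketch, assuming plain Java serialization as a stand-in for Flink's operator distribution; the `SinkFunction` interface and class names are local stand-ins, not Flink's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in for Flink's SinkFunction interface.
interface SinkFunction<T> extends Serializable {
    void invoke(T value) throws Exception;
}

public class CollectSinkSketch implements SinkFunction<Long> {

    // must be static: lives on the class, so it is shared with every
    // serialized-then-deserialized copy of the sink in the same JVM
    public static final List<Long> values =
            Collections.synchronizedList(new ArrayList<>());

    private final List<Long> instanceValues = new ArrayList<>(); // NOT shared

    @Override
    public void invoke(Long value) {
        values.add(value);
        instanceValues.add(value);
    }

    /** Simulates what happens when an operator is shipped to a task manager. */
    static CollectSinkSketch roundTrip(CollectSinkSketch sink) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        out.writeObject(sink);
        out.flush();
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (CollectSinkSketch) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        CollectSinkSketch original = new CollectSinkSketch();
        CollectSinkSketch deployedCopy = roundTrip(original);

        deployedCopy.invoke(42L);

        // The static list sees the value written by the deserialized copy...
        System.out.println(values);                  // [42]
        // ...but the instance field of the original sink stays empty.
        System.out.println(original.instanceValues); // []
    }
}
```

The same reasoning explains why the test must call `CollectSink.values.clear()` before each run: the static list outlives individual pipeline executions.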
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283963231

## File path: docs/dev/stream/testing.md

@@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License.

+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test these classes that contain the main business logic with unit tests as much as possible.

Review comment:
```suggestion
Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.
```
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283951933

## File path: docs/dev/stream/testing.md

@@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License.

+Testing is an integral part of every software development process. As such Apache Flink comes with tooling to test your Apache Flink application code on different levels of the testing pyramid.

Review comment:
The first sentence is not a complete sentence.
```suggestion
Testing is an integral part of every software development process as such Apache Flink comes with tooling to test your application code on multiple levels of the testing pyramid.
```
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283962452

## File path: docs/dev/stream/testing.md

@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {

+* You can also implement your own custom *parallel* source function for emitting watermarks.
 
-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 in order to identify bugs which only surface, when the pipelines is executed in parallel.
 
-For an example of how to do that please have a look at the `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` also in the `flink-streaming-java` module.
+* If you use `@Rule` instead
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283961380

## File path: docs/dev/stream/testing.md

@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283959906

## File path: docs/dev/stream/testing.md

@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {

+* It is recommended to always test your pipelines locally with a parallelism > 1 in order to identify bugs which only surface, when the pipelines is executed in parallel.

Review comment:
```suggestion
* It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface for the pipelines executed in parallel.
```
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation URL: https://github.com/apache/flink/pull/8437#discussion_r283958118 ## File path: docs/dev/stream/testing.md ## @@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License. --> -This page briefly discusses how to test a Flink application in your IDE or a local environment. +Testing is an integral part of every software development process. As such Apache Flink comes with tooling to test your Apache Flink application code on different levels of the testing pyramid. * This will be replaced by the TOC {:toc} -## Unit testing +## Testing User-Defined Functions -Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible. +Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test these classes that contain the main business logic with unit tests as much as possible. -For example if one implements the following `ReduceFunction`: +### Unit Testing Stateless, Timeless UDFs + + +For example, let's take the following stateless `MapFunction`. 
{% highlight java %} -public class SumReduce implements ReduceFunction<Long> { +public class IncrementMapFunction implements MapFunction<Long, Long> { @Override -public Long reduce(Long value1, Long value2) throws Exception { -return value1 + value2; +public Long map(Long record) throws Exception { +return record + 1; } } {% endhighlight %} {% highlight scala %} -class SumReduce extends ReduceFunction[Long] { +class IncrementMapFunction extends MapFunction[Long, Long] { -override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = { -value1 + value2 +override def map(record: java.lang.Long): java.lang.Long = { +record + 1 } } {% endhighlight %} -It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output: +It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output. {% highlight java %} -public class SumReduceTest { +public class IncrementMapFunctionTest { @Test -public void testSum() throws Exception { +public void testIncrement() throws Exception { // instantiate your function -SumReduce sumReduce = new SumReduce(); +IncrementMapFunction incrementer = new IncrementMapFunction(); // call the methods that you have implemented -assertEquals(42L, sumReduce.reduce(40L, 2L)); +assertEquals(3L, incrementer.map(2L)); } } {% endhighlight %} {% highlight scala %} -class SumReduceTest extends FlatSpec with Matchers { +class IncrementMapFunctionTest extends FlatSpec with Matchers { + +"IncrementMapFunction" should "increment values" in { +// instantiate your function +val incrementer: IncrementMapFunction = new IncrementMapFunction() + +// call the methods that you have implemented +incrementer.map(2) should be (3) +} +} +{% endhighlight %} + + + +Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g. 
a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows. + + + +{% highlight java %} +public class IncrementFlatMapFunctionTest { -"SumReduce" should "add values" in { +@Test +public void testIncrement() throws Exception { // instantiate your function -val sumReduce: SumReduce = new SumReduce() +IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction(); + +Collector<Long> collector = mock(Collector.class); // call the methods that you have implemented -sumReduce.reduce(40L, 2L) should be (42L) +incrementer.flatMap(2L, collector); + +// verify collector was called with the right output +Mockito.verify(collector, times(1)).collect(3L); } } {% endhighlight %} + + +{% highlight scala %} +class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory { + +"IncrementFlatMapFunction" should "increment values" in { + // instantiate your function + val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction() + + val collector = mock[Collector[Integer]] + + // verify collector was called with the right output + (collector.collect _).expects(3) + + // call the methods that you have implemented + incrementer.flatMap(2, collector) + } +} +{% endhighlight %} +
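The mock-based collector test above can also be sketched without any Flink or Mockito dependency by hand-rolling a recording collector. All class names below (`SimpleCollector`, `RecordingCollector`, `incrementFlatMap`) are hypothetical stand-ins for illustration, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// A dependency-free sketch of the collector-based testing idea: instead of
// mocking org.apache.flink.util.Collector, a tiny hand-rolled collector
// records emitted values so assertions can inspect them afterwards.
public class IncrementFlatMapSketch {

    // Minimal stand-in for Flink's Collector interface.
    interface SimpleCollector<T> {
        void collect(T record);
    }

    // Records everything that the function under test emits.
    static class RecordingCollector<T> implements SimpleCollector<T> {
        final List<T> collected = new ArrayList<>();
        @Override
        public void collect(T record) {
            collected.add(record);
        }
    }

    // The function under test: emits the input incremented by one.
    static void incrementFlatMap(long record, SimpleCollector<Long> out) {
        out.collect(record + 1);
    }

    public static void main(String[] args) {
        RecordingCollector<Long> collector = new RecordingCollector<>();
        incrementFlatMap(2L, collector);
        // Exactly one record emitted, and it is the incremented value.
        System.out.println(collector.collected); // prints [3]
    }
}
```

Compared with a mock, the recording collector also lets a test assert on the full sequence of emitted records, which is handy for functions that emit more than one value per input.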
[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation URL: https://github.com/apache/flink/pull/8437#discussion_r283952634 ## File path: docs/dev/stream/testing.md ## @@ -23,138 +23,387 @@ specific language governing permissions and limitations under the License. --> -This page briefly discusses how to test a Flink application in your IDE or a local environment. +Testing is an integral part of every software development process. As such Apache Flink comes with tooling to test your Apache Flink application code on different levels of the testing pyramid. * This will be replaced by the TOC {:toc} -## Unit testing +## Testing User-Defined Functions -Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible. +Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test these classes that contain the main business logic with unit tests as much as possible. -For example if one implements the following `ReduceFunction`: +### Unit Testing Stateless, Timeless UDFs + + +For example, let's take the following stateless `MapFunction`. 
{% highlight java %} -public class SumReduce implements ReduceFunction<Long> { +public class IncrementMapFunction implements MapFunction<Long, Long> { @Override -public Long reduce(Long value1, Long value2) throws Exception { -return value1 + value2; +public Long map(Long record) throws Exception { +return record + 1; } } {% endhighlight %} {% highlight scala %} -class SumReduce extends ReduceFunction[Long] { +class IncrementMapFunction extends MapFunction[Long, Long] { -override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = { -value1 + value2 +override def map(record: java.lang.Long): java.lang.Long = { Review comment: Scala has its own primitive types that auto-coerce to the correct Java boxed or unboxed primitives. ```suggestion override def map(record: Long): Long = { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10190) Unable to use custom endpoint in Kinesis producer
[ https://issues.apache.org/jira/browse/FLINK-10190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10190: --- Labels: pull-request-available (was: ) > Unable to use custom endpoint in Kinesis producer > - > > Key: FLINK-10190 > URL: https://issues.apache.org/jira/browse/FLINK-10190 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.6.0 >Reporter: Sergei Poganshev >Priority: Major > Labels: pull-request-available > > There's a check in > [KinesisConfigUtil|https://github.com/apache/flink/blob/7d034d4ef6986ba5ccda6f5e8c587b8fdd88be8e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L269] > that validates the fact that either AWS_REGION or AWS_ENDPOINT is specified > (not both), while Kinesis producer requires a region in any case (even with > custom endpoint). > Also the error message for that validation outputs AWS_REGION twice. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8444: [FLINK-10190] [Kinesis Connector] Allow AWS_REGION to be supplied along with custom Kinesis endpoint
flinkbot commented on issue #8444: [FLINK-10190] [Kinesis Connector] Allow AWS_REGION to be supplied along with custom Kinesis endpoint URL: https://github.com/apache/flink/pull/8444#issuecomment-492376902 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] skidder opened a new pull request #8444: [FLINK-10190] [Kinesis Connector] Allow AWS_REGION to be supplied along with custom Kinesis endpoint
skidder opened a new pull request #8444: [FLINK-10190] [Kinesis Connector] Allow AWS_REGION to be supplied along with custom Kinesis endpoint URL: https://github.com/apache/flink/pull/8444 ## What is the purpose of the change Allow an AWS region (e.g. `us-east-1`) to be supplied along with a custom Kinesis endpoint in the Kinesis configuration. Using a [Kinesis VPC endpoint](https://aws.amazon.com/blogs/aws/new-aws-privatelink-endpoints-kinesis-ec2-systems-manager-and-elb-apis-in-your-vpc/) can prevent traffic leaving the AWS network. This greatly reduces costs by eliminating NAT traversal and cross-AZ transfers. However, this requires that the Kinesis endpoint URL be signed for a specific region. This is accomplished by including the region name along with the endpoint URL when constructing the Kinesis client. ## Brief change log - Permit both region and endpoint to be set - Set the region on the endpoint configuration when both region and endpoint are set ## Verifying this change This change added tests and can be verified by running connector unit tests. ## Does this pull request potentially affect one of the following parts: No ## Documentation Documentation for the Kinesis connector has been updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
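The relaxed validation described in the PR above can be sketched as follows. This is an illustrative, dependency-free version, not the actual `KinesisConfigUtil` code; the key names mirror the connector's `aws.region` / `aws.endpoint` config keys, and the `validate` method is a hypothetical simplification:

```java
import java.util.Properties;

// Sketch of the relaxed check: a region is always required (the endpoint URL
// must be signed for a region), while a custom endpoint is optional and may
// now accompany the region instead of being mutually exclusive with it.
public class KinesisConfigSketch {

    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    static void validate(Properties config) {
        if (!config.containsKey(AWS_REGION)) {
            throw new IllegalArgumentException(
                "The AWS region ('" + AWS_REGION + "') must be set in the config.");
        }
        // Unlike the old either/or check, the presence of AWS_ENDPOINT
        // alongside AWS_REGION is now allowed: the region is used to sign
        // requests against the custom (e.g. VPC) endpoint.
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, "us-east-1");
        props.setProperty(AWS_ENDPOINT, "https://vpce-example.kinesis.us-east-1.vpce.amazonaws.com");
        validate(props); // no exception: both keys may be present
        System.out.println("config accepted");
    }
}
```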
[jira] [Commented] (FLINK-12504) NullPoint here NullPointException there.. It's every where
[ https://issues.apache.org/jira/browse/FLINK-12504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839718#comment-16839718 ] Ken Krugler commented on FLINK-12504: - If I Google "Flink user mailing list", the first hit is for [https://flink.apache.org/community.html#mailing-lists] On this page is a Subscribe link for the *user*@flink.apache.org mailing list. > NullPoint here NullPointException there.. It's every where > -- > > Key: FLINK-12504 > URL: https://issues.apache.org/jira/browse/FLINK-12504 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Chethan UK >Priority: Major > > I was trying to push data from Kafka to Cassandra, after around 220K > sometimes, 300K points are pushed into C*, this > java.lang.NullPointerException throws in.. > ``` > java.lang.NullPointerException > at > org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73) > at > org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81) > at > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at 
java.lang.Thread.run(Thread.java:748) > ``` > How can normal Flink Users understand these error? The Job's keep failing and > it's very unstable to be considered in production... > > In RoadMap, is there plans to make Kotlin supported language as well? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-7697) Add metrics for Elasticsearch Sink
[ https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837512#comment-16837512 ] Piyush Goyal edited comment on FLINK-7697 at 5/14/19 7:01 PM: -- +1 on adding metrics to ES Sink. A few other metrics that would be useful: * Number of failed index requests (tagged for failure reasons). * End-to-end latency for bulk requests. [~xueyu] , [~yew1eb] Is anyone working on it currently? We (at Netflix) are starting to add these metrics to our private fork. If no one else has started working on it, we'll be happy to contribute it. was (Author: pgoyal): +1 on adding metrics to ES Sink. A few other metrics that would be useful: * Number of failed index requests (tagged for failure reasons). * End-to-end latency for bulk requests. [~xueyu] [~yew1eb] Is anyone working on it currently? We (at Netflix) are starting to add these metrics to our private fork. If no one else has started working on it, we'll be happy to contribute it. > Add metrics for Elasticsearch Sink > -- > > Key: FLINK-7697 > URL: https://issues.apache.org/jira/browse/FLINK-7697 > Project: Flink > Issue Type: Wish > Components: Connectors / ElasticSearch >Reporter: Hai Zhou >Assignee: xueyu >Priority: Major > > We should add metrics to track events written to ElasticsearchSink, > e.g. > * number of successful bulk sends > * number of documents inserted > * number of documents updated > * number of documents version conflicts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
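The metrics proposed in the FLINK-7697 discussion above can be sketched with plain counters. This is a hedged illustration using `AtomicLong` in place of Flink's metric group API; the `onBulkFinished` callback shape loosely mirrors an Elasticsearch bulk listener and is not the connector's real API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the requested sink metrics: successful bulk sends, failed index
// requests, and accumulated end-to-end bulk latency. In the real connector
// these would be registered on Flink's MetricGroup instead of AtomicLongs.
public class EsSinkMetricsSketch {

    final AtomicLong successfulBulkSends = new AtomicLong();
    final AtomicLong failedIndexRequests = new AtomicLong();
    final AtomicLong bulkLatencyMillisTotal = new AtomicLong();

    // Invoked after each bulk request completes.
    void onBulkFinished(int failedItems, long startMillis, long endMillis) {
        if (failedItems == 0) {
            successfulBulkSends.incrementAndGet();
        } else {
            failedIndexRequests.addAndGet(failedItems);
        }
        // End-to-end latency of the bulk request, accumulated for averaging.
        bulkLatencyMillisTotal.addAndGet(endMillis - startMillis);
    }

    public static void main(String[] args) {
        EsSinkMetricsSketch metrics = new EsSinkMetricsSketch();
        metrics.onBulkFinished(0, 1000L, 1250L); // a fully successful bulk
        metrics.onBulkFinished(3, 2000L, 2100L); // a bulk with 3 failed documents
        System.out.println(metrics.successfulBulkSends.get());    // prints 1
        System.out.println(metrics.failedIndexRequests.get());    // prints 3
        System.out.println(metrics.bulkLatencyMillisTotal.get()); // prints 350
    }
}
```

Tagging failures by reason, as the comment suggests, would turn `failedIndexRequests` into a map from failure cause to counter.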
[GitHub] [flink] xuefuz commented on a change in pull request #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses
xuefuz commented on a change in pull request #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses URL: https://github.com/apache/flink/pull/8433#discussion_r283934117 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -86,22 +84,20 @@ public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) { // -- databases -- @Override - public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - Database hiveDb = getHiveDatabase(databaseName); - - return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription()); - } - - @Override - public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) - throws DatabaseAlreadyExistException, CatalogException { - createHiveDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database), ignoreIfExists); + protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { + return new GenericCatalogDatabase( + hiveDatabase.getParameters(), + hiveDatabase.getDescription() + ); } @Override - public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) - throws DatabaseNotExistException, CatalogException { - alterHiveDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists); + protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { + return new Database( + databaseName, + catalogDatabase.getComment(), + null, Review comment: Nit: It would be nice to explain what "null" here means. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#discussion_r283930256 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.util.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Hive catalog view implementation. + */ +public class HiveCatalogView implements CatalogView { + // Original text of the view definition. + private final String originalQuery; + + // Expanded text of the original view definition + // This is needed because the context such as current DB is + // lost after the session, in which view is defined, is gone. 
+ // Expanded query text takes care of this, as an example. + private final String expandedQuery; + + // Schema of the view (column names and types) + private final TableSchema tableSchema; + // Properties of the view + private final Map<String, String> properties; + // Comment of the view + private String comment = "This is a hive catalog view."; Review comment: Nit: It seems the assigned value will be overwritten anyway in the constructor, so we might as well remove it and make the variable final. (I guess other existing classes might have the same problem.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#discussion_r283926412 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -169,8 +181,18 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { sd.setCols(allColumns); hiveTable.setPartitionKeys(new ArrayList<>()); } + } else if (table instanceof CatalogView) { + HiveCatalogView view = (HiveCatalogView) table; + + // TODO: [FLINK-12398] Support partitioned view in catalog API + sd.setCols(allColumns); + hiveTable.setPartitionKeys(new ArrayList<>()); + + hiveTable.setViewOriginalText(view.getOriginalQuery()); + hiveTable.setViewExpandedText(view.getExpandedQuery()); + hiveTable.setTableType(TableType.VIRTUAL_VIEW.name()); } else { - throw new UnsupportedOperationException("HiveCatalog doesn't support view yet"); + throw new CatalogException("HiveCatalog only supports CatalogTable and CatalogView"); Review comment: HiveCatalogTable/View? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#discussion_r283925761 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -169,8 +181,18 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { sd.setCols(allColumns); hiveTable.setPartitionKeys(new ArrayList<>()); } + } else if (table instanceof CatalogView) { Review comment: should we check against HiveCatalogView instead, as we are casting table to it right after this line? Same applies to the check in the if condition. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
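The table/view branching under review above can be sketched with hypothetical stand-in types. The classes below are illustrative placeholders for `CatalogTable` / `CatalogView`, not Flink's catalog API; the point is that the `instanceof` check should match the type the branch then casts to, with a clear `CatalogException`-style error otherwise:

```java
// Sketch of type dispatch: build a "view" only for view inputs, a "table"
// otherwise, and fail with a descriptive error for unsupported subtypes.
public class CatalogDispatchSketch {

    interface BaseTable {}
    static class Table implements BaseTable {}
    static class View implements BaseTable {}

    static String describe(BaseTable t) {
        if (t instanceof View) {
            // View branch: here the real code would set the original and
            // expanded query text and TableType.VIRTUAL_VIEW on the Hive table.
            return "VIRTUAL_VIEW";
        } else if (t instanceof Table) {
            return "MANAGED_TABLE";
        } else {
            // Matches the review point: name exactly the supported types.
            throw new IllegalArgumentException(
                "Only Table and View are supported, got: " + t.getClass().getName());
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new View()));  // prints VIRTUAL_VIEW
        System.out.println(describe(new Table())); // prints MANAGED_TABLE
    }
}
```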
[GitHub] [flink] dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#discussion_r283833595 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -65,9 +64,15 @@ private final Map> partitionColumnStats; public GenericInMemoryCatalog(String name) { + this(name, DEFAULT_DB); + } + + public GenericInMemoryCatalog(String name, String defaultDatabase) { Review comment: I agree we should not get pulled into the discussion around the yaml file. It's irrelevant from the perspective of this ctor. We can put any arbitrary logic in the `CatalogFactory`. I still believe, though, that from a clean code perspective we should allow for passing a CatalogDatabase (or GenericInMemoryCatalogDatabase if we want to limit the scope) in the ctor. Such an approach helps to write testable code (http://misko.hevery.com/attachments/Guide-Writing%20Testable%20Code.pdf or http://misko.hevery.com/code-reviewers-guide/), which in turn is easier to use, as tests also verify that the interfaces are usable. I see no reason why we should not allow for that. But in the end I will leave it for your call. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
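The constructor-injection point discussed above can be sketched without any dependencies. The class below is an illustrative stand-in for `GenericInMemoryCatalog`, not the real class: the convenience constructor hard-codes the default, while the second constructor lets tests (or a `CatalogFactory`) supply their own value:

```java
// Sketch of the two-constructor pattern under review: a convenience ctor
// with a baked-in default database delegating to a fully parameterized,
// testable ctor.
public class InMemoryCatalogSketch {

    static final String DEFAULT_DB = "default";

    private final String name;
    private final String defaultDatabase;

    public InMemoryCatalogSketch(String name) {
        this(name, DEFAULT_DB); // convenience ctor delegates to the testable one
    }

    public InMemoryCatalogSketch(String name, String defaultDatabase) {
        this.name = name;
        this.defaultDatabase = defaultDatabase;
    }

    public String getName() {
        return name;
    }

    public String getDefaultDatabase() {
        return defaultDatabase;
    }

    public static void main(String[] args) {
        System.out.println(new InMemoryCatalogSketch("cat").getDefaultDatabase());         // prints default
        System.out.println(new InMemoryCatalogSketch("cat", "mydb").getDefaultDatabase()); // prints mydb
    }
}
```

The review thread goes one step further and suggests injecting a whole `CatalogDatabase` object rather than just its name, which follows the same delegation pattern.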
[GitHub] [flink] dawidwys edited a comment on issue #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
dawidwys edited a comment on issue #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#issuecomment-492337827 @bowenli86 The goal is to have a common `TableEnvironment` for both the Blink planner and the legacy/Flink planner. The entry point to either of them will be the `Planner` interface, but the `CatalogManager` will not be part of it. The `CatalogManager` will entirely reside in the api module in `TableEnvironment`. The only fully functional `TableEnvironment` that exists in the master branch is the Flink one, thus this is the one we should work with. The one in `blink-planner` is just the minimal version needed to be able to run some of the tests. At some point, though, we have to figure out how we want to share e.g. the `org.apache.flink.table.catalog.CatalogManagerCalciteSchema`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter edited a comment on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop
StefanRRichter edited a comment on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop URL: https://github.com/apache/flink/pull/8431#issuecomment-492336130 Maybe to extend the explanation a bit: the `MailboxSender` interface is similar in function to the `Executor` interface; we just want a bit more control over when letters are accepted into the queue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
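The mailbox/Executor analogy drawn above can be sketched with a toy implementation. The interface and class names below are illustrative, not Flink's actual mailbox API; the key difference from an `Executor` is that the consumer, not the framework, decides when each letter runs:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Toy mailbox: like Executor#execute, sendLetter accepts units of work
// ("letters"), but the task's main loop pulls and runs them at a time of
// its own choosing, giving it control over execution order.
public class MailboxSketch {

    // Analogous in function to java.util.concurrent.Executor.
    interface MailboxSender {
        void sendLetter(Runnable letter) throws InterruptedException;
    }

    static class Mailbox implements MailboxSender {
        // A bounded queue would let the mailbox block or reject senders;
        // an unbounded queue keeps the sketch simple.
        private final LinkedBlockingQueue<Runnable> letters = new LinkedBlockingQueue<>();

        @Override
        public void sendLetter(Runnable letter) throws InterruptedException {
            letters.put(letter);
        }

        // The main loop pulls letters when *it* chooses, unlike an Executor
        // which runs tasks on its own schedule.
        Runnable takeLetter() throws InterruptedException {
            return letters.take();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Mailbox mailbox = new Mailbox();
        StringBuilder log = new StringBuilder();
        mailbox.sendLetter(() -> log.append("a"));
        mailbox.sendLetter(() -> log.append("b"));
        mailbox.takeLetter().run(); // the main loop decides when this runs
        mailbox.takeLetter().run();
        System.out.println(log); // prints ab
    }
}
```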
[GitHub] [flink] yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#discussion_r283919936 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -206,9 +236,9 @@ public static void deleteApplicationFiles(final Map env) { * @return YARN resource */ private static LocalResource registerLocalResource( - Path remoteRsrcPath, - long resourceSize, - long resourceModificationTime) { Review comment: Thanks, updated. I was referring to line 411. It seems not every method follows this convention...
[jira] [Closed] (FLINK-12469) Clean up catalog API on default/current DB
[ https://issues.apache.org/jira/browse/FLINK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-12469. Resolution: Fixed Fix Version/s: 1.9.0 merged in 1.9.0: 1c3ac2f82b707ae12556a5883980da8050510244 > Clean up catalog API on default/current DB > -- > > Key: FLINK-12469 > URL: https://issues.apache.org/jira/browse/FLINK-12469 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Currently catalog API has get/setCurrentDatabase(), which is more user > session specific. In our design principle, catalog instance is agnostic to > user sessions. Thus, current database concept doesn't belong there. However, > a catalog should support a (configurable) default database, which would be > taken as user's current database when user's session doesn't specify a > current DB. > This JIRA is to remove current database concept from catalog api and add > default database instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StefanRRichter commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop
StefanRRichter commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop URL: https://github.com/apache/flink/pull/8431#discussion_r283917456 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implementation of {@link Mailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards + * our use case with multiple writers, single reader and volatile reads instead of lock & read on {@link #count}. + */ +public class MailboxImpl implements Mailbox { + + /** +* The enqueued letters. +*/ + @GuardedBy("lock") + private final Runnable[] ringBuffer; + + /** +* Lock for all concurrent ops. +*/ + private final ReentrantLock lock; + + /** +* Condition that is triggered when the buffer is no longer empty. 
+*/ + @GuardedBy("lock") + private final Condition notEmpty; + + /** +* Condition that is triggered when the buffer is no longer full. +*/ + @GuardedBy("lock") + private final Condition notFull; + + /** +* Index of the ring buffer head. +*/ + @GuardedBy("lock") + private int headIndex; + + /** +* Index of the ring buffer tail. +*/ + @GuardedBy("lock") + private int tailIndex; + + /** +* Number of letters in the mailbox. +*/ + @GuardedBy("lock") + private volatile int count; + + /** +* A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops. +*/ + private final int moduloMask; + + public MailboxImpl() { + this(6); // 2^6 = 64 + } + + public MailboxImpl(int capacityPow2) { + final int capacity = 1 << capacityPow2; + Preconditions.checkState(capacity > 0); + this.moduloMask = capacity - 1; + this.ringBuffer = new Runnable[capacity]; + this.lock = new ReentrantLock(); + this.notEmpty = lock.newCondition(); + this.notFull = lock.newCondition(); + } + + @Override + public boolean hasMail() { + return count > 0; + } + + @Override + public Runnable tryTakeMail() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return isEmpty() ? null : takeInternal(); + } finally { + lock.unlock(); + } + } + + @Nonnull + @Override + public Runnable takeMail() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isEmpty()) { + notEmpty.await(); + } + return takeInternal(); + } finally { + lock.unlock(); + } + } + + @Override + public void waitUntilHasMail() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isEmpty()) { + notEmpty.await(); + } + } finally { + lock.unlock(); + } + } + + //-- + + @Override + public boolean tryPutMail(@Nonnull Runnable letter) { + final ReentrantLock lock = this.lock; +
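The `moduloMask` comment in the quoted diff ("avoid ifs or modulo ops") relies on the capacity being a power of two: for non-negative indexes, `index & (capacity - 1)` equals `index % capacity` without a division. A minimal sketch of just that trick (the `RingIndex` name is illustrative, not a Flink class):

```java
// When capacity is a power of two (the quoted MailboxImpl uses
// 1 << capacityPow2), masking with capacity - 1 wraps an index the same
// way modulo does, but with a single AND instead of a division or branch.
final class RingIndex {
    private final int mask;

    RingIndex(int capacityPow2) {
        this.mask = (1 << capacityPow2) - 1; // e.g. 64 - 1 = 0b111111
    }

    int wrap(int index) {
        return index & mask; // same as index % capacity for index >= 0
    }
}
```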
[GitHub] [flink] kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop
kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop URL: https://github.com/apache/flink/pull/8431#discussion_r283892928 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ## @@ -98,12 +96,9 @@ public void init() throws Exception { } @Override - protected void run() throws Exception { - // cache processor reference on the stack, to make the code more JIT friendly - final StreamInputProcessor inputProcessor = this.inputProcessor; - - while (running && inputProcessor.processInput()) { - // all the work happens in the "processInput" method + protected void performDefaultAction(ActionContext context) throws Exception { + if (!inputProcessor.processInput()) { Review comment: Thanks, I did not notice that.
[GitHub] [flink] StefanRRichter commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop
StefanRRichter commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop URL: https://github.com/apache/flink/pull/8431#issuecomment-492334593 @kezhuw the letters will always be just `Runnable`, and right now there are two cases: checkpoint trigger and processing timers. Stream elements are processed as part of the default action; they don't go through the mailbox for performance reasons. Imagine the default method as the often-called, hot action of the task. The letters are infrequent requests from other threads to run some code for them in the task's main thread to avoid concurrency. That's all.
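The division of labor described above can be sketched in a few lines. This is a simplified illustration under assumed names (not Flink's actual classes): other threads enqueue rare control actions as `Runnable`s, and the task's main thread drains them between runs of its hot default action, so letters never race with the main loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified illustration: control actions become Runnables ("letters"),
// executed only in the task's main thread between default actions.
final class MailboxLoopSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

    /** Called from any thread, e.g. to trigger a checkpoint or fire a timer. */
    void sendLetter(Runnable letter) {
        mailbox.add(letter);
    }

    /** One iteration of the main loop; runs only in the task's main thread. */
    void runOnce(Runnable defaultAction) {
        Runnable letter;
        while ((letter = mailbox.poll()) != null) {
            letter.run(); // infrequent requests from other threads
        }
        defaultAction.run(); // hot path, e.g. process stream elements
    }
}
```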
[GitHub] [flink] kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop
kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop URL: https://github.com/apache/flink/pull/8431#discussion_r283916080 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283916005 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala ## @@ -118,8 +118,12 @@ abstract class TableTestUtil { } else if (expectedLine == TableTestUtil.ANY_SUBTREE) { break -} -else if (expectedLine != actualLine) { +} else if (!verifyCatalogPath && actualLine.contains("table=[[")) { Review comment: I did not want to change all of the tests. That's why I added an option to strip the default catalog & database during validation.
[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283915654 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink catalog's database and Calcite's schema. + * Tables are registered as tables in the schema. 
+ */ +class DatabaseCalciteSchema implements Schema { + private final String dbName; + private final Catalog catalog; + + public DatabaseCalciteSchema(String dbName, Catalog catalog) { + this.dbName = dbName; + this.catalog = catalog; + } + + @Override + public Table getTable(String tableName) { + + ObjectPath tablePath = new ObjectPath(dbName, tableName); + + try { + if (!catalog.tableExists(tablePath)) { + return null; + } + + CatalogBaseTable table = catalog.getTable(tablePath); + + if (table instanceof CalciteCatalogTable) { + return ((CalciteCatalogTable) table).getTable(); + } else { + throw new TableException("Unsupported table type: " + table); + } + } catch (Exception e) { + throw new TableException("Could not find table: " + tableName, e); Review comment: Actually we should never end up here with `TableNotExistException`, as we check `catalog.tableExists()` beforehand. That's why I went with `TableException` here. I added an appropriate comment there.
[GitHub] [flink] asfgit closed pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
asfgit closed pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390
[GitHub] [flink] kezhuw commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop
kezhuw commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop URL: https://github.com/apache/flink/pull/8431#issuecomment-492331773 @StefanRRichter After reading through https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g, I have some doubts about this feature that may relate to your discussion with @1u0 about message abstraction. I use the word "letter" in the following description to avoid conflicting with @1u0's point: * Will `StreamElement` handling and checkpoint handling be represented as letters in the mailbox? * Are there any cases where the sender of a letter does not know how to handle that letter, e.g. does not know how to wrap it as a `Runnable`? If the answer to the second question is yes, I think it is the responsibility of the mailbox, which has more knowledge than the senders, to wrap non-runnable letters as runnable ones if we don't want casting in the processing of letters. Even in that case, I think overloading methods is a more appropriate approach than introducing a generic message type, as the latter is intrusive to the type signature and may incur unnecessary wrapper classes.
[GitHub] [flink] walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r283907768 ## File path: flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.api.core; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.ml.util.persist.MLStageFactory; +import org.apache.flink.table.api.Table; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import java.util.ArrayList; +import java.util.List; + +/** + * A pipeline is a linear workflow which chains {@link Estimator}s and {@link Transformer}s to + * execute an algorithm. + * + * A pipeline itself can either act as an Estimator or a Transformer, depending on the stages it + * includes. More specifically: + * + * + * If a Pipeline has an {@link Estimator}, one needs to call {@link Pipeline#fit(Table)} before use + * the pipeline as a {@link Transformer}. 
In this case the Pipeline is an {@link Estimator} and can + * produce a Pipeline as a {@link Model}. + * + * + * If a Pipeline has no {@link Estimator}, it is a {@link Transformer} and can be applied to a Table + * directly. In this case, {@link Pipeline#fit(Table)} will simply return the pipeline itself. + * + * + * + * In addition, a pipeline can also be used as a {@link PipelineStage} in another pipeline, just + * like an ordinary {@link Estimator} or {@link Transformer} as describe above. + */ +@PublicEvolving +public final class Pipeline implements Estimator, Transformer, + Model { + private static final long serialVersionUID = 1L; + private List stages; + private Params params; + + public Pipeline() { + this(new ArrayList<>()); + } + + public Pipeline(List stages) { + this.stages = stages; + this.params = new Params(); + } + + private static boolean isStageNeedFit(PipelineStage stage) { + return (stage instanceof Pipeline && ((Pipeline) stage).needFit()) || + (!(stage instanceof Pipeline) && stage instanceof Estimator); + } + + /** +* Appends a PipelineStage to the tail of this pipeline. +* +* @param stage the stage to be appended +*/ + public Pipeline appendStage(PipelineStage stage) { + stages.add(stage); + return this; + } + + /** +* Returns a list of all stages in this pipeline in order. +* +* @return a list of all stages in this pipeline in order. +*/ + public List getStages() { + return stages; + } + + /** +* Check whether the pipeline acts as an {@link Estimator} or not. When the return value is +* true, that means this pipeline contains an {@link Estimator} and thus users must invoke +* {@link #fit(Table)} before they can use this pipeline as a {@link Transformer}. Otherwise, +* the pipeline can be used as a {@link Transformer} directly. 
+* +* @return {@code true} if this pipeline has an Estimator, {@code false} otherwise +*/ + public boolean needFit() { + return this.getIndexOfLastEstimator() >= 0; + } + + public Params getParams() { + return params; + } + + //find the last Estimator or Pipeline that needs fit in stages, -1 stand for no Estimator in Pipeline + private int getIndexOfLastEstimator() { Review comment: Yes, I think that's also a possibility. It's just that we need to carefully maintain the code to limit which APIs can alter the "pipeline" (e.g. each time the pipeline changes, we need to update the state). One more thought on this: we can always make the pipeline final, since every time we `fit` the pipeline, we create a new one. What do you think @c4e
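The quoted diff truncates before the body of `getIndexOfLastEstimator()`, so the following is a hypothetical sketch of the logic it describes, not Flink's code: scan the stages and remember the last index that still needs fitting, with -1 meaning the whole pipeline can be used as a Transformer directly. The class and method names here are illustrative, and the stage-needs-fit check is abstracted into a predicate.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: the last index whose stage still needs fit();
// -1 means no Estimator in the pipeline, so needFit() would be false.
final class LastEstimatorIndex {
    static <S> int indexOfLast(List<S> stages, Predicate<S> needsFit) {
        int last = -1;
        for (int i = 0; i < stages.size(); i++) {
            if (needsFit.test(stages.get(i))) {
                last = i; // keep overwriting: we want the last match
            }
        }
        return last;
    }
}
```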
[GitHub] [flink] walterddr commented on issue #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate
walterddr commented on issue #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate URL: https://github.com/apache/flink/pull/8389#issuecomment-492325084 @godfreyhe Yeah, you are right. This is not exactly "explain source"; ideally we need to put "what filter has been pushed down" in the `explainSource` result, since that actually "explains" the data source. I will draft a design to change the API in the JIRA. How about we move the discussion there? I will close this PR if we all agree.
[GitHub] [flink] walterddr commented on a change in pull request #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate
walterddr commented on a change in pull request #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate URL: https://github.com/apache/flink/pull/8389#discussion_r283906054 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ## @@ -81,7 +81,13 @@ class FlinkLogicalTableSourceScan( val terms = super.explainTerms(pw) .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", ")) -val sourceDesc = tableSource.explainSource() +val auxiliarySourceDesc = tableSource match { + case fts: FilterableTableSource[_] => +s"FilterPushDown=${fts.isFilterPushedDown.toString}" Review comment: Hmm, I think this is valid. I originally thought that since there could only be one "match" and "onMatch", the filters either can all be pushed down or none can (i.e. always the same set of filters being pushed down). But yes, there's nothing preventing Calcite from choosing to partially push down (pushing down only some of the filters because the volcano planner thinks it is more efficient than another plan). I doubt this is possible at the moment, though.
[GitHub] [flink] xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#discussion_r283902136 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -65,9 +64,15 @@ private final Map> partitionColumnStats; public GenericInMemoryCatalog(String name) { + this(name, DEFAULT_DB); + } + + public GenericInMemoryCatalog(String name, String defaultDatabase) { Review comment: Since we will have a PR coming for the YAML file structure for catalogs, let's defer the discussion until we see it. As far as I am concerned, I don't see a use case for a user to create a default DB before creating a catalog. It's also common for a DB product to create a default DB automatically. If users need their own specific databases, they can create them (possibly via DDL). Our in-memory catalog follows suit, except that with the change here we allow users to specify a name other than "default". This addition even seems optional to me. Nevertheless, if we find it necessary to instantiate the in-memory catalog with a user-created database, we can add that at that time.
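The constructor chaining in the quoted diff implements the "auto-created default DB, overridable by name" behavior discussed above. A minimal self-contained sketch of that pattern (class name and constant are illustrative, not Flink's exact code):

```java
// Sketch of the pattern in the quoted diff: the one-arg constructor
// delegates to the two-arg one with a conventional default database name.
final class InMemoryCatalogSketch {
    static final String DEFAULT_DB = "default";

    private final String name;
    private final String defaultDatabase;

    InMemoryCatalogSketch(String name) {
        this(name, DEFAULT_DB); // common case: implicit default database
    }

    InMemoryCatalogSketch(String name, String defaultDatabase) {
        this.name = name;
        this.defaultDatabase = defaultDatabase;
    }

    String getName() {
        return name;
    }

    String getDefaultDatabase() {
        return defaultDatabase;
    }
}
```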
[GitHub] [flink] flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-488032392 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from: - Needs attention by @kurtyoung, @twalthr [PMC], @wuchong [committer] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier