[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284092827
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
 ##
 @@ -783,6 +840,30 @@ private static long firstMondayOfFirstWeek(int year) {
return janFirst + (11 - janFirstDow) % 7 - 3;
}
 
+   /** Returns the ISO-8601 week number based on year, month, day.
+    * Per ISO-8601 it is the Monday of the week that contains Jan 4,
+    * or equivalently, it is a Monday between Dec 29 and Jan 4.
+    * Sometimes it is in the year before the given year, sometimes after. */
+   private static int getIso8601WeekNumber(int julian, int year, int month, int day) {
+       long fmofw = firstMondayOfFirstWeek(year);
+       if (month == 12 && day > 28) {
+           if (31 - day + 4 > 7 - ((int) floorMod(julian, 7) + 1)
 
 Review comment:
   Although the Calcite bug may be fixed in the future, let's add unit tests for these transformation functions for our current code base.
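
   For reference, a minimal sketch of the kind of unit test being asked for, assuming the new week logic is reachable through the public `unixDateExtract(TimeUnitRange.WEEK, ...)` entry point and can be cross-checked against `java.time` (the test name and the day range are illustrative):

```java
// Hedged sketch only: assumes unixDateExtract(TimeUnitRange.WEEK, days) returns the
// ISO-8601 week number for the given number of days since 1970-01-01.
import static org.junit.Assert.assertEquals;

import java.time.LocalDate;
import java.time.temporal.WeekFields;

import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.junit.Test;

public class Iso8601WeekExtractTest {

	@Test
	public void weekNumberMatchesJavaTime() {
		// Walk a range of days that covers many Dec 29 .. Jan 4 year boundaries.
		for (long days = -3650; days <= 3650; days++) {
			long expected = LocalDate.ofEpochDay(days)
					.get(WeekFields.ISO.weekOfWeekBasedYear());
			long actual = DateTimeUtils.unixDateExtract(TimeUnitRange.WEEK, days);
			assertEquals("epoch day " + days, expected, actual);
		}
	}
}
```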


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284092533
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
 ##
 @@ -53,25 +53,6 @@ public Impl(SqlFunction f) {
public RexNode convert(RexBuilder rexBuilder, RexNode groupCall,
RexNode e) {
return rexBuilder.makeCall(this.f, e);
-   // FLINK QUICK FIX
-   // we do not use this logic right now
-//  switch (f.getKind()) {
-//  case TUMBLE_START:
-//  case HOP_START:
-//  case SESSION_START:
-//  case SESSION_END: // TODO: ?
-//return e;
-//  case TUMBLE_END:
-//return rexBuilder.makeCall(
-//SqlStdOperatorTable.PLUS, e,
-//((RexCall) groupCall).operands.get(1));
-//  case HOP_END:
-//return rexBuilder.makeCall(
-//SqlStdOperatorTable.PLUS, e,
-//((RexCall) groupCall).operands.get(2));
-//  default:
 
 Review comment:
   Let's just keep these comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284092470
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
 ##
 @@ -1011,6 +1103,16 @@ public static Calendar calendar() {
return Calendar.getInstance(UTC_ZONE, Locale.ROOT);
}
 
+   /** Returns whether a value is an {@code OffsetDateTime}. */
+   public static boolean isOffsetDateTime(Object o) {
+   return OFFSET_DATE_TIME_HANDLER.isOffsetDateTime(o);
+   }
+
+   /** Returns the value of a {@code OffsetDateTime} as a string. */
+   public static String offsetDateTimeValue(Object o) {
+   return OFFSET_DATE_TIME_HANDLER.stringValue(o);
+   }
 
 Review comment:
   Does this mean that if we do not have the class "java.time.OffsetDateTime", we would always get an exception?
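
   For context, a minimal caller-side sketch under the assumption that `offsetDateTimeValue` is only reached after a positive `isOffsetDateTime` check, so a runtime without `java.time.OffsetDateTime` would simply take the `false` branch rather than throw (the helper name is made up):

```java
// Hedged sketch: toStringIfOffsetDateTime is a hypothetical helper, not Calcite API.
import org.apache.calcite.avatica.util.DateTimeUtils;

public final class OffsetDateTimeGuardExample {

	/** Returns the string form if the value is an OffsetDateTime, otherwise null. */
	static String toStringIfOffsetDateTime(Object value) {
		// On a runtime without java.time.OffsetDateTime the guard is expected to
		// return false, so the potentially-throwing stringValue path is never hit.
		if (DateTimeUtils.isOffsetDateTime(value)) {
			return DateTimeUtils.offsetDateTimeValue(value);
		}
		return null;
	}

	private OffsetDateTimeGuardExample() {
	}
}
```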


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284092067
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
 ##
 @@ -1039,6 +1141,46 @@ public String getFraction() {
return fraction;
}
}
+
+   /** Deals with values of {@code java.time.OffsetDateTime} without introducing
+    * a compile-time dependency (because {@code OffsetDateTime} is only JDK 8 and
+    * higher). */
+   private interface OffsetDateTimeHandler {
+       boolean isOffsetDateTime(Object o);
+       String stringValue(Object o);
 
 Review comment:
   Do we really need this interface method `stringValue`? Of the two implementations, one is static and the other throws an exception; I don't see why we should keep it.
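
   As a hedged illustration of the pattern such an interface enables (two implementations chosen at runtime, so there is no compile-time dependency on JDK 8), something along these lines — the class names are made up and the reflective details are simplified; this is not the actual Calcite code:

```java
// Illustrative sketch only.
interface OffsetDateTimeHandler {
	boolean isOffsetDateTime(Object o);
	String stringValue(Object o);
}

/** Chosen when java.time.OffsetDateTime is not on the classpath (pre-JDK-8 runtime). */
class NoopOffsetDateTimeHandler implements OffsetDateTimeHandler {
	public boolean isOffsetDateTime(Object o) {
		return false; // nothing can be an OffsetDateTime on this runtime
	}

	public String stringValue(Object o) {
		// Unreachable as long as callers guard with isOffsetDateTime first.
		throw new UnsupportedOperationException("java.time.OffsetDateTime not available");
	}
}

/** Chosen when the class is present; avoids importing it by using reflection only. */
class ReflectiveOffsetDateTimeHandler implements OffsetDateTimeHandler {
	private final Class<?> offsetDateTimeClass;

	ReflectiveOffsetDateTimeHandler() throws ClassNotFoundException {
		this.offsetDateTimeClass = Class.forName("java.time.OffsetDateTime");
	}

	public boolean isOffsetDateTime(Object o) {
		return o != null && offsetDateTimeClass.isInstance(o);
	}

	public String stringValue(Object o) {
		return o.toString(); // OffsetDateTime#toString is already ISO-8601
	}
}
```

   Under that reading, `stringValue` on the no-op handler exists only to keep the interface uniform; whether that justifies keeping it is exactly the question above.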


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284091738
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
 ##
 @@ -882,6 +968,12 @@ public static int ymdToJulian(int year, int month, int day) {
int a = (14 - month) / 12;
int y = year + 4800 - a;
int m = month + 12 * a - 3;
+// return day + (153 * m + 2) / 5
+// + 365 * y
+// + y / 4
+// - y / 100
+// + y / 400
 
 Review comment:
   Useless comments
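
   (If the intent behind those comments was to document the formula, a cross-check against `java.time` in a test might serve better. A hedged sketch, assuming `ymdToJulian` stays public as in the hunk above:)

```java
// Hedged sketch: verifies ymdToJulian against java.time's Julian day field.
import static org.junit.Assert.assertEquals;

import java.time.LocalDate;
import java.time.temporal.JulianFields;

import org.apache.calcite.avatica.util.DateTimeUtils;
import org.junit.Test;

public class YmdToJulianTest {

	@Test
	public void matchesJavaTimeJulianDay() {
		LocalDate date = LocalDate.of(1970, 1, 1);
		for (int i = 0; i < 10_000; i++, date = date.plusDays(1)) {
			long expected = date.getLong(JulianFields.JULIAN_DAY);
			int actual = DateTimeUtils.ymdToJulian(
					date.getYear(), date.getMonthValue(), date.getDayOfMonth());
			assertEquals(date.toString(), expected, actual);
		}
	}
}
```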


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284091655
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
 ##
 @@ -719,30 +757,49 @@ public static long timestampStringToUnixDate(String s) {
}
 
public static long unixDateExtract(TimeUnitRange range, long date) {
-   return julianExtract(range, (int) date + EPOCH_JULIAN);
+   switch (range) {
+   case EPOCH:
+			// no need to extract year/month/day, just multiply
+			return date * SECONDS_PER_DAY;
+		default:
+			return julianExtract(range, (int) date + EPOCH_JULIAN);
+   }
}
 
private static int julianExtract(TimeUnitRange range, int julian) {
-		// Algorithm the book "Astronomical Algorithms" by Jean Meeus, 1998
-   int b, c;
-   if (julian > 2299160) {
-   int a = julian + 32044;
-   b = (4 * a + 3) / 146097;
-   c = a - b *146097 / 4;
-   } else {
-   b = 0;
-   c = julian + 32082;
-   }
-   int d = (4 * c + 3) / 1461;
-   int e = c - (1461 * d) / 4;
-   int m = (5 * e + 2) / 153;
-   int day = e - (153 * m + 2) / 5 + 1;
-   int month = m + 3 - 12 * (m / 10);
-   int year = b * 100 + d - 4800 + (m / 10);
-
+		// this shifts the epoch back to astronomical year -4800 instead of the
+		// start of the Christian era in year AD 1 of the proleptic Gregorian
+		// calendar.
+   int j = julian + 32044;
+   int g = j / 146097;
+   int dg = j % 146097;
+   int c = (dg / 36524 + 1) * 3 / 4;
+   int dc = dg - c * 36524;
+   int b = dc / 1461;
+   int db = dc % 1461;
+   int a = (db / 365 + 1) * 3 / 4;
+   int da = db - a * 365;
+
+   // integer number of full years elapsed since March 1, 4801 BC
+   int y = g * 400 + c * 100 + b * 4 + a;
+   // integer number of full months elapsed since the last March 1
+   int m = (da * 5 + 308) / 153 - 2;
+   // number of days elapsed since day 1 of the month
+   int d = da - (m + 4) * 153 / 5 + 122;
+   int year = y - 4800 + (m + 2) / 12;
 
 Review comment:
   I have no idea whether this logic is right or not; let's just add enough unit tests.
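
   A hedged sketch of such a test, cross-checking the public `unixDateExtract` entry point (including the new EPOCH branch) against `java.time`; the day range and test name are illustrative:

```java
// Hedged sketch only: assumes the date argument is days since 1970-01-01.
import static org.junit.Assert.assertEquals;

import java.time.LocalDate;

import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.junit.Test;

public class JulianExtractTest {

	@Test
	public void extractMatchesJavaTime() {
		for (long days = -100_000; days <= 100_000; days += 17) {
			LocalDate date = LocalDate.ofEpochDay(days);
			assertEquals(date.getYear(),
					DateTimeUtils.unixDateExtract(TimeUnitRange.YEAR, days));
			assertEquals(date.getMonthValue(),
					DateTimeUtils.unixDateExtract(TimeUnitRange.MONTH, days));
			assertEquals(date.getDayOfMonth(),
					DateTimeUtils.unixDateExtract(TimeUnitRange.DAY, days));
			// The EPOCH branch short-circuits the Julian math entirely.
			assertEquals(days * 86400L,
					DateTimeUtils.unixDateExtract(TimeUnitRange.EPOCH, days));
		}
	}
}
```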


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284091504
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
 ##
 @@ -379,6 +394,29 @@ private static void julianToString(StringBuilder buf, int julian) {
int month = m + 3 - 12 * (m / 10);
int year = b * 100 + d - 4800 + (m / 10);
 
+//		// this shifts the epoch back to astronomical year -4800 instead of the
+//		// start of the Christian era in year AD 1 of the proleptic Gregorian
+//		// calendar.
+// int j = julian + 32044;
+// int g = j / 146097;
+// int dg = j % 146097;
+// int c = (dg / 36524 + 1) * 3 / 4;
+// int dc = dg - c * 36524;
+// int b = dc / 1461;
+// int db = dc % 1461;
+// int a = (db / 365 + 1) * 3 / 4;
+// int da = db - a * 365;
+//
+// // integer number of full years elapsed since March 1, 4801 BC
+// int y = g * 400 + c * 100 + b * 4 + a;
+// // integer number of full months elapsed since the last March 1
+// int m = (da * 5 + 308) / 153 - 2;
+// // number of days elapsed since day 1 of the month
+// int d = da - (m + 4) * 153 / 5 + 122;
+// int year = y - 4800 + (m + 2) / 12;
+// int month = (m + 2) % 12 + 1;
+// int day = d + 1;
 
 Review comment:
   Useless comments


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on a change in pull request #8324: [FLINK-11921][table] 
Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#discussion_r284091403
 
 

 ##
 File path: flink-table/flink-table-planner/pom.xml
 ##
 @@ -136,17 +136,17 @@ under the License.
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-core</artifactId>

-			<version>1.18.0</version>
+			<version>1.19.0</version>


[jira] [Assigned] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese

2019-05-14 Thread Zhang Ziqiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Ziqiang reassigned FLINK-11607:
-

Assignee: Zhang Ziqiang  (was: Benchao Li)

> Translate the "DataStream API Tutorial" page into Chinese
> -
>
> Key: FLINK-11607
> URL: https://issues.apache.org/jira/browse/FLINK-11607
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Zhang Ziqiang
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html
> The markdown file is located in flink/docs/tutorials/datastream_api.zh.md
> The markdown file will be created once FLINK-11529 is merged.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284080627
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table_completeness.py
 ##
 @@ -0,0 +1,62 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+from pyflink.table import Table
+
+
+class TableAPICompletenessTests(PythonAPICompletenessTestCase):
+"""
+Tests whether the Python :class:`Table` is consistent with
+Java `org.apache.flink.table.api.Table`.
+"""
+
+@classmethod
+def python_class(cls):
+return Table
+
+@classmethod
+def java_class(cls):
+return "org.apache.flink.table.api.Table"
+
+@classmethod
+def excluded_methods(cls):
+# row-based operators should be supported when UDFs supported in 
python.
+return {'map', 'flatMap', 'flatAggregate',  'aggregate'}
+
+@classmethod
+def java_method_name(cls, python_method_name):
+"""
+Due to 'as' is python keyword, so we use 'alias'
+in Python API corresponding 'as' in Java API.
+
+:param python_method_name:
+:return:
+"""
+return {'alias': 'as'}.get(python_method_name, python_method_name)
+
+
+if __name__ == '__main__':
+import unittest
+
+try:
+import xmlrunner
+testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+except ImportError:
+testRunner = None
+unittest.main(testRunner=testRunner, verbosity=2)
 
 Review comment:
   Add one empty line at the end of file


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284080392
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_environment_completeness.py
 ##
 @@ -0,0 +1,53 @@
+# 
###
 
 Review comment:
   Remove the empty space here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
sunjincheng121 commented on issue #8439: [FLINK-12485][python] Add completeness 
test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#issuecomment-492495197
 
 
   Thanks for the quick review! @dianfu 
   
   I have updated the PR according to your comments!
   
   Best,
   Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-14 Thread GitBox
zhijiangW edited a comment on issue #8362: [FLINK-11391] Introduce shuffle 
master interface
URL: https://github.com/apache/flink/pull/8362#issuecomment-492491082
 
 
   Thanks for the confirmation @azagrebin .
   
   1. Yes, I have not thought through the changes caused by having a single channel in `PartitionInfo` and all channels in `IGDD`. Just from the aspect of the rpc call `taskManagerGateway.updatePartitions(partitionInfos)`, the parameter is a collection of `PartitionInfo`, which is the same as the array of channels in `IGDD`. Maybe `IGDD` should cache `ICDD` internally and replace the array with a collection. It might involve more refactoring and I will consider it further.
   
   2. From a functional point of view the current way is fine. But in one of my PRs I was advised not to use `instanceof` and to introduce the interface method `ChannelSelector#isBroadcast` instead, because `instanceof` sounds like a hack rather than a proper solution. I am not sure whether it is generally discouraged or whether that was just a personal preference, so you could confirm this approach with the other guys. :)
   
   BTW, I have not finished the whole review yet. I will continue on it later.
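
   To make the second point concrete, a small hedged sketch of the two styles being compared; the `isBroadcast()` default method is the proposed alternative, not necessarily something that exists on the current `ChannelSelector` interface, and the classes are simplified stand-ins for the Flink types:

```java
// Illustrative only; simplified shapes, not the actual Flink classes.
interface ChannelSelector<T> {
	int selectChannel(T record);

	/** Proposed alternative to instanceof checks at the call site. */
	default boolean isBroadcast() {
		return false;
	}
}

class BroadcastPartitioner<T> implements ChannelSelector<T> {
	@Override
	public int selectChannel(T record) {
		throw new UnsupportedOperationException("broadcast mode writes to all channels");
	}

	@Override
	public boolean isBroadcast() {
		return true;
	}
}

class RecordWriterSketch<T> {
	private final ChannelSelector<T> selector;

	RecordWriterSketch(ChannelSelector<T> selector) {
		this.selector = selector;
	}

	void emit(T record) {
		// Interface-method style under discussion ...
		if (selector.isBroadcast()) {
			// ... instead of: if (selector instanceof BroadcastPartitioner) { ... }
			broadcastToAllChannels(record);
		} else {
			writeToChannel(selector.selectChannel(record), record);
		}
	}

	private void broadcastToAllChannels(T record) {
		// omitted
	}

	private void writeToChannel(int channel, T record) {
		// omitted
	}
}
```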


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-14 Thread GitBox
zhijiangW commented on issue #8362: [FLINK-11391] Introduce shuffle master 
interface
URL: https://github.com/apache/flink/pull/8362#issuecomment-492491082
 
 
   Thanks for the confirmation @azagrebin .
   
   1. Yes, I have not thought through the changes caused by having a single channel in `PartitionInfo` and all channels in `IGDD`. Just from the aspect of the rpc call `taskManagerGateway.updatePartitions(partitionInfos)`, the parameter is a collection of `PartitionInfo`, which is the same as the array of channels in `IGDD`. Maybe `IGDD` should cache `ICDD` internally and replace the array with a collection. It might involve more refactoring and I will consider it further.
   
   2. From a functional point of view the current way is fine. But in one of my PRs I was advised not to use `instanceof` and to introduce the interface method `ChannelSelector#isBroadcast` instead, because `instanceof` sounds like a hack rather than a proper solution. I am not sure whether it is generally discouraged or whether that was just a personal preference, so you could confirm this approach with the other guys. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284063932
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
 
 Review comment:
   get_java_class_method -> get_java_class_methods
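
   A side note on what that helper collects: the `Class.forName(java_class).getDeclaredMethods()` call driven through the Py4J gateway returns only methods declared directly on the class (regardless of visibility) and omits inherited ones, whereas `getMethods()` would also include inherited public methods. A minimal Java sketch of the difference, with an illustrative class name:

```java
// Hedged sketch of the reflection call the completeness test relies on.
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

public final class DeclaredMethodsExample {

	public static void main(String[] args) throws ClassNotFoundException {
		Class<?> clazz = Class.forName("org.apache.flink.table.api.Table");

		// Methods declared directly on the class, regardless of visibility.
		Set<String> declared = new TreeSet<>();
		for (Method m : clazz.getDeclaredMethods()) {
			declared.add(m.getName());
		}

		// For comparison: all public methods, including inherited ones.
		Set<String> allPublic = new TreeSet<>();
		for (Method m : clazz.getMethods()) {
			allPublic.add(m.getName());
		}

		System.out.println("declared=" + declared.size() + ", public=" + allPublic.size());
	}
}
```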


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284065527
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
+gateway = get_gateway()
+s = set()
+method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods()
+for i in range(0, len(method_arr)):
+s.add(method_arr[i].getName())
+return s
+
+@classmethod
+def check_methods(cls):
+java_methods = 
PythonAPICompletenessTestCase.get_java_class_method(cls.java_class())
+python_methods = cls.get_python_class_methods(cls.python_class())
+missing_methods = java_methods - python_methods - cls.exclude_methods()
+if len(missing_methods) > 0:
+print(missing_methods)
+print('The Exception should be raised after FLINK-12407 is 
merged.')
+# raise Exception('Methods: %s in Java class %s have not been 
added in Python class %s.'
+#% (missing_methods, java_class, python_class))
 
 Review comment:
   cls.java_class(), cls.python_class()


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284064889
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
+gateway = get_gateway()
+s = set()
+method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods()
+for i in range(0, len(method_arr)):
+s.add(method_arr[i].getName())
+return s
+
+@classmethod
+def check_methods(cls):
+java_methods = 
PythonAPICompletenessTestCase.get_java_class_method(cls.java_class())
+python_methods = cls.get_python_class_methods(cls.python_class())
+missing_methods = java_methods - python_methods - cls.exclude_methods()
+if len(missing_methods) > 0:
+print(missing_methods)
+print('The Exception should be raised after FLINK-12407 is 
merged.')
+# raise Exception('Methods: %s in Java class %s have not been 
added in Python class %s.'
+#% (missing_methods, java_class, python_class))
+
+@classmethod
+def java_method_name(cls, python_method_name):
+"""
+This method should be overWrite when the method name of the Python API 
cannot be
+consistent with the Java API method name. we need to get the 
corresponding
+Java API method name based on the Python method name. e.g.: 'as' is 
python
+keyword, so we use 'alias' in Python API corresponding 'as' in Java 
API.
+
+:param python_method_name: Method name of Python API.
+:return: Method name corresponding to Java API.
+"""
+return python_method_name
+
+@classmethod
+@abstractmethod
+def python_class(cls):
+"""
+Return the Python class that needs to be compared. such as 
:class:`Table`.
+"""
+pass
+
+@classmethod
+@abstractmethod
+def java_class(cls):
+"""
+Return the Java class that needs to be compared. such as 
`org.apache.flink.table.api.Table`.
+"""
+pass
+
+@classmethod
+def exclude_methods(cls):
+"""
+Exclude method names that do not need to be checked. When adding 
excluded methods to the lists
+you should give a good reason in a comment.
+:return:
+"""
+return {}
+
+def test_completeness(self):
+if self.python_class() is not None:
 
 Review comment:
   if self.python_class() is not None and self.java_class() is not None


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284062971
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_environment_completeness.py
 ##
 @@ -0,0 +1,53 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+from pyflink.table import TableEnvironment
+
+
+class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase):
+"""
+Test whether the Python :class:`TableEnvironment` is consistent with
 
 Review comment:
   Test -> Tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284064639
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
+gateway = get_gateway()
+s = set()
+method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods()
+for i in range(0, len(method_arr)):
+s.add(method_arr[i].getName())
+return s
+
+@classmethod
+def check_methods(cls):
+java_methods = 
PythonAPICompletenessTestCase.get_java_class_method(cls.java_class())
+python_methods = cls.get_python_class_methods(cls.python_class())
+missing_methods = java_methods - python_methods - cls.exclude_methods()
+if len(missing_methods) > 0:
+print(missing_methods)
+print('The Exception should be raised after FLINK-12407 is 
merged.')
+# raise Exception('Methods: %s in Java class %s have not been 
added in Python class %s.'
+#% (missing_methods, java_class, python_class))
+
+@classmethod
+def java_method_name(cls, python_method_name):
+"""
+This method should be overWrite when the method name of the Python API 
cannot be
+consistent with the Java API method name. we need to get the 
corresponding
+Java API method name based on the Python method name. e.g.: 'as' is 
python
+keyword, so we use 'alias' in Python API corresponding 'as' in Java 
API.
+
+:param python_method_name: Method name of Python API.
+:return: Method name corresponding to Java API.
 
 Review comment:
   The corresponding method name of Java API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284064278
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
+gateway = get_gateway()
+s = set()
+method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods()
+for i in range(0, len(method_arr)):
+s.add(method_arr[i].getName())
+return s
+
+@classmethod
+def check_methods(cls):
+java_methods = 
PythonAPICompletenessTestCase.get_java_class_method(cls.java_class())
+python_methods = cls.get_python_class_methods(cls.python_class())
+missing_methods = java_methods - python_methods - cls.exclude_methods()
+if len(missing_methods) > 0:
+print(missing_methods)
+print('The Exception should be raised after FLINK-12407 is 
merged.')
+# raise Exception('Methods: %s in Java class %s have not been 
added in Python class %s.'
+#% (missing_methods, java_class, python_class))
+
+@classmethod
+def java_method_name(cls, python_method_name):
+"""
+This method should be overWrite when the method name of the Python API 
cannot be
 
 Review comment:
   overWrite  -> overwritten


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284064351
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
+gateway = get_gateway()
+s = set()
+method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods()
+for i in range(0, len(method_arr)):
+s.add(method_arr[i].getName())
+return s
+
+@classmethod
+def check_methods(cls):
+java_methods = 
PythonAPICompletenessTestCase.get_java_class_method(cls.java_class())
+python_methods = cls.get_python_class_methods(cls.python_class())
+missing_methods = java_methods - python_methods - cls.exclude_methods()
+if len(missing_methods) > 0:
+print(missing_methods)
+print('The Exception should be raised after FLINK-12407 is 
merged.')
+# raise Exception('Methods: %s in Java class %s have not been 
added in Python class %s.'
+#% (missing_methods, java_class, python_class))
+
+@classmethod
+def java_method_name(cls, python_method_name):
+"""
+This method should be overWrite when the method name of the Python API 
cannot be
+consistent with the Java API method name. we need to get the 
corresponding
 
 Review comment:
   What about removing this line?
   `we need to get the corresponding Java API method name based on the Python 
method name.` 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on issue #8441: [FLINK-12486][doc]clarify the storage of operator state and broadcast state individually

2019-05-14 Thread GitBox
Aitozi commented on issue #8441: [FLINK-12486][doc]clarify the storage of 
operator state and broadcast state individually
URL: https://github.com/apache/flink/pull/8441#issuecomment-492476714
 
 
   Er... I think I have also updated the `state_backends.zh.md`; what are you referring to, @klion26?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12202) Consider introducing batch metric register in NetworkEnviroment

2019-05-14 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839963#comment-16839963
 ] 

zhijiang commented on FLINK-12202:
--

Hey [~azagrebin], as we confirmed before, this ticket is not needed because we would register metrics while creating the batch of partitions/gates. So should we close this JIRA?

> Consider introducing batch metric register in NetworkEnviroment
> ---
>
> Key: FLINK-12202
> URL: https://issues.apache.org/jira/browse/FLINK-12202
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: zhijiang
>Priority: Major
>
> As we have some network specific metrics registered in TaskIOMetricGroup 
> (In/OutputBuffersGauge, In/OutputBufferPoolUsageGauge), we can introduce 
> batch metric registering in 
> NetworkEnviroment.registerMetrics(ProxyMetricGroup, partitions, gates), where 
> task passes its TaskIOMetricGroup into ProxyMetricGroup. This way we could 
> break a tie between task and NetworkEnviroment. 
> TaskIOMetricGroup.initializeBufferMetrics, In/OutputBuffersGauge, 
> In/OutputBufferPoolUsageGauge could be moved into 
> NetworkEnviroment.registerMetrics and network code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] c4emmmm commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-14 Thread GitBox
c4e commented on a change in pull request #8402: [FLINK-12473][ml] Add the 
interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r284071237
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
 ##
 @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+import org.apache.flink.table.api.Table;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A pipeline is a linear workflow which chains {@link Estimator}s and {@link 
Transformer}s to
+ * execute an algorithm.
+ *
+ * A pipeline itself can either act as an Estimator or a Transformer, 
depending on the stages it
+ * includes. More specifically:
+ * 
+ * 
+ * If a Pipeline has an {@link Estimator}, one needs to call {@link 
Pipeline#fit(Table)} before use
+ * the pipeline as a {@link Transformer}. In this case the Pipeline is an 
{@link Estimator} and can
+ * produce a Pipeline as a {@link Model}.
+ * 
+ * 
+ * If a Pipeline has no {@link Estimator}, it is a {@link Transformer} and can 
be applied to a Table
+ * directly. In this case, {@link Pipeline#fit(Table)} will simply return the 
pipeline itself.
+ * 
+ * 
+ *
+ * In addition, a pipeline can also be used as a {@link PipelineStage} in 
another pipeline, just
+ * like an ordinary {@link Estimator} or {@link Transformer} as describe above.
+ */
+@PublicEvolving
+public final class Pipeline implements Estimator, 
Transformer,
+   Model {
+   private static final long serialVersionUID = 1L;
+   private List stages;
+   private Params params;
+
+   public Pipeline() {
+   this(new ArrayList<>());
+   }
+
+   public Pipeline(List stages) {
+   this.stages = stages;
+   this.params = new Params();
+   }
+
+   private static boolean isStageNeedFit(PipelineStage stage) {
+   return (stage instanceof Pipeline && ((Pipeline) 
stage).needFit()) ||
+   (!(stage instanceof Pipeline) && stage instanceof 
Estimator);
+   }
+
+   /**
+* Appends a PipelineStage to the tail of this pipeline.
+*
+* @param stage the stage to be appended
+*/
+   public Pipeline appendStage(PipelineStage stage) {
+   stages.add(stage);
+   return this;
+   }
+
+   /**
+* Returns a list of all stages in this pipeline in order.
+*
+* @return a list of all stages in this pipeline in order.
+*/
+   public List getStages() {
+   return stages;
+   }
+
+   /**
+* Check whether the pipeline acts as an {@link Estimator} or not. When 
the return value is
+* true, that means this pipeline contains an {@link Estimator} and 
thus users must invoke
+* {@link #fit(Table)} before they can use this pipeline as a {@link 
Transformer}. Otherwise,
+* the pipeline can be used as a {@link Transformer} directly.
+*
+* @return {@code true} if this pipeline has an Estimator, {@code 
false} otherwise
+*/
+   public boolean needFit() {
+   return this.getIndexOfLastEstimator() >= 0;
+   }
+
+   public Params getParams() {
+   return params;
+   }
+
+   //find the last Estimator or Pipeline that needs fit in stages, -1 
stand for no Estimator in Pipeline
+   private int getIndexOfLastEstimator() {
 
 Review comment:
   I have renewed the Pipeline and left a comment. It's outside the code page, so maybe you didn't see it. Sorry that I'm not familiar with GitHub; maybe there's a better way to let you know.
   The new Pipeline has only final fields, except lastEstimatorIndex, which is an internal cache. Every stage must be added via the appendStage() method, including in the constructor or the fromJson() method. The return of getStages() will be an 

[jira] [Updated] (FLINK-12127) Move network related options to NetworkEnvironmentOptions

2019-05-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12127:
---
Labels: pull-request-available  (was: )

> Move network related options to NetworkEnvironmentOptions
> -
>
> Key: FLINK-12127
> URL: https://issues.apache.org/jira/browse/FLINK-12127
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> Some network related options in TaskManagerOptions could be moved into new 
> introduced `NetworkEnvironmentOptions` which would be used for different 
> shuffle services.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8445: [FLINK-12127][network, config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions

2019-05-14 Thread GitBox
flinkbot commented on issue #8445: [FLINK-12127][network,config] Move network 
related options form TaskManagerOptions and NettyConfig into 
NetworkEnvironmentOptions
URL: https://github.com/apache/flink/pull/8445#issuecomment-492483428
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284070033
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
			triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
			return true;
		} catch (CheckpointException e) {
+			try {
+				long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > BTW, I have one more important additional point, which is the `numUnsuccessfulCheckpointsTriggers` in checkpoint coordinator, which absolutely sounds like something that should now be moved into the failure manager, wdyt?
   
   After we introduce `CheckpointFailureManager`, IMO `numUnsuccessfulCheckpointsTriggers` is no longer valuable. Currently, it is incremented in the trigger lock of the `triggerCheckpoint` method. The scope of that trigger lock is only a subset of the trigger phase that `CheckpointFailureManager` understands, so the counting is not correct, and it is only used for logging. So I suggest we remove it either in this PR or in the third step (next step). If the logging is really necessary, we could add it back after we implement the new counting logic based on the checkpoint id sequence.
   
   What's your opinion?
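
   To make the "counting based on the checkpoint id sequence" idea concrete, a purely illustrative sketch — this is not the actual `CheckpointFailureManager`, and the class and method names are hypothetical:

```java
// Hypothetical sketch: count consecutive failures, at most once per checkpoint id.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class FailureCounterSketch {
	private final int tolerableFailures;
	private final AtomicInteger consecutiveFailures = new AtomicInteger();
	private final AtomicLong lastCountedCheckpointId = new AtomicLong(-1L);

	FailureCounterSketch(int tolerableFailures) {
		this.tolerableFailures = tolerableFailures;
	}

	/** Called on every failure; a given checkpoint id is counted at most once. */
	void onCheckpointFailure(long checkpointId) {
		long previousMax = lastCountedCheckpointId.getAndUpdate(
				last -> Math.max(last, checkpointId));
		if (checkpointId > previousMax
				&& consecutiveFailures.incrementAndGet() > tolerableFailures) {
			// react here, e.g. fail the job or invoke a callback
		}
	}

	/** A completed checkpoint resets the failure streak. */
	void onCheckpointSuccess(long checkpointId) {
		consecutiveFailures.set(0);
	}
}
```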


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8445: [FLINK-12127][network, config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions

2019-05-14 Thread GitBox
zhijiangW commented on issue #8445: [FLINK-12127][network,config] Move network 
related options form TaskManagerOptions and NettyConfig into 
NetworkEnvironmentOptions
URL: https://github.com/apache/flink/pull/8445#issuecomment-492483315
 
 
   @flinkbot attention @azagrebin


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW opened a new pull request #8445: [FLINK-12127][network, config] Move network related options form TaskManagerOptions and NettyConfig into NetworkEnvironmentOptions

2019-05-14 Thread GitBox
zhijiangW opened a new pull request #8445: [FLINK-12127][network,config] Move 
network related options form TaskManagerOptions and NettyConfig into 
NetworkEnvironmentOptions
URL: https://github.com/apache/flink/pull/8445
 
 
   ## What is the purpose of the change
   
   *The introduced `NetworkEnvironmentOptions` could be used for different shuffle services in the future, not only for the current task manager, so it is better to extract the network related parameters as independent options.*
   
   ## Brief change log
   
 - *Extract network related options from `TaskManagerOptions` into `NetworkEnvironmentOptions`*
 - *Migrate network related options from `NettyConfig` into `NetworkEnvironmentOptions`*
 - *Modify the `config.md` for generating new `network_configuration.html`*
 - *Adjust the related tests*
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable) 
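
   To illustrate the shape of the extracted options, a hedged sketch; the class body, key names and defaults here are examples only, not the exact contents of the PR:

```java
// Illustrative sketch of network options grouped into a dedicated class.
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

public class NetworkEnvironmentOptionsSketch {

	/** Fraction of JVM memory to use for network buffers. */
	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
			key("taskmanager.network.memory.fraction")
					.defaultValue(0.1f)
					.withDescription("Fraction of JVM memory to use for network buffers.");

	/** Number of extra (floating) network buffers per incoming/outgoing gate. */
	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
			key("taskmanager.network.memory.floating-buffers-per-gate")
					.defaultValue(8)
					.withDescription("Number of extra network buffers per gate.");
}
```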


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph

2019-05-14 Thread Jasleen Kaur (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839939#comment-16839939
 ] 

Jasleen Kaur commented on FLINK-5243:
-

Thanks [~ivan.mushketyk]. Could I implement bipartite matching in the example?

> Implement an example for BipartiteGraph
> ---
>
> Key: FLINK-5243
> URL: https://issues.apache.org/jira/browse/FLINK-5243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Graph Processing (Gelly)
>Reporter: Ivan Mushketyk
>Priority: Major
>  Labels: beginner
>
> Should implement example for BipartiteGraph in gelly-examples project 
> similarly to examples for Graph class.
> Depends on this: https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua removed a comment on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua removed a comment on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-492471326
 
 
   Irrelevant test error. I have created an issue to report it and will trigger a rebuild~


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-492478037
 
 
   > @danny0405 Could you help to review this PR?
   
   Sure, I will try to finish reviewing this AFAIK.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284063074
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
 
 Review comment:
   python  -> Python
   api -> API


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add completeness test for Table and TableEnviro…

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8439: [FLINK-12485][python] Add 
completeness test for Table and TableEnviro…
URL: https://github.com/apache/flink/pull/8439#discussion_r284063742
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -90,3 +92,82 @@ def setUp(self):
 super(PyFlinkBatchTableTestCase, self).setUp()
 self.t_config = 
TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
 self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PythonAPICompletenessTestCase(unittest.TestCase):
+"""
+Base class for python api completeness tests, i.e.,
+python api should be aligned with the Java API as much as possible.
+"""
+
+@classmethod
+def get_python_class_methods(cls, python_class):
+return {cls.snake_to_camel(cls.java_method_name(method_name))
+for method_name in dir(python_class) if not 
method_name.startswith('_')}
+
+@staticmethod
+def snake_to_camel(method_name):
+output = ''.join(x.capitalize() or '_' for x in method_name.split('_'))
+return output[0].lower() + output[1:]
+
+@staticmethod
+def get_java_class_method(java_class):
+gateway = get_gateway()
+s = set()
+method_arr = gateway.jvm.Class.forName(java_class).getDeclaredMethods()
+for i in range(0, len(method_arr)):
+s.add(method_arr[i].getName())
+return s
+
+@classmethod
+def check_methods(cls):
+java_methods = 
PythonAPICompletenessTestCase.get_java_class_method(cls.java_class())
+python_methods = cls.get_python_class_methods(cls.python_class())
+missing_methods = java_methods - python_methods - cls.exclude_methods()
+if len(missing_methods) > 0:
+print(missing_methods)
+print('The Exception should be raised after FLINK-12407 is 
merged.')
+# raise Exception('Methods: %s in Java class %s have not been 
added in Python class %s.'
+#% (missing_methods, java_class, python_class))
+
+@classmethod
+def java_method_name(cls, python_method_name):
+"""
+This method should be overWrite when the method name of the Python API 
cannot be
+consistent with the Java API method name. we need to get the 
corresponding
+Java API method name based on the Python method name. e.g.: 'as' is 
python
+keyword, so we use 'alias' in Python API corresponding 'as' in Java 
API.
+
+:param python_method_name: Method name of Python API.
+:return: Method name corresponding to Java API.
+"""
+return python_method_name
+
+@classmethod
+@abstractmethod
+def python_class(cls):
+"""
+Return the Python class that needs to be compared. such as 
:class:`Table`.
+"""
+pass
+
+@classmethod
+@abstractmethod
+def java_class(cls):
+"""
+Return the Java class that needs to be compared. such as 
`org.apache.flink.table.api.Table`.
+"""
+pass
+
+@classmethod
+def exclude_methods(cls):
 
 Review comment:
   exclude_methods -> excluded_methods
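
   For reference, the Java-side lookup that `get_java_class_method` does through the 
Py4J gateway is plain reflection; a standalone sketch (not part of this PR, the 
class name below is just an example) would be:

```java
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;

public class DeclaredMethodNames {

    static Set<String> declaredMethodNames(String className) throws ClassNotFoundException {
        Set<String> names = new HashSet<>();
        for (Method method : Class.forName(className).getDeclaredMethods()) {
            names.add(method.getName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // e.g. compare this set against the method names collected from the Python class
        System.out.println(declaredMethodNames("java.lang.String"));
    }
}
```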


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
KurtYoung commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-492471785
 
 
   @danny0405 Could you help to review this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11612) Translate the "Project Template for Java" page into Chinese

2019-05-14 Thread LakeShen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839915#comment-16839915
 ] 

LakeShen commented on FLINK-11612:
--

I will do my best to translate this page.

> Translate the "Project Template for Java" page into Chinese
> ---
>
> Key: FLINK-11612
> URL: https://issues.apache.org/jira/browse/FLINK-11612
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: LakeShen
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/java_api_quickstart.html
> The markdown file is located in 
> flink/docs/dev/projectsetup/java_api_quickstart.zh.md
> The markdown file will be created once FLINK-11529 is merged.
> You can reference the translation from : 
> https://github.com/flink-china/1.6.0/blob/master/quickstart/java_api_quickstart.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11612) Translate the "Project Template for Java" page into Chinese

2019-05-14 Thread LakeShen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LakeShen reassigned FLINK-11612:


Assignee: LakeShen

> Translate the "Project Template for Java" page into Chinese
> ---
>
> Key: FLINK-11612
> URL: https://issues.apache.org/jira/browse/FLINK-11612
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: LakeShen
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/java_api_quickstart.html
> The markdown file is located in 
> flink/docs/dev/projectsetup/java_api_quickstart.zh.md
> The markdown file will be created once FLINK-11529 is merged.
> You can reference the translation from : 
> https://github.com/flink-china/1.6.0/blob/master/quickstart/java_api_quickstart.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-492471326
 
 
   Irrelevant test error. I have created an issue to report it and will trigger a 
rebuild.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12370) Integrated Travis for Python Table API

2019-05-14 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12370.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: d95d395d92acca210e107663ed0e96c3285f0cf5

> Integrated Travis for Python Table API
> --
>
> Key: FLINK-12370
> URL: https://issues.apache.org/jira/browse/FLINK-12370
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Integrated Travis for Python Table API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12512) TableSourceTest#testNestedProject test failed

2019-05-14 Thread vinoyang (JIRA)
vinoyang created FLINK-12512:


 Summary: TableSourceTest#testNestedProject test failed
 Key: FLINK-12512
 URL: https://issues.apache.org/jira/browse/FLINK-12512
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: vinoyang


 
{code:java}
20:41:59.128 [ERROR] 
testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest)  
Time elapsed: 0.047 s  <<< FAILURE!
org.junit.ComparisonFailure: 
null expected:<...deepNested.nested2.f[lag AS nestedFlag, 
deepNested.nested2.num AS nestedNum])
StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
source=[TestSource(read nested fields: id.*, deepNested.nested2.num, 
deepNested.nested2.flag], deepNested.nested1...> but 
was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS 
nestedNum])
StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, 
deepNested.nested2.f0], deepNested.nested1...>
at 
org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375)
{code}
Log details: [https://api.travis-ci.org/v3/job/532319575/log.txt]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.

2019-05-14 Thread GitBox
asfgit closed pull request #8392: [FLINK-12370][python][travis] Integrated 
Travis for Python Table API.
URL: https://github.com/apache/flink/pull/8392
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11840) Introduce FlinkLogicalSnapshot

2019-05-14 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-11840.
--
   Resolution: Duplicate
Fix Version/s: 1.9.0

> Introduce FlinkLogicalSnapshot
> --
>
> Key: FLINK-11840
> URL: https://issues.apache.org/jira/browse/FLINK-11840
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
> Fix For: 1.9.0
>
>
> {{Snapshot}} hasn't been introduced into Calcite yet. It's a whole new 
> feature. Related changes include parser, sql validator, relational expression 
> and so on.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-14 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284057878
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
+
+def minus(self, right):
+"""
+Minus of two :class:`Table`s with duplicate records removed.
+Similar to a SQL EXCEPT clause. Minus returns records from the left 
table 

[GitHub] [flink] klion26 commented on issue #8432: Release 1.8

2019-05-14 Thread GitBox
klion26 commented on issue #8432: Release 1.8
URL: https://github.com/apache/flink/pull/8432#issuecomment-492467084
 
 
   @sxganapa could you please close this PR, which attempts to merge release-1.8 
into master?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.

2019-05-14 Thread GitBox
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] 
Integrated Travis for Python Table API.
URL: https://github.com/apache/flink/pull/8392#issuecomment-492466610
 
 
   Thanks for the fix @WeiZhong94 
   LGTM. +1 to merge.
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-14 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284055809
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_batch_table_api.py
 ##
 @@ -0,0 +1,418 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+# #  distributed under the License is distributed on an "AS IS" BASIS,
+# #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# #  See the License for the specific language governing permissions and
+# # limitations under the License.
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.table.window import Tumble, Slide, Session
+
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase
+
+
+class BatchTableTests(PyFlinkBatchTableTestCase):
 
 Review comment:
   Thanks for your reply @dianfu!
   Thinking about it again, you are right: adding a base test class that can run 
against both batch and stream is a good idea. From the CI time-cost point of view, 
however, for the Python API we only need to check that the `batchEnv` works and 
that the few batch-only operators are covered; testing the other common operators 
against streaming is enough, because the Python API only proxies to the Java API, 
and the Java API tests already cover both batch and stream. Does this make sense 
to you? @WeiZhong94 @dianfu 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-14 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284054367
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
+
+def minus(self, right):
+"""
+Minus of two :class:`Table`s with duplicate records removed.
+Similar to a SQL EXCEPT clause. Minus returns records from the left 

[GitHub] [flink] asfgit closed pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
asfgit closed pull request #8434: [FLINK-12234][hive] Support view related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12234) Support view related operations in HiveCatalog

2019-05-14 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12234.

Resolution: Fixed

merged in 1.9.0: 544903d5cc77d6f90e0d77cb21ebf008b435e444

> Support view related operations in HiveCatalog
> --
>
> Key: FLINK-12234
> URL: https://issues.apache.org/jira/browse/FLINK-12234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Support view related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12505) Unify database operations to HiveCatalogBase from its subclasses

2019-05-14 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12505.

Resolution: Fixed

merged in 1.9.0: f7ecfdbcdf0f1ab17097ebbe1d170b984961bf60

> Unify database operations to HiveCatalogBase from its subclasses
> 
>
> Key: FLINK-12505
> URL: https://issues.apache.org/jira/browse/FLINK-12505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently each subclass of HiveCatalogBase has its own implementation of the 
> catalog-related database APIs. We should unify them as much as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses

2019-05-14 Thread GitBox
asfgit closed pull request #8433: [FLINK-12505][hive] Unify database operations 
to HiveCatalogBase from its subclasses
URL: https://github.com/apache/flink/pull/8433
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#issuecomment-492395614
 
 
   merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses

2019-05-14 Thread GitBox
bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database 
operations to HiveCatalogBase from its subclasses
URL: https://github.com/apache/flink/pull/8433#issuecomment-492395677
 
 
   merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
bowenli86 commented on a change in pull request #8434: [FLINK-12234][hive] 
Support view related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#discussion_r283981020
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog view implementation.
+ */
+public class HiveCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   // Schema of the view (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the view
+   private final Map properties;
+   // Comment of the view
+   private String comment = "This is a hive catalog view.";
 
 Review comment:
   Agree. This inherited behavior has been bothering me for a while. I opened 
FLINK-12511 to address this in all catalog metadata classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12511) make variable "comment" in all catalog metadata classes final

2019-05-14 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12511:


 Summary: make variable "comment" in all catalog metadata classes 
final 
 Key: FLINK-12511
 URL: https://issues.apache.org/jira/browse/FLINK-12511
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.9.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


For historical reasons, the variable "comment" in the catalog metadata classes is 
not final yet: it has a default value and can be overwritten in the constructor. 
This creates problems such as overloaded constructors being built in the wrong way.

We should remove the default value of "comment" and make the field final, so that 
it can only be assigned a value upon construction.
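
A minimal sketch of the intended shape (illustrative class and parameter names, 
not the actual Flink classes):

{code:java}
public class CatalogViewExample {

	// no default value: the comment is assigned exactly once, at construction time
	private final String comment;

	public CatalogViewExample(String comment) {
		this.comment = comment;
	}

	public String getComment() {
		return comment;
	}
}
{code}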






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
bowenli86 commented on a change in pull request #8434: [FLINK-12234][hive] 
Support view related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#discussion_r283981020
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog view implementation.
+ */
+public class HiveCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   // Schema of the view (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the view
+   private final Map properties;
+   // Comment of the view
+   private String comment = "This is a hive catalog view.";
 
 Review comment:
   Agree. This inherited behavior has been bothering me for a while. I opened 
FLINK-12511 for this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283954075
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test these classes that 
contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 
 
 {% highlight java %}
-public class SumReduce implements ReduceFunction {
+public class IncrementMapFunction implements MapFunction {
 
 @Override
-public Long reduce(Long value1, Long value2) throws Exception {
-return value1 + value2;
+public Long map(Long record) throws Exception {
+return record +1 ;
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-override def reduce(value1: java.lang.Long, value2: java.lang.Long): 
java.lang.Long = {
-value1 + value2
+override def map(record: java.lang.Long): java.lang.Long = {
+record + 1
 }
 }
 {% endhighlight %}
 
 
 
-It is very easy to unit test it with your favorite framework by passing 
suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing 
framework by passing suitable arguments and verifying the output.
 
 
 
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
 @Test
-public void testSum() throws Exception {
+public void testIncrement() throws Exception {
 // instantiate your function
-SumReduce sumReduce = new SumReduce();
+IncrementMapFunction incrementer = new IncrementMapFunction();
 
 // call the methods that you have implemented
-assertEquals(42L, sumReduce.reduce(40L, 2L));
+assertEquals(3L, incrementer.map(2L));
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+"IncrementMapFunction" should "increment values" in {
+// instantiate your function
+val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+// call the methods that you have implemented
+incremeter.map(2) should be (3)
+}
+}
+{% endhighlight %}
+
+
+
+Similarly, a user-defined function which uses an 
`org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or 
`ProcessFunction`) can be easily tested by providing a mock object instead of a 
real collector.  A `FlatMapFunction` with the same functionality as the 
`IncrementMapFunction` could be unit tested as follows.
+
+
+
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
 
-"SumReduce" should "add values" in {
+@Test
+public void testIncrement() throws Exception {
 // instantiate your function
-val sumReduce: SumReduce = new SumReduce()
+IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+Collector collector = mock(Collector.class);
 
 // call the methods that you have implemented
-sumReduce.reduce(40L, 2L) should be (42L)
+incrementer.flatMap(2L, collector)
+
+//verify collector was called with the right output
+Mockito.verify(collector, times(1)).collect(3L);
 }
 }
 {% endhighlight %}
 
+
+
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+"IncrementFlatMapFunction" should "increment values" in {
+   // instantiate your function
+  val incrementer : IncrementFlatMapFunction = new 
IncrementFlatMapFunction()
+
+  val collector = mock[Collector[Integer]]
+
+  //verify collector was called with the right output
+  (collector.collect _).expects(3)
+
+  // call the methods that you have implemented
+  flattenFunction.flatMap(2, collector)
+  }
+}
+{% endhighlight %}
+
 
 

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283959368
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends 
AbstractTestBase {
 
 
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with 
BeforeAndAfter {
 
-@Test
-def testMultiply(): Unit = {
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new 
MiniClusterResourceConfiguration.Builder()
+.setNumberSlotsPerTaskManager(1)
+.setNumberTaskManagers(1)
+.build)
 
-// configure your test environment
-env.setParallelism(1)
+  before {
+flinkCluster.before()
+  }
 
-// values are collected in a static variable
-CollectSink.values.clear()
+  after {
+flinkCluster.after()
+  }
 
-// create a stream of custom elements and apply transformations
-env
-.fromElements(1L, 21L, 22L)
-.map(new MultiplyByTwo())
-.addSink(new CollectSink())
 
-// execute
-env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-// verify your results
-assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-}
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// configure your test environment
+env.setParallelism(2)
 
+// values are collected in a static variable
+CollectSink.values.clear()
+
+// create a stream of custom elements and apply transformations
+env.fromElements(1, 21, 22)
+   .map(new IncrementMapFunction())
+   .addSink(new CollectSink())
+
+// execute
+env.execute()
+
+// verify your results
+CollectSink.values should contain allOf (1,22,23)
+}
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-override def invoke(value: java.lang.Long): Unit = {
-synchronized {
-values.add(value)
-}
+  override def invoke(value: Long): Unit = {
+synchronized {
+  CollectSink.values.add(value)
 }
+  }
 }
 
 object CollectSink {
-
 // must be static
-val values: List[Long] = new ArrayList()
+val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 
 
 
-The static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration 
tests. 
+* In order not to copy your whole pipeline code from production to test, make 
sources and sinks pluggable in your production code and inject special test 
sources and test sinks in your tests.
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-
-
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-
-
+* The static variable in `CollectSink` is used here because Flink serializes 
all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator 
that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time 
dependencies between the actions.
+* You can also implement your own custom *parallel* source function for 
emitting watermarks.
 
 Review comment:
   I feel like "also" implies the previous sentence was also about managing 
watermarks.
   ```suggestion
   * You can implement a custom *parallel* source function for emitting 
watermarks if your job uses event-time timers.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283957833
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test these classes that 
contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 
 
 {% highlight java %}
-public class SumReduce implements ReduceFunction {
+public class IncrementMapFunction implements MapFunction {
 
 @Override
-public Long reduce(Long value1, Long value2) throws Exception {
-return value1 + value2;
+public Long map(Long record) throws Exception {
+return record +1 ;
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-override def reduce(value1: java.lang.Long, value2: java.lang.Long): 
java.lang.Long = {
-value1 + value2
+override def map(record: java.lang.Long): java.lang.Long = {
+record + 1
 }
 }
 {% endhighlight %}
 
 
 
-It is very easy to unit test it with your favorite framework by passing 
suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing 
framework by passing suitable arguments and verifying the output.
 
 
 
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
 @Test
-public void testSum() throws Exception {
+public void testIncrement() throws Exception {
 // instantiate your function
-SumReduce sumReduce = new SumReduce();
+IncrementMapFunction incrementer = new IncrementMapFunction();
 
 // call the methods that you have implemented
-assertEquals(42L, sumReduce.reduce(40L, 2L));
+assertEquals(3L, incrementer.map(2L));
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+"IncrementMapFunction" should "increment values" in {
+// instantiate your function
+val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+// call the methods that you have implemented
+incremeter.map(2) should be (3)
+}
+}
+{% endhighlight %}
+
+
+
+Similarly, a user-defined function which uses an 
`org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or 
`ProcessFunction`) can be easily tested by providing a mock object instead of a 
real collector.  A `FlatMapFunction` with the same functionality as the 
`IncrementMapFunction` could be unit tested as follows.
+
+
+
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
 
-"SumReduce" should "add values" in {
+@Test
+public void testIncrement() throws Exception {
 // instantiate your function
-val sumReduce: SumReduce = new SumReduce()
+IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+Collector collector = mock(Collector.class);
 
 // call the methods that you have implemented
-sumReduce.reduce(40L, 2L) should be (42L)
+incrementer.flatMap(2L, collector)
+
+//verify collector was called with the right output
+Mockito.verify(collector, times(1)).collect(3L);
 }
 }
 {% endhighlight %}
 
+
+
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+"IncrementFlatMapFunction" should "increment values" in {
+   // instantiate your function
+  val incrementer : IncrementFlatMapFunction = new 
IncrementFlatMapFunction()
+
+  val collector = mock[Collector[Integer]]
+
+  //verify collector was called with the right output
+  (collector.collect _).expects(3)
+
+  // call the methods that you have implemented
+  flattenFunction.flatMap(2, collector)
+  }
+}
+{% endhighlight %}
+
 
 

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283954443
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test these classes that 
contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 
 
 {% highlight java %}
-public class SumReduce implements ReduceFunction {
+public class IncrementMapFunction implements MapFunction {
 
 @Override
-public Long reduce(Long value1, Long value2) throws Exception {
-return value1 + value2;
+public Long map(Long record) throws Exception {
+return record +1 ;
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-override def reduce(value1: java.lang.Long, value2: java.lang.Long): 
java.lang.Long = {
-value1 + value2
+override def map(record: java.lang.Long): java.lang.Long = {
+record + 1
 }
 }
 {% endhighlight %}
 
 
 
-It is very easy to unit test it with your favorite framework by passing 
suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing 
framework by passing suitable arguments and verifying the output.
 
 
 
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
 @Test
-public void testSum() throws Exception {
+public void testIncrement() throws Exception {
 // instantiate your function
-SumReduce sumReduce = new SumReduce();
+IncrementMapFunction incrementer = new IncrementMapFunction();
 
 // call the methods that you have implemented
-assertEquals(42L, sumReduce.reduce(40L, 2L));
+assertEquals(3L, incrementer.map(2L));
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+"IncrementMapFunction" should "increment values" in {
+// instantiate your function
+val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+// call the methods that you have implemented
+incremeter.map(2) should be (3)
+}
+}
+{% endhighlight %}
+
+
+
+Similarly, a user-defined function which uses an 
`org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or 
`ProcessFunction`) can be easily tested by providing a mock object instead of a 
real collector.  A `FlatMapFunction` with the same functionality as the 
`IncrementMapFunction` could be unit tested as follows.
+
+
+
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
 
-"SumReduce" should "add values" in {
+@Test
+public void testIncrement() throws Exception {
 // instantiate your function
-val sumReduce: SumReduce = new SumReduce()
+IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+Collector collector = mock(Collector.class);
 
 // call the methods that you have implemented
-sumReduce.reduce(40L, 2L) should be (42L)
+incrementer.flatMap(2L, collector)
+
+//verify collector was called with the right output
+Mockito.verify(collector, times(1)).collect(3L);
 }
 }
 {% endhighlight %}
 
+
+
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+"IncrementFlatMapFunction" should "increment values" in {
+   // instantiate your function
+  val incrementer : IncrementFlatMapFunction = new 
IncrementFlatMapFunction()
+
+  val collector = mock[Collector[Integer]]
+
+  //verify collector was called with the right output
+  (collector.collect _).expects(3)
+
+  // call the methods that you have implemented
+  flattenFunction.flatMap(2, collector)
+  }
+}
+{% endhighlight %}
+
 
 

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283958605
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends 
AbstractTestBase {
 
 
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with 
BeforeAndAfter {
 
-@Test
-def testMultiply(): Unit = {
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new 
MiniClusterResourceConfiguration.Builder()
+.setNumberSlotsPerTaskManager(1)
+.setNumberTaskManagers(1)
+.build)
 
-// configure your test environment
-env.setParallelism(1)
+  before {
+flinkCluster.before()
+  }
 
-// values are collected in a static variable
-CollectSink.values.clear()
+  after {
+flinkCluster.after()
+  }
 
-// create a stream of custom elements and apply transformations
-env
-.fromElements(1L, 21L, 22L)
-.map(new MultiplyByTwo())
-.addSink(new CollectSink())
 
-// execute
-env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-// verify your results
-assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-}
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// configure your test environment
+env.setParallelism(2)
 
+// values are collected in a static variable
+CollectSink.values.clear()
+
+// create a stream of custom elements and apply transformations
+env.fromElements(1, 21, 22)
+   .map(new IncrementMapFunction())
+   .addSink(new CollectSink())
+
+// execute
+env.execute()
+
+// verify your results
+CollectSink.values should contain allOf (1,22,23)
+}
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-override def invoke(value: java.lang.Long): Unit = {
-synchronized {
-values.add(value)
-}
+  override def invoke(value: Long): Unit = {
+synchronized {
+  CollectSink.values.add(value)
 }
+  }
 }
 
 object CollectSink {
-
 // must be static
-val values: List[Long] = new ArrayList()
+val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 
 
 
-The static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration 
tests. 
+* In order not to copy your whole pipeline code from production to test, make 
sources and sinks pluggable in your production code and inject special test 
sources and test sinks in your tests.
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-
-
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-
-
+* The static variable in `CollectSink` is used here because Flink serializes 
all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
 
 Review comment:
   Unnecessary
   ```suggestion
   Alternatively, you could write the data to files in a temporary directory 
with your test sink.
   ```
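
As a side note on the pluggable-sources-and-sinks remark quoted above, a minimal hypothetical sketch in Java (class and method names are illustrative, not from the PR; `IncrementMapFunction` and `CollectSink` are the classes shown in the quoted docs) of a pipeline that accepts its sink as a parameter:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Hypothetical pipeline builder: the sink is passed in, so a test can inject CollectSink
// while the production entry point wires in the real sink (e.g. a Kafka producer).
public class IncrementPipeline {

    public static void build(StreamExecutionEnvironment env, SinkFunction<Long> sink) {
        DataStream<Long> incremented = env
            .fromElements(1L, 21L, 22L)
            .map(new IncrementMapFunction());

        incremented.addSink(sink);
    }
}
```

A test would then call `IncrementPipeline.build(env, new CollectSink())` and assert on `CollectSink.values` after `env.execute()`, while production code passes the real sink.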


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283963231
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test these classes that 
contain the main business logic with unit tests as much as possible.
 
 Review comment:
   ```suggestion
   Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test those classes that 
contain the main business logic with unit tests as much as possible.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283951933
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 Review comment:
   The first sentence is not a complete sentence. 
   
   ```suggestion
   Testing is an integral part of every software development process as such 
Apache Flink comes with tooling to test your application code on multiple 
levels of the testing pyramid.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283962452
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends 
AbstractTestBase {
 
 
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with 
BeforeAndAfter {
 
-@Test
-def testMultiply(): Unit = {
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new 
MiniClusterResourceConfiguration.Builder()
+.setNumberSlotsPerTaskManager(1)
+.setNumberTaskManagers(1)
+.build)
 
-// configure your test environment
-env.setParallelism(1)
+  before {
+flinkCluster.before()
+  }
 
-// values are collected in a static variable
-CollectSink.values.clear()
+  after {
+flinkCluster.after()
+  }
 
-// create a stream of custom elements and apply transformations
-env
-.fromElements(1L, 21L, 22L)
-.map(new MultiplyByTwo())
-.addSink(new CollectSink())
 
-// execute
-env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-// verify your results
-assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-}
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// configure your test environment
+env.setParallelism(2)
 
+// values are collected in a static variable
+CollectSink.values.clear()
+
+// create a stream of custom elements and apply transformations
+env.fromElements(1, 21, 22)
+   .map(new IncrementMapFunction())
+   .addSink(new CollectSink())
+
+// execute
+env.execute()
+
+// verify your results
+CollectSink.values should contain allOf (2, 22, 23)
+}
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-override def invoke(value: java.lang.Long): Unit = {
-synchronized {
-values.add(value)
-}
+  override def invoke(value: Long): Unit = {
+synchronized {
+  CollectSink.values.add(value)
 }
+  }
 }
 
 object CollectSink {
-
 // must be static
-val values: List[Long] = new ArrayList()
+val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 
 
 
-The static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration 
tests. 
+* In order not to copy your whole pipeline code from production to test, make 
sources and sinks pluggable in your production code and inject special test 
sources and test sinks in your tests.
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-
-
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-
-
+* The static variable in `CollectSink` is used here because Flink serializes 
all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator 
that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time 
dependencies between the actions.
+* You can also implement your own custom *parallel* source function for 
emitting watermarks.
 
-Another approach is to write a unit test using the Flink internal testing 
utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` 
module.
+* It is recommended to always test your pipelines locally with a parallelism > 
1 in order to identify bugs which only surface, when the pipelines is executed 
in parallel.
 
-For an example of how to do that please have a look at the 
`org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` 
also in the `flink-streaming-java` module.
+* If you use `@Rule` instead 

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283961380
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends 
AbstractTestBase {
 
 
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with 
BeforeAndAfter {
 
-@Test
-def testMultiply(): Unit = {
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new 
MiniClusterResourceConfiguration.Builder()
+.setNumberSlotsPerTaskManager(1)
+.setNumberTaskManagers(1)
+.build)
 
-// configure your test environment
-env.setParallelism(1)
+  before {
+flinkCluster.before()
+  }
 
-// values are collected in a static variable
-CollectSink.values.clear()
+  after {
+flinkCluster.after()
+  }
 
-// create a stream of custom elements and apply transformations
-env
-.fromElements(1L, 21L, 22L)
-.map(new MultiplyByTwo())
-.addSink(new CollectSink())
 
-// execute
-env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-// verify your results
-assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-}
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// configure your test environment
+env.setParallelism(2)
 
+// values are collected in a static variable
+CollectSink.values.clear()
+
+// create a stream of custom elements and apply transformations
+env.fromElements(1, 21, 22)
+   .map(new IncrementMapFunction())
+   .addSink(new CollectSink())
+
+// execute
+env.execute()
+
+// verify your results
+CollectSink.values should contain allOf (2, 22, 23)
+}
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-override def invoke(value: java.lang.Long): Unit = {
-synchronized {
-values.add(value)
-}
+  override def invoke(value: Long): Unit = {
+synchronized {
+  CollectSink.values.add(value)
 }
+  }
 }
 
 object CollectSink {
-
 // must be static
-val values: List[Long] = new ArrayList()
+val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 
 
 
-The static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration 
tests. 
+* In order not to copy your whole pipeline code from production to test, make 
sources and sinks pluggable in your production code and inject special test 
sources and test sinks in your tests.
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-
-
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-
-
+* The static variable in `CollectSink` is used here because Flink serializes 
all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator 
that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time 
dependencies between the actions.
+* You can also implement your own custom *parallel* source function for 
emitting watermarks.
 
-Another approach is to write a unit test using the Flink internal testing 
utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` 
module.
+* It is recommended to always test your pipelines locally with a parallelism > 
1 in order to identify bugs which only surface, when the pipelines is executed 
in parallel.
 
-For an example of how to do that please have a look at the 
`org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` 
also in the `flink-streaming-java` module.
+* If you use `@Rule` instead 

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283959906
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends 
AbstractTestBase {
 
 
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with 
BeforeAndAfter {
 
-@Test
-def testMultiply(): Unit = {
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new 
MiniClusterResourceConfiguration.Builder()
+.setNumberSlotsPerTaskManager(1)
+.setNumberTaskManagers(1)
+.build)
 
-// configure your test environment
-env.setParallelism(1)
+  before {
+flinkCluster.before()
+  }
 
-// values are collected in a static variable
-CollectSink.values.clear()
+  after {
+flinkCluster.after()
+  }
 
-// create a stream of custom elements and apply transformations
-env
-.fromElements(1L, 21L, 22L)
-.map(new MultiplyByTwo())
-.addSink(new CollectSink())
 
-// execute
-env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-// verify your results
-assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-}
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// configure your test environment
+env.setParallelism(2)
 
+// values are collected in a static variable
+CollectSink.values.clear()
+
+// create a stream of custom elements and apply transformations
+env.fromElements(1, 21, 22)
+   .map(new IncrementMapFunction())
+   .addSink(new CollectSink())
+
+// execute
+env.execute()
+
+// verify your results
+CollectSink.values should contain allOf (2, 22, 23)
+}
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-override def invoke(value: java.lang.Long): Unit = {
-synchronized {
-values.add(value)
-}
+  override def invoke(value: Long): Unit = {
+synchronized {
+  CollectSink.values.add(value)
 }
+  }
 }
 
 object CollectSink {
-
 // must be static
-val values: List[Long] = new ArrayList()
+val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 
 
 
-The static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration 
tests. 
+* In order not to copy your whole pipeline code from production to test, make 
sources and sinks pluggable in your production code and inject special test 
sources and test sinks in your tests.
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-
-
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-
-
+* The static variable in `CollectSink` is used here because Flink serializes 
all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via 
static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary 
directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator 
that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time 
dependencies between the actions.
+* You can also implement your own custom *parallel* source function for 
emitting watermarks.
 
-Another approach is to write a unit test using the Flink internal testing 
utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` 
module.
+* It is recommended to always test your pipelines locally with a parallelism > 
1 in order to identify bugs which only surface, when the pipelines is executed 
in parallel.
 
 Review comment:
   ```suggestion
   * It is recommended to always test your pipelines locally with a parallelism 
> 1 to identify bugs which only surface for the pipelines executed in parallel.
   ```


[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283958118
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test these classes that 
contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 
 
 {% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
 @Override
-public Long reduce(Long value1, Long value2) throws Exception {
-return value1 + value2;
+public Long map(Long record) throws Exception {
+return record + 1;
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-override def reduce(value1: java.lang.Long, value2: java.lang.Long): 
java.lang.Long = {
-value1 + value2
+override def map(record: java.lang.Long): java.lang.Long = {
+record + 1
 }
 }
 {% endhighlight %}
 
 
 
-It is very easy to unit test it with your favorite framework by passing 
suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing 
framework by passing suitable arguments and verifying the output.
 
 
 
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
 @Test
-public void testSum() throws Exception {
+public void testIncrement() throws Exception {
 // instantiate your function
-SumReduce sumReduce = new SumReduce();
+IncrementMapFunction incrementer = new IncrementMapFunction();
 
 // call the methods that you have implemented
-assertEquals(42L, sumReduce.reduce(40L, 2L));
+assertEquals(3L, incrementer.map(2L));
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+"IncrementMapFunction" should "increment values" in {
+// instantiate your function
+val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+// call the methods that you have implemented
+incrementer.map(2) should be (3)
+}
+}
+{% endhighlight %}
+
+
+
+Similarly, a user-defined function which uses an 
`org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or 
`ProcessFunction`) can be easily tested by providing a mock object instead of a 
real collector.  A `FlatMapFunction` with the same functionality as the 
`IncrementMapFunction` could be unit tested as follows.
+
+
+
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
 
-"SumReduce" should "add values" in {
+@Test
+public void testIncrement() throws Exception {
 // instantiate your function
-val sumReduce: SumReduce = new SumReduce()
+IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+Collector<Long> collector = mock(Collector.class);
 
 // call the methods that you have implemented
-sumReduce.reduce(40L, 2L) should be (42L)
+incrementer.flatMap(2L, collector);
+
+//verify collector was called with the right output
+Mockito.verify(collector, times(1)).collect(3L);
 }
 }
 {% endhighlight %}
 
+
+
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+"IncrementFlatMapFunction" should "increment values" in {
+   // instantiate your function
+  val incrementer : IncrementFlatMapFunction = new 
IncrementFlatMapFunction()
+
+  val collector = mock[Collector[Integer]]
+
+  //verify collector was called with the right output
+  (collector.collect _).expects(3)
+
+  // call the methods that you have implemented
+  incrementer.flatMap(2, collector)
+  }
+}
+{% endhighlight %}
+
 
 

[GitHub] [flink] sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation

2019-05-14 Thread GitBox
sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand 
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283952634
 
 

 ##
 File path: docs/dev/stream/testing.md
 ##
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such 
Apache Flink comes with tooling to test your Apache Flink application code on 
different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a 
user-defined function. Therefore, it is recommended to test these classes that 
contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 
 
 {% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
 @Override
-public Long reduce(Long value1, Long value2) throws Exception {
-return value1 + value2;
+public Long map(Long record) throws Exception {
+return record + 1;
 }
 }
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-override def reduce(value1: java.lang.Long, value2: java.lang.Long): 
java.lang.Long = {
-value1 + value2
+override def map(record: java.lang.Long): java.lang.Long = {
 
 Review comment:
   Scala has its own primitive types that auto-coerce to the correct Java boxed or 
unboxed primitives. 
   
   ```suggestion
   override def map(record: Long): Long = {
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10190) Unable to use custom endpoint in Kinesis producer

2019-05-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10190:
---
Labels: pull-request-available  (was: )

> Unable to use custom endpoint in Kinesis producer
> -
>
> Key: FLINK-10190
> URL: https://issues.apache.org/jira/browse/FLINK-10190
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.6.0
>Reporter: Sergei Poganshev
>Priority: Major
>  Labels: pull-request-available
>
> There's a check in 
> [KinesisConfigUtil|https://github.com/apache/flink/blob/7d034d4ef6986ba5ccda6f5e8c587b8fdd88be8e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L269]
>  that validates the fact that either AWS_REGION or AWS_ENDPOINT is specified 
> (not both), while Kinesis producer requires a region in any case (even with 
> custom endpoint).
> Also the error message for that validation outputs AWS_REGION twice.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8444: [FLINK-10190] [Kinesis Connector] Allow AWS_REGION to be supplied along with custom Kinesis endpoint

2019-05-14 Thread GitBox
flinkbot commented on issue #8444: [FLINK-10190] [Kinesis Connector] Allow 
AWS_REGION to be supplied along with custom Kinesis endpoint
URL: https://github.com/apache/flink/pull/8444#issuecomment-492376902
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] skidder opened a new pull request #8444: [FLINK-10190] [Kinesis Connector] Allow AWS_REGION to be supplied along with custom Kinesis endpoint

2019-05-14 Thread GitBox
skidder opened a new pull request #8444: [FLINK-10190] [Kinesis Connector] 
Allow AWS_REGION to be supplied along with custom Kinesis endpoint
URL: https://github.com/apache/flink/pull/8444
 
 
   ## What is the purpose of the change
   
   Allow an AWS region (e.g. `us-east-1`) to be supplied along with a custom 
Kinesis endpoint in the Kinesis configuration.
   
   Using a [Kinesis VPC 
endpoint](https://aws.amazon.com/blogs/aws/new-aws-privatelink-endpoints-kinesis-ec2-systems-manager-and-elb-apis-in-your-vpc/)
 can prevent traffic leaving the AWS network. This greatly reduces costs by 
eliminating NAT traversal and cross-AZ transfers. However, this requires that 
the Kinesis endpoint URL be signed for a specific region. This is accomplished 
by including the region name along with the endpoint URL when constructing the 
Kinesis client.
   
   ## Brief change log
   
 - Permit both region and endpoint to be set
 - Set the region on the endpoint configuration when both region and 
endpoint are set
   
   ## Verifying this change
   
   This change added tests and can be verified by running connector unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
   No
   
   ## Documentation
   
   Documentation for the Kinesis connector has been updated.
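
   For illustration, a minimal sketch (assuming the standard `FlinkKinesisProducer` and the 
`AWSConfigConstants` keys; the endpoint URL and stream name are placeholders) of supplying 
both a region and a custom endpoint once this change is in place:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisVpcEndpointExample {

    public static FlinkKinesisProducer<String> createProducer() {
        Properties producerConfig = new Properties();
        // Region used for request signing; with this change it can be set together with an endpoint.
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
        // Placeholder VPC endpoint URL; keeps Kinesis traffic inside the AWS network.
        producerConfig.put(AWSConfigConstants.AWS_ENDPOINT,
            "https://vpce-example.kinesis.us-east-1.vpce.amazonaws.com");

        FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        producer.setDefaultStream("example-output-stream");
        return producer;
    }
}
```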


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12504) NullPoint here NullPointException there.. It's every where

2019-05-14 Thread Ken Krugler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839718#comment-16839718
 ] 

Ken Krugler commented on FLINK-12504:
-

If I Google "Flink user mailing list", the first hit is for 
[https://flink.apache.org/community.html#mailing-lists]

On this page is a Subscribe link for the *user*@flink.apache.org mailing list.

> NullPoint here NullPointException there.. It's every where
> --
>
> Key: FLINK-12504
> URL: https://issues.apache.org/jira/browse/FLINK-12504
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Chethan UK
>Priority: Major
>
> I was trying to push data from Kafka to Cassandra; after around 220K, 
> sometimes 300K, points are pushed into C*, this 
> java.lang.NullPointerException is thrown:
> ```
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
>   at 
> org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> ```
> How can normal Flink users understand these errors? The jobs keep failing, and 
> it is too unstable to be considered for production... 
>  
> In the roadmap, are there plans to make Kotlin a supported language as well?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7697) Add metrics for Elasticsearch Sink

2019-05-14 Thread Piyush Goyal (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837512#comment-16837512
 ] 

Piyush Goyal edited comment on FLINK-7697 at 5/14/19 7:01 PM:
--

+1 on adding metrics to ES Sink. 

A few other metrics that would be useful:
 * Number of failed index requests (tagged for failure reasons).
 * End-to-end latency for bulk requests.

[~xueyu], [~yew1eb] Is anyone working on this currently? We (at Netflix) are 
starting to add these metrics to our private fork. If no one else has started 
working on it, we'll be happy to contribute it. 


was (Author: pgoyal):
+1 on adding metrics to ES Sink. 

A few other metrics that would be useful:
 * Number of failed index requests (tagged for failure reasons).
 * End-to-end latency for bulk requests.

[~xueyu] [~yew1eb] Is anyone working on this currently? We (at Netflix) are 
starting to add these metrics to our private fork. If no one else has started 
working on it, we'll be happy to contribute it. 

> Add metrics for Elasticsearch Sink
> --
>
> Key: FLINK-7697
> URL: https://issues.apache.org/jira/browse/FLINK-7697
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / ElasticSearch
>Reporter: Hai Zhou
>Assignee: xueyu
>Priority: Major
>
> We should add metrics  to track  events write to ElasticasearchSink.
> eg. 
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of documents version conflicts
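
As a rough illustration of the counters discussed above, a minimal sketch (the sink skeleton 
and metric names are illustrative, not an actual Elasticsearch sink) of registering request 
metrics through Flink's metric group:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink skeleton showing where such metrics could be registered and updated.
public class MetricsAwareSink extends RichSinkFunction<String> {

    private transient Counter numSuccessfulIndexRequests;
    private transient Counter numFailedIndexRequests;

    @Override
    public void open(Configuration parameters) throws Exception {
        numSuccessfulIndexRequests =
            getRuntimeContext().getMetricGroup().counter("numSuccessfulIndexRequests");
        numFailedIndexRequests =
            getRuntimeContext().getMetricGroup().counter("numFailedIndexRequests");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        try {
            // index the record against Elasticsearch here
            numSuccessfulIndexRequests.inc();
        } catch (Exception e) {
            numFailedIndexRequests.inc();
            throw e;
        }
    }
}
```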



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7697) Add metrics for Elasticsearch Sink

2019-05-14 Thread Piyush Goyal (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837512#comment-16837512
 ] 

Piyush Goyal edited comment on FLINK-7697 at 5/14/19 7:00 PM:
--

+1 on adding metrics to ES Sink. 

A few other metrics that would be useful:
 * Number of failed index requests (tagged for failure reasons).
 * End-to-end latency for bulk requests.

[~xueyu] [~yew1eb] Is anyone working on this currently? We (at Netflix) are 
starting to add these metrics to our private fork. If no one else has started 
working on it, we'll be happy to contribute it. 


was (Author: pgoyal):
+1 on adding metrics to ES Sink. 

A few other metrics that would be useful:
 * Number of failed index requests (tagged for failure reasons).
 * End-to-end latency for bulk requests.

Is anyone working on this currently? We (at Netflix) are starting to add these 
metrics to our private fork. If no one else has started working on it, we'll be 
happy to contribute it. 

> Add metrics for Elasticsearch Sink
> --
>
> Key: FLINK-7697
> URL: https://issues.apache.org/jira/browse/FLINK-7697
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / ElasticSearch
>Reporter: Hai Zhou
>Assignee: xueyu
>Priority: Major
>
> We should add metrics  to track  events write to ElasticasearchSink.
> eg. 
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of documents version conflicts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses

2019-05-14 Thread GitBox
xuefuz commented on a change in pull request #8433: [FLINK-12505][hive] Unify 
database operations to HiveCatalogBase from its subclasses
URL: https://github.com/apache/flink/pull/8433#discussion_r283934117
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -86,22 +84,20 @@ public GenericHiveMetastoreCatalog(String catalogName, 
HiveConf hiveConf) {
// -- databases --
 
@Override
-   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   Database hiveDb = getHiveDatabase(databaseName);
-
-   return new GenericCatalogDatabase(hiveDb.getParameters(), 
hiveDb.getDescription());
-   }
-
-   @Override
-   public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
-   throws DatabaseAlreadyExistException, CatalogException {
-   
createHiveDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, 
database), ignoreIfExists);
+   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
+   return new GenericCatalogDatabase(
+   hiveDatabase.getParameters(),
+   hiveDatabase.getDescription()
+   );
}
 
@Override
-   public void alterDatabase(String name, CatalogDatabase newDatabase, 
boolean ignoreIfNotExists)
-   throws DatabaseNotExistException, CatalogException {
-   alterHiveDatabase(name, 
GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase), 
ignoreIfNotExists);
+   protected Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
+   return new Database(
+   databaseName,
+   catalogDatabase.getComment(),
+   null,
 
 Review comment:
   Nit: It would be nice to explain what "null" here means.
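
   For instance, a fragment mirroring the method quoted above (assuming the third argument of 
Hive's `Database` constructor is the location URI, and that the database properties come from 
`catalogDatabase.getProperties()`):

```java
protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
    return new Database(
        databaseName,
        catalogDatabase.getComment(),
        // null location URI: let the Hive metastore pick its default warehouse location
        null,
        catalogDatabase.getProperties());
}
```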


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support 
view related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#discussion_r283930256
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog view implementation.
+ */
+public class HiveCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because context such as the current DB is
+   // lost once the session in which the view was defined is gone.
+   // The expanded query text takes care of this.
+   private final String expandedQuery;
+
+   // Schema of the view (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the view
+   private final Map<String, String> properties;
+   // Comment of the view
+   private String comment = "This is a hive catalog view.";
 
 Review comment:
   Nit: It seems the assigned value will be overwritten anyway in the 
constructor, so we might as well remove it and make the variable final. (I 
guess other existing classes might have the same problem.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support 
view related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#discussion_r283926412
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -169,8 +181,18 @@ protected Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
+   } else if (table instanceof CatalogView) {
+   HiveCatalogView view = (HiveCatalogView) table;
+
+   // TODO: [FLINK-12398] Support partitioned view in 
catalog API
+   sd.setCols(allColumns);
+   hiveTable.setPartitionKeys(new ArrayList<>());
+
+   hiveTable.setViewOriginalText(view.getOriginalQuery());
+   hiveTable.setViewExpandedText(view.getExpandedQuery());
+   hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
-   throw new UnsupportedOperationException("HiveCatalog 
doesn't support view yet");
+   throw new CatalogException("HiveCatalog only supports 
CatalogTable and CatalogView");
 
 Review comment:
   HiveCatalogTable/View?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-14 Thread GitBox
xuefuz commented on a change in pull request #8434: [FLINK-12234][hive] Support 
view related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#discussion_r283925761
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -169,8 +181,18 @@ protected Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
+   } else if (table instanceof CatalogView) {
 
 Review comment:
   should we check against HiveCatalogView instead, as we are casting table to 
it right after this line? Same applies to the check in the if condition.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8390: [FLINK-12469][table] 
Clean up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r283833595
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -65,9 +64,15 @@
private final Map> partitionColumnStats;
 
public GenericInMemoryCatalog(String name) {
+   this(name, DEFAULT_DB);
+   }
+
+   public GenericInMemoryCatalog(String name, String defaultDatabase) {
 
 Review comment:
   I agree we should not get pulled into the discussion around the yaml file. It's 
irrelevant from the perspective of this ctor. We can put any arbitrary logic in the 
`CatalogFactory`.
   
   I still believe, though, that from the clean code perspective we should allow 
passing a CatalogDatabase (or GenericInMemoryCatalogDatabse if we want to limit 
the scope) in the ctor. Such an approach helps to write testable code 
(http://misko.hevery.com/attachments/Guide-Writing%20Testable%20Code.pdf or 
http://misko.hevery.com/code-reviewers-guide/), which in turn is easier to 
use, as tests also verify that the interfaces are usable. I see no reason why 
we should not allow that. 
   
   But in the end I will leave it for your call.
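
   A hypothetical sketch of the kind of constructor injection being argued for (purely 
illustrative stand-in classes, not the actual Flink API):

```java
// Illustrative only: injecting the default database object through the constructor
// keeps the catalog easy to construct and verify in unit tests.
public class InMemoryCatalogSketch {

    private final String name;
    private final CatalogDatabaseSketch defaultDatabase;

    public InMemoryCatalogSketch(String name, CatalogDatabaseSketch defaultDatabase) {
        this.name = name;
        this.defaultDatabase = defaultDatabase;
    }

    public String getName() {
        return name;
    }

    public CatalogDatabaseSketch getDefaultDatabase() {
        return defaultDatabase;
    }

    // Minimal stand-in for a CatalogDatabase, so the sketch is self-contained.
    public static class CatalogDatabaseSketch {
        private final String comment;

        public CatalogDatabaseSketch(String comment) {
            this.comment = comment;
        }

        public String getComment() {
            return comment;
        }
    }
}
```

A test can then do `new InMemoryCatalogSketch("cat", new CatalogDatabaseSketch("for tests"))` 
and assert directly on the injected object, without going through a factory.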


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8390: [FLINK-12469][table] 
Clean up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r283833595
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -65,9 +64,15 @@
private final Map> partitionColumnStats;
 
public GenericInMemoryCatalog(String name) {
+   this(name, DEFAULT_DB);
+   }
+
+   public GenericInMemoryCatalog(String name, String defaultDatabase) {
 
 Review comment:
   I agree we should not get pulled into the discussion around the yaml file. It's 
irrelevant from the perspective of this ctor. We can put any arbitrary logic in 
the `CatalogFactory`.
   
   I still believe, though, that from the clean code perspective we should allow 
passing a CatalogDatabase (or GenericInMemoryCatalogDatabse if we want to limit 
the scope) in the ctor. Such an approach helps to write testable code 
(http://misko.hevery.com/attachments/Guide-Writing%20Testable%20Code.pdf or 
http://misko.hevery.com/code-reviewers-guide/), which in turn is easier to 
use, as tests also verify that the interfaces are usable. I see no reason why 
we should not allow that. 
   
   But in the end I will leave it for your call.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys edited a comment on issue #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys edited a comment on issue #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#issuecomment-492337827
 
 
   @bowenli86 The goal is to have a common `TableEnvironment` for both the blink 
planner and the legacy/flink planner. The entry point to either of them will be the 
`Planner` interface, but the `CatalogManager` will not be part of it. The 
`CatalogManager` will entirely reside in the api module in `TableEnvironment`. 
The only fully functional `TableEnvironment` that exists in the master branch 
is Flink's one, thus this is the one we should work with. The one in 
`blink-planner` is just a minimal version to be able to run some of the tests.
   
   At some point, though, we have to figure out how we want to share e.g. the 
`org.apache.flink.table.catalog.CatalogManagerCalciteSchema`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on issue #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on issue #8404: [FLINK-11476][table] Create CatalogManager 
to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#issuecomment-492337827
 
 
   @bowenli86 The goal is to have a common `TableEnvironment` for both the blink 
planner and the legacy/flink planner. The entry point to either of them will be the 
`Planner` interface, but the `CatalogManager` will not be part of it. The 
`CatalogManager` will entirely reside in the api module in `TableEnvironment`. 
The only fully functional `TableEnvironment` that exists in the master branch 
is Flink's one, thus this is the one we should work with. The one in 
`blink-planner` is just a minimal version to be able to run some of the tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter edited a comment on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
StefanRRichter edited a comment on issue #8431: [FLINK-12480] Introduce mailbox 
to StreamTask main-loop
URL: https://github.com/apache/flink/pull/8431#issuecomment-492336130
 
 
   Maybe to extend the explanation a bit: the `MailboxSender` interface is 
similar in function to the `Executor` interface. We just want a bit more control 
over when letters are accepted into the queue.
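
   To make the analogy concrete, a hypothetical sketch (the method names are illustrative 
and not necessarily the actual interface in this PR) of how such a sender side compares to 
`Executor`:

```java
// java.util.concurrent.Executor accepts work unconditionally via execute(Runnable).
// A mailbox-style sender plays a similar role, but gives the caller more control over
// when letters are accepted (e.g. blocking or failing fast while the mailbox is full).
public interface MailboxSenderSketch {

    /** Enqueues a letter, blocking while the mailbox is full. */
    void putMail(Runnable letter) throws InterruptedException;

    /** Tries to enqueue a letter without blocking; returns false if the mailbox is full. */
    boolean tryPutMail(Runnable letter);
}
```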


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem

2019-05-14 Thread GitBox
yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] 
Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
URL: https://github.com/apache/flink/pull/8215#discussion_r283919936
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -206,9 +236,9 @@ public static void deleteApplicationFiles(final 
Map<String, String> env) {
 * @return YARN resource
 */
private static LocalResource registerLocalResource(
-   Path remoteRsrcPath,
-   long resourceSize,
-   long resourceModificationTime) {
 
 Review comment:
   Thanks, updated. I was referring to line 411. It seems not every method 
follows this convention... 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
StefanRRichter commented on issue #8431: [FLINK-12480] Introduce mailbox to 
StreamTask main-loop
URL: https://github.com/apache/flink/pull/8431#issuecomment-492336130
 
 
   Maybe to extend the explanation a bit: the `MailboxSender` interface is 
not unlike the `Executor` interface in function. We just want a bit more control 
over when letters are accepted into the queue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12469) Clean up catalog API on default/current DB

2019-05-14 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12469.

   Resolution: Fixed
Fix Version/s: 1.9.0

merged in 1.9.0:  1c3ac2f82b707ae12556a5883980da8050510244

> Clean up catalog API on default/current DB
> --
>
> Key: FLINK-12469
> URL: https://issues.apache.org/jira/browse/FLINK-12469
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the catalog API has get/setCurrentDatabase(), which is more user 
> session specific. By our design principle, a catalog instance is agnostic to 
> user sessions. Thus, the current database concept doesn't belong there. However, 
> a catalog should support a (configurable) default database, which would be 
> taken as the user's current database when the user's session doesn't specify a 
> current DB.
> This JIRA is to remove the current database concept from the catalog API and add 
> a default database instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
StefanRRichter commented on a change in pull request #8431: [FLINK-12480] 
Introduce mailbox to StreamTask main-loop
URL: https://github.com/apache/flink/pull/8431#discussion_r283917456
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implementation of {@link Mailbox} inspired by {@link 
java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * our use case with multiple writers, single reader and volatile reads 
instead of lock & read on {@link #count}.
+ */
+public class MailboxImpl implements Mailbox {
+
+   /**
+* The enqueued letters.
+*/
+   @GuardedBy("lock")
+   private final Runnable[] ringBuffer;
+
+   /**
+* Lock for all concurrent ops.
+*/
+   private final ReentrantLock lock;
+
+   /**
+* Condition that is triggered when the buffer is no longer empty.
+*/
+   @GuardedBy("lock")
+   private final Condition notEmpty;
+
+   /**
+* Condition that is triggered when the buffer is no longer full.
+*/
+   @GuardedBy("lock")
+   private final Condition notFull;
+
+   /**
+* Index of the ring buffer head.
+*/
+   @GuardedBy("lock")
+   private int headIndex;
+
+   /**
+* Index of the ring buffer tail.
+*/
+   @GuardedBy("lock")
+   private int tailIndex;
+
+   /**
+* Number of letters in the mailbox.
+*/
+   @GuardedBy("lock")
+   private volatile int count;
+
+   /**
+* A mask to wrap around the indexes of the ring buffer. We use this to 
avoid ifs or modulo ops.
+*/
+   private final int moduloMask;
+
+   public MailboxImpl() {
+   this(6); // 2^6 = 64
+   }
+
+   public MailboxImpl(int capacityPow2) {
+   final int capacity = 1 << capacityPow2;
+   Preconditions.checkState(capacity > 0);
+   this.moduloMask = capacity - 1;
+   this.ringBuffer = new Runnable[capacity];
+   this.lock = new ReentrantLock();
+   this.notEmpty = lock.newCondition();
+   this.notFull = lock.newCondition();
+   }
+
+   @Override
+   public boolean hasMail() {
+   return count > 0;
+   }
+
+   @Override
+   public Runnable tryTakeMail() {
+   final ReentrantLock lock = this.lock;
+   lock.lock();
+   try {
+   return isEmpty() ? null : takeInternal();
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   @Nonnull
+   @Override
+   public Runnable takeMail() throws InterruptedException {
+   final ReentrantLock lock = this.lock;
+   lock.lockInterruptibly();
+   try {
+   while (isEmpty()) {
+   notEmpty.await();
+   }
+   return takeInternal();
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   @Override
+   public void waitUntilHasMail() throws InterruptedException {
+   final ReentrantLock lock = this.lock;
+   lock.lockInterruptibly();
+   try {
+   while (isEmpty()) {
+   notEmpty.await();
+   }
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   
//--
+
+   @Override
+   public boolean tryPutMail(@Nonnull Runnable letter) {
+   final ReentrantLock lock = this.lock;
+  

[GitHub] [flink] kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce 
mailbox to StreamTask main-loop
URL: https://github.com/apache/flink/pull/8431#discussion_r283892928
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 ##
 @@ -98,12 +96,9 @@ public void init() throws Exception {
}
 
@Override
-   protected void run() throws Exception {
-   // cache processor reference on the stack, to make the code 
more JIT friendly
-   final StreamInputProcessor inputProcessor = 
this.inputProcessor;
-
-   while (running && inputProcessor.processInput()) {
-   // all the work happens in the "processInput" method
+   protected void performDefaultAction(ActionContext context) throws 
Exception {
+   if (!inputProcessor.processInput()) {
 
 Review comment:
   Thanks, I did not notice that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
StefanRRichter commented on issue #8431: [FLINK-12480] Introduce mailbox to 
StreamTask main-loop
URL: https://github.com/apache/flink/pull/8431#issuecomment-492334593
 
 
   @kezhuw the letters will always be just `Runnable`, and right now there are 
two cases: checkpoint triggers and processing timers. Stream elements are 
processed as part of the default action; they don't go through the mailbox, for 
performance reasons. Imagine the default method as the often-called, hot action 
of the task. The letters are infrequent requests from other threads to run 
some code for them in the task's main thread to avoid concurrency. That's all.
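   As a sketch of that loop (it assumes the `Mailbox` interface from this PR; 
the surrounding task code is made up):

```java
import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;

// Sketch of the described pattern: the hot default action runs over and over,
// while letters (checkpoint triggers, timer callbacks) enqueued by other threads
// are executed in this same thread, so their bodies need no extra locking.
final class MailboxLoopSketch {

	private final Mailbox mailbox;
	private final Runnable defaultAction; // e.g. "process one input element"
	private volatile boolean running = true;

	MailboxLoopSketch(Mailbox mailbox, Runnable defaultAction) {
		this.mailbox = mailbox;
		this.defaultAction = defaultAction;
	}

	void run() {
		while (running) {
			// drain the infrequent requests from other threads first
			Runnable letter;
			while ((letter = mailbox.tryTakeMail()) != null) {
				letter.run();
			}
			// then run the often-called, hot action of the task
			defaultAction.run();
		}
	}

	void stop() {
		running = false;
	}
}
```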


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
kezhuw commented on a change in pull request #8431: [FLINK-12480] Introduce 
mailbox to StreamTask main-loop
URL: https://github.com/apache/flink/pull/8431#discussion_r283916080
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implementation of {@link Mailbox} inspired by {@link 
java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * our use case with multiple writers, single reader and volatile reads 
instead of lock & read on {@link #count}.
+ */
+public class MailboxImpl implements Mailbox {
+
+   /**
+* The enqueued letters.
+*/
+   @GuardedBy("lock")
+   private final Runnable[] ringBuffer;
+
+   /**
+* Lock for all concurrent ops.
+*/
+   private final ReentrantLock lock;
+
+   /**
+* Condition that is triggered when the buffer is no longer empty.
+*/
+   @GuardedBy("lock")
+   private final Condition notEmpty;
+
+   /**
+* Condition that is triggered when the buffer is no longer full.
+*/
+   @GuardedBy("lock")
+   private final Condition notFull;
+
+   /**
+* Index of the ring buffer head.
+*/
+   @GuardedBy("lock")
+   private int headIndex;
+
+   /**
+* Index of the ring buffer tail.
+*/
+   @GuardedBy("lock")
+   private int tailIndex;
+
+   /**
+* Number of letters in the mailbox.
+*/
+   @GuardedBy("lock")
+   private volatile int count;
+
+   /**
+* A mask to wrap around the indexes of the ring buffer. We use this to 
avoid ifs or modulo ops.
+*/
+   private final int moduloMask;
+
+   public MailboxImpl() {
+   this(6); // 2^6 = 64
+   }
+
+   public MailboxImpl(int capacityPow2) {
+   final int capacity = 1 << capacityPow2;
+   Preconditions.checkState(capacity > 0);
+   this.moduloMask = capacity - 1;
+   this.ringBuffer = new Runnable[capacity];
+   this.lock = new ReentrantLock();
+   this.notEmpty = lock.newCondition();
+   this.notFull = lock.newCondition();
+   }
+
+   @Override
+   public boolean hasMail() {
+   return count > 0;
+   }
+
+   @Override
+   public Runnable tryTakeMail() {
+   final ReentrantLock lock = this.lock;
+   lock.lock();
+   try {
+   return isEmpty() ? null : takeInternal();
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   @Nonnull
+   @Override
+   public Runnable takeMail() throws InterruptedException {
+   final ReentrantLock lock = this.lock;
+   lock.lockInterruptibly();
+   try {
+   while (isEmpty()) {
+   notEmpty.await();
+   }
+   return takeInternal();
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   @Override
+   public void waitUntilHasMail() throws InterruptedException {
+   final ReentrantLock lock = this.lock;
+   lock.lockInterruptibly();
+   try {
+   while (isEmpty()) {
+   notEmpty.await();
+   }
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   
//--
+
+   @Override
+   public boolean tryPutMail(@Nonnull Runnable letter) {
+   final ReentrantLock lock = this.lock;
+   

[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283916005
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
 ##
 @@ -118,8 +118,12 @@ abstract class TableTestUtil {
 }
 else if (expectedLine == TableTestUtil.ANY_SUBTREE) {
   break
-}
-else if (expectedLine != actualLine) {
+} else if (!verifyCatalogPath && actualLine.contains("table=[[")) {
 
 Review comment:
   I did not want to change all of the tests. That's why I added an option to 
strip the default catalog & database during validation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283915654
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink catalog's database and Calcite's schema.
+ * Tables are registered as tables in the schema.
+ */
+class DatabaseCalciteSchema implements Schema {
+   private final String dbName;
+   private final Catalog catalog;
+
+   public DatabaseCalciteSchema(String dbName, Catalog catalog) {
+   this.dbName = dbName;
+   this.catalog = catalog;
+   }
+
+   @Override
+   public Table getTable(String tableName) {
+
+   ObjectPath tablePath = new ObjectPath(dbName, tableName);
+
+   try {
+   if (!catalog.tableExists(tablePath)) {
+   return null;
+   }
+
+   CatalogBaseTable table = catalog.getTable(tablePath);
+
+   if (table instanceof CalciteCatalogTable) {
+   return ((CalciteCatalogTable) table).getTable();
+   } else {
+   throw new TableException("Unsupported table 
type: " + table);
+   }
+   } catch (Exception e) {
+   throw new TableException("Could not find table: " + 
tableName, e);
 
 Review comment:
   Actually we should never end up here with `TableNotExistException`, as we 
check `catalog.tableExists()` before. That's why I went with `TableException` 
here.
   
   I added an appropriate comment there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-14 Thread GitBox
asfgit closed pull request #8390: [FLINK-12469][table] Clean up catalog API on 
default/current database
URL: https://github.com/apache/flink/pull/8390
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] kezhuw commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask main-loop

2019-05-14 Thread GitBox
kezhuw commented on issue #8431: [FLINK-12480] Introduce mailbox to StreamTask 
main-loop
URL: https://github.com/apache/flink/pull/8431#issuecomment-492331773
 
 
   @StefanRRichter After reading through 
https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g,
 I have some doubts about this feature which may relate to your discussion with 
@1u0 about message abstraction. I use the word "letter" in the following 
description to avoid conflicting with @1u0's point:
   * Will `StreamElement` handling and checkpoint handling be represented as 
letters in the mailbox?
   * Are there any cases where the sender of a letter does not know how to 
handle that letter, e.g. does not know how to wrap it as a `Runnable`?
   
   If the answer to the second question is yes, I think it is the 
responsibility of the mailbox, which has more knowledge than the senders, to 
wrap non-runnable letters as runnable ones if we don't want casting while 
processing letters. Even in this case, I think overloading methods is a more 
appropriate approach than introducing a generic message type, as the latter is 
intrusive to type signatures and may incur unnecessary wrapper classes. 
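   For illustration, the overloading approach might look like this sketch (the 
letter type and method names are hypothetical):

```java
// Hypothetical sketch: the mailbox, which knows how each letter kind is handled,
// wraps non-Runnable letters itself, so senders never need to build Runnables
// and readers never need to cast.
interface OverloadedMailboxSketch {

	// plain runnable letters are enqueued as-is
	void putMail(Runnable letter) throws InterruptedException;

	// a non-runnable letter is wrapped into a Runnable inside the mailbox
	default void putMail(CheckpointTrigger letter) throws InterruptedException {
		Runnable wrapped = letter::trigger;
		putMail(wrapped);
	}

	// hypothetical letter type, standing in for anything a sender cannot wrap itself
	interface CheckpointTrigger {
		void trigger();
	}
}
```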


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-14 Thread GitBox
walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add 
the interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r283907768
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
 ##
 @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+import org.apache.flink.table.api.Table;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A pipeline is a linear workflow which chains {@link Estimator}s and {@link 
Transformer}s to
+ * execute an algorithm.
+ *
+ * A pipeline itself can either act as an Estimator or a Transformer, 
depending on the stages it
+ * includes. More specifically:
+ * 
+ * 
+ * If a Pipeline has an {@link Estimator}, one needs to call {@link 
Pipeline#fit(Table)} before use
+ * the pipeline as a {@link Transformer}. In this case the Pipeline is an 
{@link Estimator} and can
+ * produce a Pipeline as a {@link Model}.
+ * 
+ * 
+ * If a Pipeline has no {@link Estimator}, it is a {@link Transformer} and can 
be applied to a Table
+ * directly. In this case, {@link Pipeline#fit(Table)} will simply return the 
pipeline itself.
+ * 
+ * 
+ *
+ * In addition, a pipeline can also be used as a {@link PipelineStage} in 
another pipeline, just
+ * like an ordinary {@link Estimator} or {@link Transformer} as describe above.
+ */
+@PublicEvolving
+public final class Pipeline implements Estimator, 
Transformer,
+   Model {
+   private static final long serialVersionUID = 1L;
+   private List stages;
+   private Params params;
+
+   public Pipeline() {
+   this(new ArrayList<>());
+   }
+
+   public Pipeline(List stages) {
+   this.stages = stages;
+   this.params = new Params();
+   }
+
+   private static boolean isStageNeedFit(PipelineStage stage) {
+   return (stage instanceof Pipeline && ((Pipeline) 
stage).needFit()) ||
+   (!(stage instanceof Pipeline) && stage instanceof 
Estimator);
+   }
+
+   /**
+* Appends a PipelineStage to the tail of this pipeline.
+*
+* @param stage the stage to be appended
+*/
+   public Pipeline appendStage(PipelineStage stage) {
+   stages.add(stage);
+   return this;
+   }
+
+   /**
+* Returns a list of all stages in this pipeline in order.
+*
+* @return a list of all stages in this pipeline in order.
+*/
+   public List getStages() {
+   return stages;
+   }
+
+   /**
+* Check whether the pipeline acts as an {@link Estimator} or not. When 
the return value is
+* true, that means this pipeline contains an {@link Estimator} and 
thus users must invoke
+* {@link #fit(Table)} before they can use this pipeline as a {@link 
Transformer}. Otherwise,
+* the pipeline can be used as a {@link Transformer} directly.
+*
+* @return {@code true} if this pipeline has an Estimator, {@code 
false} otherwise
+*/
+   public boolean needFit() {
+   return this.getIndexOfLastEstimator() >= 0;
+   }
+
+   public Params getParams() {
+   return params;
+   }
+
+   //find the last Estimator or Pipeline that needs fit in stages, -1 
stand for no Estimator in Pipeline
+   private int getIndexOfLastEstimator() {
 
 Review comment:
   Yes, I think that's also a possibility. We just need to carefully maintain 
the code to limit which APIs can alter the "pipeline" (e.g. each time the 
pipeline changes, we need to update the state). 
   
   One more thought on this: we could always make the pipeline final, since 
every time we `fit` the pipeline, we create a new one. What do you think 
@c4e



[GitHub] [flink] walterddr commented on issue #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate

2019-05-14 Thread GitBox
walterddr commented on issue #8389: [FLINK-12399][table] Fix 
FilterableTableSource does not change after applyPredicate
URL: https://github.com/apache/flink/pull/8389#issuecomment-492325084
 
 
   @godfreyhe yeah, you are right. This is not exactly "explain source"; 
ideally we would need to put "what filter has been pushed down" into the 
`explainSource` result, since that is what actually "explains" the data source. 
   
   I will draft a design to change the API in the JIRA. How about we move the 
discussion there? I will close this PR if we all agree.
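   For illustration, that could look roughly like the following sketch (the 
class and member names are made up, not the proposed API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a filterable source that surfaces the predicates actually pushed
// into it, so the plan description changes once predicates have been accepted.
class SketchFilterableSource {

	private final List<String> pushedDownPredicates = new ArrayList<>();

	// called when the planner pushes a predicate into the source
	void acceptPredicate(String predicate) {
		pushedDownPredicates.add(predicate);
	}

	// the description that would show up in the plan / explain output
	String explainSource() {
		return "SketchSource(fields=[a, b], filter=["
			+ String.join(" AND ", pushedDownPredicates) + "])";
	}
}
```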


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate

2019-05-14 Thread GitBox
walterddr commented on a change in pull request #8389: [FLINK-12399][table] Fix 
FilterableTableSource does not change after applyPredicate
URL: https://github.com/apache/flink/pull/8389#discussion_r283906054
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ##
 @@ -81,7 +81,13 @@ class FlinkLogicalTableSourceScan(
 val terms = super.explainTerms(pw)
 .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", 
"))
 
-val sourceDesc = tableSource.explainSource()
+val auxiliarySourceDesc = tableSource match {
+  case fts: FilterableTableSource[_] =>
+s"FilterPushDown=${fts.isFilterPushedDown.toString}"
 
 Review comment:
   Hmm, I think this is valid. I originally thought that, since there can only 
be one "match" and "onMatch", all the different filters either can all be 
pushed down or none can (i.e. it is always the same set of filters being 
pushed down).
   
   But yes, I think there's nothing preventing Calcite from pushing down only 
some of the filters (not because a partial pushdown is impossible per se, but 
because the Volcano planner may consider one alternative more efficient than 
another). I doubt this is possible at the moment, though.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-14 Thread GitBox
xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean 
up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r283902136
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -65,9 +64,15 @@
private final Map> partitionColumnStats;
 
public GenericInMemoryCatalog(String name) {
+   this(name, DEFAULT_DB);
+   }
+
+   public GenericInMemoryCatalog(String name, String defaultDatabase) {
 
 Review comment:
   Since we will have a PR coming for the YAML file structure for catalogs, 
let's defer the discussion until we see it. As far as I am concerned, I don't 
see a use case for a user to create a default DB before creating a catalog. 
It's also common for a DB product to create the default DB automatically. If 
users need their own specific databases, they can create them (possibly via 
DDL). Our in-memory catalog follows suit, except that with the change here we 
allow the user to specify a name other than "default". This addition even 
seems optional to me. Nevertheless, if we find it necessary to take a 
user-created database to instantiate the in-memory catalog, we can add that at 
that time.
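   For example (all names below are made up), the added constructor would 
allow:

```java
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogConstructionExample {

	public static void main(String[] args) {
		// built-in default database name ("default", per the constant referenced above)
		GenericInMemoryCatalog builtInDefault = new GenericInMemoryCatalog("my_catalog");

		// user-chosen default database name, enabled by the new two-argument constructor
		GenericInMemoryCatalog customDefault = new GenericInMemoryCatalog("my_catalog", "my_default_db");
	}
}
```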


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-14 Thread GitBox
flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to 
calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488032392
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @kurtyoung, @twalthr [PMC], @wuchong [committer]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

