[GitHub] flink issue #2916: [FLINK-4523] [kinesis] Allow Kinesis Consumer to start fr...

2017-01-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2916
  
Merged to master. Thank you for your contribution @tony810430 !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2916: [FLINK-4523] [kinesis] Allow Kinesis Consumer to s...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2916




[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-01-23 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
@StephanEwen I have used TemporaryFolder instead of creating File objects manually. 
A few notes:
1. TemporaryFolder must invoke `create()` manually in the setup method.
2. `shutdown()` should invoke `tmp.delete()` to delete all directories and 
files recursively.
3. A random string is appended to each file name.

The generated files look like this in the root directory 
`flink/flink-runtime/target/tmp`:

```
tmp/
└── junit4955582781992164088
├── 1fde53fa-7f7c-4f55-921c-04d078601cfd
│   └── flink-dist-cache-664fc7de-51af-4fc1-86a0-3d8c76bb65df
├── 30326e4a-205f-4982-a920-e790a7f52730
│   └── flink-dist-cache-d3eb183b-ca0c-4b23-877f-adf1540d6563
│   └── b217df314c282e23301ba8f8351fca5c
│   └── cacheFile1e7f5410-2326-4916-8a50-986d49393a7f
├── 35b19e4e-1515-4f93-813e-d4a40b09baeb
│   └── flink-dist-cache-af593973-73a0-4996-8323-40978a703317
├── c754952f-fe8d-465f-9424-1303e734e990
│   └── flink-dist-cache-18f1984b-84ca-4db1-be2c-d2a3bc0a5fe3
├── cacheFile1e7f5410-2326-4916-8a50-986d49393a7f
└── fde2c7bc-8bb1-43ec-8e12-21510322b2b1
└── flink-dist-cache-b34f1e0b-79a7-4cf7-ab3f-7a7a93a04d73
```
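The setup/teardown pattern described above can be sketched with the standard library alone; this is roughly what JUnit's TemporaryFolder rule does under the hood (a sketch, not the rule's actual implementation — `create`/`delete` here only mirror its behavior):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.UUID;
import java.util.stream.Stream;

public class TmpDirSketch {

    // create(): make a uniquely named directory under the given root,
    // mirroring tip 1 (explicit create) and tip 3 (random name suffix)
    static Path create(Path root) throws IOException {
        Files.createDirectories(root);
        return Files.createDirectory(root.resolve("junit" + UUID.randomUUID()));
    }

    // delete(): remove the directory and everything below it,
    // mirroring tip 2 (recursive cleanup in shutdown())
    static void delete(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            // delete children before parents
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("flink-test");
        Path tmp = create(root);
        Files.createFile(tmp.resolve("cacheFile" + UUID.randomUUID()));
        delete(tmp);
        System.out.println(Files.exists(tmp)); // prints false: the dir is gone
    }
}
```

With the real rule, `create()` would be called in `@Before`/setup and `tmp.delete()` in `shutdown()`, exactly as the tips describe.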





[GitHub] flink issue #3035: [FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3035
  
Thank you for picking this up @StephanEwen! I've taken a look at your 
approach in the local branch; +1 to it.





[GitHub] flink issue #2941: [FLINK-3555] Web interface does not render job informatio...

2017-01-23 Thread iampeter
Github user iampeter commented on the issue:

https://github.com/apache/flink/pull/2941
  
I agree with @sachingoel0101 about overriding classes from `vendor.css`.




[GitHub] flink pull request #3197: [FLINK-5567] [Table API & SQL]Introduce and migrat...

2017-01-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3197

[FLINK-5567] [Table API & SQL] Introduce and migrate current table 
statistics to FlinkStatistics

This PR includes two commits: the first is related to 
https://github.com/apache/flink/pull/3196, and the second introduces and 
migrates the current table statistics to FlinkStatistics, so please 
focus on the second commit.
The main changes include:
1. Introduce the FlinkStatistic class, an implementation of Calcite's 
Statistic.
2. Integrate FlinkStatistic with FlinkTable.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5567

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3197.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3197


commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23
Author: 槿瑜 
Date:   2017-01-24T06:34:01Z

Introduce structure to hold table and column level statistics

commit 56c51b0f8d7983b8593946f64ece2b4881f0d723
Author: 槿瑜 
Date:   2017-01-24T06:57:08Z

Introduce and migrate current table statistics to FlinkStatistics






[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...

2017-01-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3196

[FLINK-5566] [Table API & SQL] Introduce structure to hold table and column 
level statistics

This PR aims to introduce structures to hold table and column level 
statistics:
TableStats: responsible for holding table level statistics.
ColumnStats: responsible for holding column level statistics.
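A minimal sketch of what such a pair of holder classes could look like; the field names and types here are assumptions for illustration, not the actual FLINK-5566 classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical column-level statistics holder (fields are assumptions).
class ColumnStats {
    final Long ndv;        // number of distinct values
    final Long nullCount;  // number of null values

    ColumnStats(Long ndv, Long nullCount) {
        this.ndv = ndv;
        this.nullCount = nullCount;
    }
}

// Hypothetical table-level statistics holder referencing per-column stats.
class TableStats {
    final long rowCount;                          // table level statistic
    final Map<String, ColumnStats> columnStats;   // column level statistics

    TableStats(long rowCount) {
        this.rowCount = rowCount;
        this.columnStats = new HashMap<>();
    }
}

public class StatsDemo {
    public static void main(String[] args) {
        TableStats stats = new TableStats(1000L);
        stats.columnStats.put("user_id", new ColumnStats(420L, 0L));
        System.out.println(stats.rowCount + " "
            + stats.columnStats.get("user_id").ndv); // prints "1000 420"
    }
}
```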

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5566

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3196


commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23
Author: 槿瑜 
Date:   2017-01-24T06:34:01Z

Introduce structure to hold table and column level statistics






[GitHub] flink pull request #2937: [FLINK-4303] [cep] Examples for CEP library.

2017-01-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2937#discussion_r97481654
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/examples/CEPMonitoringExample.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.scala.examples
+
+import org.apache.flink.cep.scala.CEP
+import org.apache.flink.cep.scala.examples.events.{MonitoringEvent, TemperatureAlert, TemperatureEvent, TemperatureWarning}
+import org.apache.flink.cep.scala.examples.sources.MonitoringEventSource
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object CEPMonitoringExample {
--- End diff --

It would be great to have a Javadoc comment explaining exactly what the 
example demonstrates.




[GitHub] flink pull request #2937: [FLINK-4303] [cep] Examples for CEP library.

2017-01-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2937#discussion_r97481738
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/examples/events/MonitoringEvent.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.scala.examples.events
+
+class MonitoringEvent(rackID: Int) {
--- End diff --

Same here: class-level Javadocs would be nice.




[GitHub] flink issue #2916: [FLINK-4523] [kinesis] Allow Kinesis Consumer to start fr...

2017-01-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2916
  
Rebasing and merging this ...




[GitHub] flink issue #3118: [FLINK-5224] [table] Improve UDTF: emit rows directly ins...

2017-01-23 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3118
  
Hi @twalthr, I have updated this PR. 

I refactored `FlinkCorrelate#generateFunction` to separate the generation 
of the `FlatMapFunction` from that of the `TableFunctionCollector`. This 
fixes the problem that the generated `TableFunctionCollector` contained 
another `TableFunctionCollector` instance variable, and it makes the logic 
cleaner.




[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/2974




[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

2017-01-23 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3128




[GitHub] flink issue #3089: [FLINK-5497] remove duplicated tests

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3089
  
Thanks!
I will try to get to this in the remainder of the week.
Speeding up builds is a valuable thing :-)




[GitHub] flink issue #3116: [docs] [metrics] Update metrics documentation

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3116
  
I think this is a good improvement over how it was before.
+1 to merge for master and 1.2




[GitHub] flink issue #3125: [FLINK-5499][JobManager]Reuse the resource location of pr...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3125
  
Thank you for opening this pull request. I think the feature is a good 
idea, but I would like to approach it a bit more broadly:
  - On state restore, this should prefer the old state location, agreed.
  - If no such location exists, it should still try to co-locate by input. 
Especially for batch execution, that is quite important.

Also, this would need some tests.
I'll add some more detailed comments to the issue soon...




[GitHub] flink issue #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
I am a bit skeptical about the special-case handling of Row in the type 
extractor there.
Why specifically support Row and not other types? To me, Row does not seem 
like such a common type for collection inputs that it needs this 
special-case handling.

Also, this does not detect whether different rows have conflicting types in 
their columns (such as `row1[x] = int, row2[x] = String`).

What do you think about simply improving how a row type info is generated 
(similar to what we did with `TypeHint` for other types)? Then 
`StreamExecutionEnvironment.fromCollection(rows, type)` should work quite well.
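The conflicting-column-types concern above could be addressed with a per-column check along these lines; this is a self-contained sketch (Object[] stands in for Flink's Row, and the method name is made up), not the actual TypeExtractor code:

```java
import java.util.Arrays;
import java.util.List;

public class RowTypeCheck {

    // Infer one class per column from a list of rows; throw a helpful
    // exception if two rows disagree on a column's type.
    static Class<?>[] inferColumnTypes(List<Object[]> rows) {
        Class<?>[] types = new Class<?>[rows.get(0).length];
        for (Object[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                Class<?> c = row[i].getClass();
                if (types[i] == null) {
                    types[i] = c;
                } else if (!types[i].equals(c)) {
                    throw new IllegalArgumentException(
                        "Conflicting types in column " + i + ": "
                            + types[i] + " vs " + c);
                }
            }
        }
        return types;
    }

    public static void main(String[] args) {
        // consistent columns: (Integer, String) in every row
        List<Object[]> ok = Arrays.asList(
            new Object[]{1, "a"}, new Object[]{2, "b"});
        System.out.println(Arrays.toString(inferColumnTypes(ok)));

        // row2[0] is a String while row1[0] is an Integer -> rejected
        try {
            inferColumnTypes(Arrays.asList(
                new Object[]{1, "a"}, new Object[]{"x", "b"}));
        } catch (IllegalArgumentException e) {
            System.out.println("conflict detected");
        }
    }
}
```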




[GitHub] flink issue #3138: #Flink-5522 Storm Local Cluster can't work with powermock

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3138
  
@liuyuzhong7 Would be good to know if you plan to follow up on this issue.
Thanks!




[GitHub] flink issue #3134: [FLINK-5450] Fix restore from legacy log message

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3134
  
I think this should go to master and Flink-1.2




[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3190
  
@shijinkui I think there are already good temp file utils in JUnit itself.
Do we need to change anything in the build when we fix the tests?




[GitHub] flink issue #3173: [FLINK-5577][yarn]Each time application is submitted to y...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3173
  
Thanks for reporting and fixing this issue!




[GitHub] flink issue #3173: [FLINK-5577][yarn]Each time application is submitted to y...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3173
  
I think this looks good, merging this for the master...




[GitHub] flink pull request #3193: [FLINK-5613][query] querying a non-existing key is...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3193




[GitHub] flink issue #3193: [FLINK-5613][query] querying a non-existing key is incons...

2017-01-23 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3193
  
Merging ...




[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r97353282
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
}
}
 
+   public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+   return new TypeExtractor().privateGetForIterable(value);
+   }
+
+   public static <X> TypeInformation<X> getForCollection(X[] value) {
--- End diff --

Add Javadoc.




[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r97355537
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
}
}
 
+   public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+   return new TypeExtractor().privateGetForIterable(value);
+   }
+
+   public static <X> TypeInformation<X> getForCollection(X[] value) {
+   return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
+   checkNotNull(value);
+
+   Iterator<X> it = value.iterator();
+   X v = checkNotNull(it.next());
+
+   if (v instanceof Row) {
+   int arity =((Row) v).getArity();
--- End diff --

Missing space.




[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r97353232
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
}
}
 
+   public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
--- End diff --

Add Javadoc.




[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r97355576
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
}
}
 
+   public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+   return new TypeExtractor().privateGetForIterable(value);
+   }
+
+   public static <X> TypeInformation<X> getForCollection(X[] value) {
+   return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
+   checkNotNull(value);
+
+   Iterator<X> it = value.iterator();
+   X v = checkNotNull(it.next());
+
+   if (v instanceof Row) {
+   int arity =((Row) v).getArity();
+   Iterable<Row> rows = (Iterable<Row>) value;
+   List<TypeInformation<?>> typesInRow = new ArrayList<>(arity);
+   TypeInformation<?>[] temp = new TypeInformation[arity];
+   Collections.addAll(typesInRow, temp);
+
+   for (Row r: rows) {
--- End diff --

Missing space.




[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r97354898
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
}
}
 
+   public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+   return new TypeExtractor().privateGetForIterable(value);
+   }
+
+   public static <X> TypeInformation<X> getForCollection(X[] value) {
+   return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
+   checkNotNull(value);
+
+   Iterator<X> it = value.iterator();
+   X v = checkNotNull(it.next());
--- End diff --

Calling `next` without checking can result in `NoSuchElementException`. We 
should check that first and throw a helpful exception.
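The guarded version suggested here could look like this; a minimal self-contained sketch (class and method names are made up for illustration), not the actual TypeExtractor change:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class FirstElementCheck {

    // Check hasNext() before calling next(), and fail with a helpful
    // message instead of an opaque NoSuchElementException on empty input.
    static <X> X firstElement(Iterable<X> value) {
        Iterator<X> it = value.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException(
                "Cannot extract a type from an empty collection.");
        }
        return it.next();
    }

    public static void main(String[] args) {
        System.out.println(firstElement(List.of(42, 7))); // prints 42
        try {
            firstElement(Collections.emptyList());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```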




[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r97359437
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
}
}
 
+   public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+   return new TypeExtractor().privateGetForIterable(value);
+   }
+
+   public static <X> TypeInformation<X> getForCollection(X[] value) {
+   return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
--- End diff --

Would be great if you could add some inline comments in this method, just 
to quickly see what your code is doing.




[GitHub] flink pull request #3183: [backport] [FLINK-5214] [FLINK-5229] Backport Stre...

2017-01-23 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/3183




[GitHub] flink issue #3183: [backport] [FLINK-5214] [FLINK-5229] Backport StreamTask ...

2017-01-23 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3183
  
Merged manually into `release-1.2` branch.




[GitHub] flink issue #3193: [FLINK-5527][query] querying a non-existing key is incons...

2017-01-23 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3193
  
+1 to merge if Travis passes.




[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3178




[GitHub] flink pull request #3179: [FLINK-5229] [state] Cleanup of operator snapshots...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3179




[GitHub] flink pull request #3021: [FLINK-3617] Prevent NPE in CaseClassSerializer an...

2017-01-23 Thread chermenin
Github user chermenin closed the pull request at:

https://github.com/apache/flink/pull/3021




[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...

2017-01-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3128
  
Merging.




[GitHub] flink issue #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2974
  
@tillrohrmann I've addressed your comments. Will merge this once Travis passes.




[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97354396
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
--- End diff --

Without a JobManager, no BlobService is started. This means the TM would
fail earlier than we want it to.

While trying it out I found another exception that should be wrapped, though
:>




[GitHub] flink issue #3127: [FLINK-5481] Add type extraction from collection

2017-01-23 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3127
  
I will shepherd this PR.




[GitHub] flink issue #3195: [FLINK-5617] API stability check for Flink 1.2

2017-01-23 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3195
  
Yes, I'll mark them as such while merging. Thank you for taking a look.




[GitHub] flink issue #3195: [FLINK-5617] API stability check for Flink 1.2

2017-01-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3195
  
@rmetzger I think they should be `@PublicEvolving`. Can you change it in
this PR?



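As background for the annotation discussion above, an API-stability marker in the spirit of Flink's `@PublicEvolving` can be sketched as a plain Java annotation. This is an illustrative stand-in only — the real annotation lives in `org.apache.flink.annotation`, and the class and method names below are invented for the sketch:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Illustrative stand-in for Flink's @PublicEvolving stability marker.
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.RUNTIME)
@interface PublicEvolving {}

public class StabilitySketch {

	// A new API that is public but may still change between minor releases.
	@PublicEvolving
	static int newApi() {
		return 42;
	}

	public static void main(String[] args) throws Exception {
		// Compatibility checkers such as japicmp can read markers like this
		// to decide which methods are in scope for API stability checks.
		boolean marked = StabilitySketch.class
			.getDeclaredMethod("newApi")
			.isAnnotationPresent(PublicEvolving.class);
		System.out.println("newApi marked evolving: " + marked);
	}
}
```

Flink's japicmp-based check compares annotated scopes against the previous release, which is what the reports linked in the PR below show.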

[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834787#comment-15834787
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97346342
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
--- End diff --

maybe `ActorRef.noSender()` is enough here.


> TaskManager crashes when TM log not existant
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mischa Krüger
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
> at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 

[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834789#comment-15834789
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97346835
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
+					config,
+					false,
+					true);
+
+				// -------------------------------------------------------------
+
+				final ActorGateway tm = taskManager;
+				final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor());
+
+				new Within(d) {
+					@Override
+					protected void run() {
+						Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+
+						logFuture.onSuccess(new OnSuccess<Object>() {
+							@Override
+							public void onSuccess(Object result) throws Throwable {
+								Assert.fail();
+							}
+						}, context);
+						logFuture.onFailure(new OnFailure() {
+							@Override
+							public void onFailure(Throwable failure) throws Throwable {
+								testActorGateway.tell(new Status.Success("success"));
+							}
+						}, context);
+
+						Status.Success msg = expectMsgClass(Status.Success.class);
+						Assert.assertEquals("success", msg.status());
+					}
+				};
+			}
+			catch(Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
--- End diff --

Better let the exception bubble up. Less code ;-)


> TaskManager crashes when TM log not existant
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects 

[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834788#comment-15834788
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97347534
  
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class TaskManagerLogHandlerTest {
+	@Test
+	public void testLogFetchingFailure() throws Exception {
+		// ================= setup TaskManager =================
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString());
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+		future.completeExceptionally(new IOException("failure"));
+		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+		// ================= setup JobManager =================
+
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+   

[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834786#comment-15834786
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97346693
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
+					config,
+					false,
+					true);
+
+				// -------------------------------------------------------------
+
+				final ActorGateway tm = taskManager;
+				final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor());
+
+				new Within(d) {
+					@Override
+					protected void run() {
+						Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+
+						logFuture.onSuccess(new OnSuccess<Object>() {
+							@Override
+							public void onSuccess(Object result) throws Throwable {
+								Assert.fail();
+							}
+						}, context);
+						logFuture.onFailure(new OnFailure() {
+							@Override
+							public void onFailure(Throwable failure) throws Throwable {
+								testActorGateway.tell(new Status.Success("success"));
+							}
+						}, context);
--- End diff --

Maybe `Await.result` is a bit more succinct.


> TaskManager crashes when TM log not existant
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mischa Krüger
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at 
> 

[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97346693
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
+					config,
+					false,
+					true);
+
+				// -------------------------------------------------------------
+
+				final ActorGateway tm = taskManager;
+				final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor());
+
+				new Within(d) {
+					@Override
+					protected void run() {
+						Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+
+						logFuture.onSuccess(new OnSuccess<Object>() {
+							@Override
+							public void onSuccess(Object result) throws Throwable {
+								Assert.fail();
+							}
+						}, context);
+						logFuture.onFailure(new OnFailure() {
+							@Override
+							public void onFailure(Throwable failure) throws Throwable {
+								testActorGateway.tell(new Status.Success("success"));
+							}
+						}, context);
--- End diff --

Maybe `Await.result` is a bit more succinct.



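The reviewer's suggestion — replacing the onSuccess/onFailure callback pair with a blocking wait — can be sketched in a self-contained way with the JDK's `CompletableFuture` standing in for the Scala future (the actual test would use Scala's `Await.result`; the class name and the path string below are illustrative only):

```java
import java.io.FileNotFoundException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AwaitFailureSketch {
	public static void main(String[] args) {
		// A log request that fails, like asking the TM for a log file under a
		// path that does not exist.
		CompletableFuture<String> logFuture = new CompletableFuture<>();
		logFuture.completeExceptionally(new FileNotFoundException("/i/dont/exist"));

		// Blocking on the future turns the failure into an exception, which is
		// more succinct than registering separate success/failure callbacks.
		try {
			logFuture.get();
			throw new AssertionError("expected the log request to fail");
		} catch (InterruptedException | ExecutionException expected) {
			System.out.println("failed as expected: " + expected.getCause().getMessage());
		}
	}
}
```

The trade-off is that the blocking variant ties up the test thread, which is fine in a unit test but not in production actor code.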

[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97346342
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
--- End diff --

maybe `ActorRef.noSender()` is enough here.




[jira] [Commented] (FLINK-4917) Deprecate "CheckpointedAsynchronously" interface

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834785#comment-15834785
 ] 

ASF GitHub Bot commented on FLINK-4917:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3087
  
@mtunique I am addressing the `Checkpointed` interface in a quick follow-up to
this pull request, pretty much copying the docs from `CheckpointedAsynchronously`.


> Deprecate "CheckpointedAsynchronously" interface
> 
>
> Key: FLINK-4917
> URL: https://issues.apache.org/jira/browse/FLINK-4917
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Stephan Ewen
>  Labels: easyfix, starter
>
> The {{CheckpointedAsynchronously}} interface should be deprecated, as it is
> no longer part of the new operator state abstraction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97347534
  
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class TaskManagerLogHandlerTest {
+	@Test
+	public void testLogFetchingFailure() throws Exception {
+		// ================= setup TaskManager =================
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString());
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+		future.completeExceptionally(new IOException("failure"));
+		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+		// ================= setup JobManager =================
+
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+   

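The mock setup in the quoted test hinges on handing the gateway an already-failed future. Assuming the JDK's `CompletableFuture` as a stand-in for Flink's `FlinkCompletableFuture` (and with an invented class name), the pattern looks like:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class FailedFutureStubSketch {
	public static void main(String[] args) {
		// Create an already-failed future for a mocked gateway to return,
		// simulating a log request that fails with an I/O error.
		CompletableFuture<byte[]> logRequest = new CompletableFuture<>();
		logRequest.completeExceptionally(new IOException("failure"));

		// Any consumer of the stub observes the failure immediately; the
		// handler under test must translate it into an error response
		// instead of crashing.
		logRequest.whenComplete((result, error) ->
			System.out.println("request failed with: " + error.getMessage()));
	}
}
```

Stubbing the failure at the future level keeps the test synchronous and deterministic: no actor system or real TaskManager is needed to exercise the error path.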
[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r97346835
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 ---
@@ -1074,6 +1080,79 @@ protected void run() {
}};
}
 
+   @Test
+   public void testLogNotFoundHandling() throws Exception {
+
+   new JavaTestKit(system){{
+
+   ActorGateway jobManager = null;
+   ActorGateway taskManager = null;
+
+   final ActorGateway testActorGateway = new 
AkkaActorGateway(
+   getTestActor(),
+   leaderSessionID);
+
+   try {
+   final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
+
+   // Create the JM
+   ActorRef jm = system.actorOf(Props.create(
+   new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor(;
+
+   jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+
+   final int dataPort = 
NetUtils.getAvailablePort();
+   Configuration config = new Configuration();
+   
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+   
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+   
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+   
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+   taskManager = TestingUtils.createTaskManager(
+   system,
+   jobManager,
+   config,
+   false,
+   true);
+
+   // 
-
+
+   final ActorGateway tm = taskManager;
+   final ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor());
+
+   new Within(d) {
+   @Override
+   protected void run() {
+   Future logFuture = 
tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+
+   logFuture.onSuccess(new 
OnSuccess() {
+   @Override
+   public void 
onSuccess(Object result) throws Throwable {
+   
Assert.fail();
+   }
+   }, context);
+   logFuture.onFailure(new 
OnFailure() {
+   @Override
+   public void 
onFailure(Throwable failure) throws Throwable {
+   
testActorGateway.tell(new Status.Success("success"));
+   }
+   }, context);
+   
+   Status.Success msg = 
expectMsgClass(Status.Success.class);
+   Assert.assertEquals("success", 
msg.status());
+   }
+   };
+   }
+   catch(Exception e) {
+   e.printStackTrace();
+   fail(e.getMessage());
--- End diff --

Better let the exception bubble up. Less code ;-)



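The "let the exception bubble up" advice above can be illustrated with a minimal, self-contained sketch (JUnit itself is omitted; `doWork` and the class name are hypothetical stand-ins for the test body):

```java
public class BubbleUpSketch {

	// Stand-in for the body under test.
	static String doWork() throws Exception {
		return "ok";
	}

	// Before: the failure is caught, printed, and re-reported by hand.
	static String testVerbose() {
		try {
			return doWork();
		} catch (Exception e) {
			e.printStackTrace();
			throw new AssertionError(e.getMessage());
		}
	}

	// After: declare the exception and let it bubble up; the test runner
	// reports it with the full stack trace, and the body shrinks to one line.
	static String testConcise() throws Exception {
		return doWork();
	}

	public static void main(String[] args) throws Exception {
		System.out.println(testVerbose());
		System.out.println(testConcise());
	}
}
```

Besides being shorter, the bubble-up variant preserves the original stack trace, whereas wrapping in `fail(e.getMessage())` loses it.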

[GitHub] flink issue #3087: [FLINK-4917] Deprecate "CheckpointedAsynchronously" inter...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3087
  
@mtunique I am addressing the `Checkpointed` interface in a quick follow-up to
this pull request, pretty much copying the docs from `CheckpointedAsynchronously`.




[jira] [Commented] (FLINK-5617) Check new public APIs in 1.2 release

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834768#comment-15834768
 ] 

ASF GitHub Bot commented on FLINK-5617:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/3195

[FLINK-5617] API stability check for Flink 1.2

With this PR, I'm marking some of the new methods in Flink 1.2 as public 
evolving / internal, instead of public.

Here are the reports for the covered modules:
http://people.apache.org/~rmetzger/rel-1.2-japicmp/

@aljoscha and @kl0u: Are the `fold()` and `reduce()` methods intentionally 
in the `@Public` scope? 
http://people.apache.org/~rmetzger/rel-1.2-japicmp/flink-streaming-java.html

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink5617

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3195


commit 12d9d631455af2c4b6af1024a8865b534f4db9c8
Author: Robert Metzger 
Date:   2017-01-23T13:31:27Z

[FLINK-5617] Change reference version for API stability checks to 1.1.4

commit f7323c2e72e24da3a55fd70d6d27ae33026ecc86
Author: Robert Metzger 
Date:   2017-01-23T15:28:48Z

[FLINK-5617] Mark some methods as PublicEvolving or Internal




> Check new public APIs in 1.2 release
> 
>
> Key: FLINK-5617
> URL: https://issues.apache.org/jira/browse/FLINK-5617
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.2.0
>
>
> Before releasing Flink 1.2.0, I would like to quickly review which new public 
> methods we are supporting in future releases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

2017-01-23 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
Rebased.




[jira] [Created] (FLINK-5618) Add queryable state documentation

2017-01-23 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5618:
--

 Summary: Add queryable state documentation
 Key: FLINK-5618
 URL: https://issues.apache.org/jira/browse/FLINK-5618
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Ufuk Celebi


Add documentation on how to use queryable state.





[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834774#comment-15834774
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
Rebased.


> setMaxParallelism() higher than 1 is possible on non-parallel operators
> ---
>
> Key: FLINK-5473
> URL: https://issues.apache.org/jira/browse/FLINK-5473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> While trying out Flink 1.2, I found out that you can set a maxParallelism 
> higher than 1 on a non-parallel operator.
> I think we should have the same semantics as the setParallelism() method.
> Also, when setting a global maxParallelism in the execution environment, it 
> will be set as a default value for the non-parallel operator.
> When restoring a savepoint from 1.1, you have to set the maxParallelism to 
> the parallelism of the 1.1 job. Non-parallel operators will then also get the 
> maxPar set to this value, leading to an error on restore.
> So currently, users restoring from 1.1 to 1.2 have to manually set the 
> maxParallelism to 1 for all non-parallel operators.
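The intended semantics (matching setParallelism()) can be sketched in plain Java; the class and method names below are hypothetical stand-ins, not Flink's actual implementation:

```java
// Hypothetical sketch of the validation FLINK-5473 asks for; the names here
// are stand-ins and do not match Flink's internal classes.
public class NonParallelOperatorCheck {

    /**
     * Returns the max parallelism to apply, mimicking setMaxParallelism()
     * semantics: a non-parallel operator may never exceed 1.
     */
    public static int validateMaxParallelism(boolean isNonParallel, int requested) {
        if (requested < 1) {
            throw new IllegalArgumentException("maxParallelism must be at least 1");
        }
        if (isNonParallel && requested > 1) {
            throw new IllegalArgumentException(
                "A non-parallel operator cannot have a maxParallelism greater than 1");
        }
        return requested;
    }
}
```

With such a check in place, a 1.1 savepoint restore would fail fast at configuration time instead of erroring on restore.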





[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

2017-01-23 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97343787
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in

//---------------------------------------------------------------------
// Serialization
//---------------------------------------------------------------------
+
public static class MetricDumpSerializer {
+
private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);

/**
 * Serializes the given metrics and returns the resulting byte array.
+*
+* Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
--- End diff --

Very nice 




[GitHub] flink issue #3118: [FLINK-5224] [table] Improve UDTF: emit rows directly ins...

2017-01-23 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3118
  
Thank you for the review, @twalthr. I will update this PR ASAP.




[jira] [Commented] (FLINK-5224) Improve UDTF: emit rows directly instead of buffering them

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834769#comment-15834769
 ] 

ASF GitHub Bot commented on FLINK-5224:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3118
  
Thank you for the review, @twalthr. I will update this PR ASAP.


> Improve UDTF: emit rows directly instead of buffering them
> --
>
> Key: FLINK-5224
> URL: https://issues.apache.org/jira/browse/FLINK-5224
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This needs to code-generate a `Collector`, register it into the instance of
> {{TableFunction}}, and emit the rows generated by the UDTF directly instead
> of buffering them.
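The direct-emit idea can be sketched in plain Java; the `Collector` interface and `SplitFunction` below are simplified stand-ins, not Flink's actual `org.apache.flink.util.Collector` or the code-generated classes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Flink's Collector/TableFunction to illustrate
// emitting rows directly instead of buffering them (the FLINK-5224 idea).
public class DirectEmitSketch {

    interface Collector<T> {
        void collect(T record);
    }

    /** A UDTF-like function that splits a string and emits each token directly. */
    static class SplitFunction {
        private Collector<String> collector;

        // The runtime (here: the caller) injects a collector; in Flink this
        // injection would be code-generated.
        void setCollector(Collector<String> collector) {
            this.collector = collector;
        }

        void eval(String input) {
            for (String token : input.split(",")) {
                collector.collect(token); // no intermediate buffer is built
            }
        }
    }

    public static List<String> run(String input) {
        List<String> out = new ArrayList<>();
        SplitFunction udtf = new SplitFunction();
        udtf.setCollector(out::add);
        udtf.eval(input);
        return out;
    }
}
```

The point of the change is that downstream operators see each row as soon as `collect()` is called, instead of after the whole result list has been materialized.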





[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834770#comment-15834770
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97343787
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in

//---------------------------------------------------------------------
// Serialization
//---------------------------------------------------------------------
+
public static class MetricDumpSerializer {
+
private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);

/**
 * Serializes the given metrics and returns the resulting byte array.
+*
+* Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
--- End diff --

Very nice 


> MetricQueryService throws NullPointerException on JobManager
> 
>
> Key: FLINK-5464
> URL: https://issues.apache.org/jira/browse/FLINK-5464
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> I'm using Flink 699f4b0.
> My JobManager log contains many of these log entries:
> {code}
> 2017-01-11 19:42:05,778 WARN  
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching 
> metrics failed.
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/MetricQueryService#-970662317]] after [1 ms]
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>   at java.lang.Thread.run(Thread.java:745)
> 2017-01-11 19:42:07,765 WARN  
> org.apache.flink.runtime.metrics.dump.MetricQueryService  - An exception 
> occurred while processing a message.
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162)
>   at 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47)
>   at 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90)
>   at 
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}





[GitHub] flink pull request #3195: [FLINK-5617] API stability check for Flink 1.2

2017-01-23 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/3195

[FLINK-5617] API stability check for Flink 1.2

With this PR, I'm marking some of the new methods in Flink 1.2 as public 
evolving / internal, instead of public.

Here are the reports for the covered modules:
http://people.apache.org/~rmetzger/rel-1.2-japicmp/

@aljoscha and @kl0u: Are the `fold()` and `reduce()` methods intentionally 
in the `@Public` scope? 
http://people.apache.org/~rmetzger/rel-1.2-japicmp/flink-streaming-java.html

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink5617

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3195


commit 12d9d631455af2c4b6af1024a8865b534f4db9c8
Author: Robert Metzger 
Date:   2017-01-23T13:31:27Z

[FLINK-5617] Change reference version for API stability checks to 1.1.4

commit f7323c2e72e24da3a55fd70d6d27ae33026ecc86
Author: Robert Metzger 
Date:   2017-01-23T15:28:48Z

[FLINK-5617] Mark some methods as PublicEvolving or Internal






[GitHub] flink issue #3087: [FLINK-4917] Deprecate "CheckpointedAsynchronously" inter...

2017-01-23 Thread mtunique
Github user mtunique commented on the issue:

https://github.com/apache/flink/pull/3087
  
Maybe we should open an issue about the `Checkpointed` Javadoc to discuss how to replace the APIs.




[jira] [Commented] (FLINK-4917) Deprecate "CheckpointedAsynchronously" interface

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834763#comment-15834763
 ] 

ASF GitHub Bot commented on FLINK-4917:
---

Github user mtunique commented on the issue:

https://github.com/apache/flink/pull/3087
  
Maybe we should open an issue about the `Checkpointed` Javadoc to discuss how to replace the APIs.


> Deprecate "CheckpointedAsynchronously" interface
> 
>
> Key: FLINK-4917
> URL: https://issues.apache.org/jira/browse/FLINK-4917
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Stephan Ewen
>  Labels: easyfix, starter
>
> The {{CheckpointedAsynchronously}} should be deprecated, as it is no longer 
> part of the new operator state abstraction.





[jira] [Commented] (FLINK-5395) support locally build distribution by script create_release_files.sh

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834762#comment-15834762
 ] 

ASF GitHub Bot commented on FLINK-5395:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3049
  
+1 is good to merge.

I'll test it as part of creating RC1 for 1.2.0.


> support locally build distribution by script create_release_files.sh
> 
>
> Key: FLINK-5395
> URL: https://issues.apache.org/jira/browse/FLINK-5395
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: shijinkui
>
> create_release_files.sh builds the official Flink release only. It's hard to build 
> a custom local Flink release distribution.
> Let create_release_files.sh support:
> 1. custom git repo url
> 2. custom build special scala and hadoop version
> 3. add `tools/flink` to .gitignore
> 4. add usage





[GitHub] flink issue #3049: [FLINK-5395] [Build System] support locally build distrib...

2017-01-23 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3049
  
+1 is good to merge.

I'll test it as part of creating RC1 for 1.2.0.




[GitHub] flink issue #3087: [FLINK-4917] Deprecate "CheckpointedAsynchronously" inter...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3087
  
Thanks, much better now!
Will merge this for 1.2 and master...




[jira] [Commented] (FLINK-4917) Deprecate "CheckpointedAsynchronously" interface

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834757#comment-15834757
 ] 

ASF GitHub Bot commented on FLINK-4917:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3087
  
Thanks, much better now!
Will merge this for 1.2 and master...


> Deprecate "CheckpointedAsynchronously" interface
> 
>
> Key: FLINK-4917
> URL: https://issues.apache.org/jira/browse/FLINK-4917
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Stephan Ewen
>  Labels: easyfix, starter
>
> The {{CheckpointedAsynchronously}} should be deprecated, as it is no longer 
> part of the new operator state abstraction.





[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-23 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3149
  
Sounds good! Looking forward to it!




[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834751#comment-15834751
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3149
  
Sounds good! Looking forward to it!


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.
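The projection push-down that {{ProjectableTableSource}} enables can be sketched with simplified stand-in types; the interface and class below are hypothetical illustrations, not Flink's actual API:

```java
import java.util.Arrays;

// Hypothetical stand-in for ProjectableTableSource (FLINK-3848) illustrating
// how a source like HBaseTableSource could push a projection down to the scan.
public class ProjectionSketch {

    interface ProjectableSource {
        // The planner calls this with the indexes of the columns it needs.
        ProjectableSource projectFields(int[] fields);
        // Only the projected columns are "fetched" from the backing table.
        String[] readRow(String[] fullRow);
    }

    static class ColumnSource implements ProjectableSource {
        private final int[] fields;

        ColumnSource(int[] fields) {
            this.fields = fields;
        }

        @Override
        public ProjectableSource projectFields(int[] fields) {
            return new ColumnSource(fields);
        }

        @Override
        public String[] readRow(String[] fullRow) {
            return Arrays.stream(fields).mapToObj(i -> fullRow[i]).toArray(String[]::new);
        }
    }

    public static String[] demo() {
        ProjectableSource source = new ColumnSource(new int[]{0, 1, 2})
                .projectFields(new int[]{2}); // planner pushes "SELECT col2" down
        return source.readRow(new String[]{"rowkey", "cf:name", "cf:age"});
    }
}
```

For HBase this matters because the projected column families/qualifiers can be set on the scan, so unneeded cells never leave the region server.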





[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834744#comment-15834744
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
Bit of background on how the error happens:
  - The test throws a `SuccessException`.
  - While the finally clause is shutting down the CuratorClient, the containing `Task` has not yet seen the exception.
  - When the commitOffsets() call fails, this overrides the `SuccessException` as the reason why the streaming program terminated.
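This masking is plain try/finally semantics in Java: an exception thrown during cleanup replaces the one that ended the try block. A minimal self-contained illustration (not the actual test code):

```java
// Minimal illustration of the exception masking described above: an exception
// thrown during cleanup replaces the original one, just as the failing
// commitOffsets() call replaces the test's SuccessException.
public class ExceptionMasking {

    static class SuccessException extends RuntimeException { }

    public static String observedFailure() {
        try {
            try {
                throw new SuccessException(); // the "expected" termination
            } finally {
                // Cleanup fails, e.g. a client that was already shut down.
                throw new IllegalStateException("Client is not started");
            }
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName(); // the SuccessException is lost
        }
    }
}
```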


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}





[GitHub] flink pull request #3194: [FLINK-5615][query] execute the QueryableStateITCa...

2017-01-23 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3194

[FLINK-5615][query] execute the QueryableStateITCase for all three state 
back-ends

This extends the `QueryableStateITCase` so that it is able to run with any 
selected state backend. Some optimisations reduce the total runtime of the test 
cases so that we are able to run the tests with all three currently available 
backends, i.e. `MemoryStateBackend`, `FsStateBackend`, and 
`RocksDBStateBackend`, at little extra cost.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5615

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3194


commit 9c1a247ada0015ff1b97c6017e8c1de874ba6d17
Author: Nico Kruber 
Date:   2017-01-17T13:26:16Z

[FLINK-5613][query] querying a non-existing key is inconsistent among state 
backends

Querying for a non-existing key for a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used but results in the default 
value
if RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.
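The uniform behaviour the commit describes can be sketched with stand-in types (the class below is hypothetical, not Flink's backend code): the backend returns null for a missing key, and the query layer maps that to a single error type regardless of which backend served the request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in sketch: a missing key yields null from the backend,
// which the query layer turns into one uniform "unknown key" error.
public class MissingKeyLookup {

    static class UnknownKeyOrNamespaceException extends RuntimeException { }

    private final Map<String, byte[]> state = new HashMap<>();

    void put(String key, byte[] value) {
        state.put(key, value);
    }

    byte[] query(String key) {
        byte[] value = state.get(key); // backend returns null, no default value
        if (value == null) {
            throw new UnknownKeyOrNamespaceException();
        }
        return value;
    }

    public static boolean missingKeyThrows() {
        MissingKeyLookup lookup = new MissingKeyLookup();
        lookup.put("a", new byte[]{1});
        try {
            lookup.query("b");
            return false;
        } catch (UnknownKeyOrNamespaceException e) {
            return true;
        }
    }
}
```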

commit cccbc5f30e09f5ccbc61a75fc9519dacf91c5522
Author: Nico Kruber 
Date:   2017-01-23T14:17:46Z

[FLINK-5615][query] improve testQueryableStateWithTaskManagerFailure test 
duration

This is based on the following points:
* slow down QueryableStateITCase#TestKeyRangeSource for the rest of the
  program to make more progress (we do not need a full-speed source here!)
* reduce the checkpointing interval
* reduce the amount of progress before starting our own evaluation
* reduce the number of checkpoints to wait for before killing a TM
* reduce the thread waiting time when asking how many checkpoints exist

Note that by slowing down QueryableStateITCase#TestKeyRangeSource, the other
tests should only be affected positively, too, since they also did not 
really
need a full-speed source and thus have more CPU cycles for their own tasks.

This, among with

commit 3dd506dd65f6f9f9e8879e6bf6df0261435d5317
Author: Nico Kruber 
Date:   2017-01-23T14:47:40Z

[FLINK-5615][query] speed up some more tests in QueryableStateITCase

This is based on reducing the number of keys the source generates. We do not
really need 1024 different keys for the tests - go with 256 now.

commit ee6b78eb3a91f39386148e3ef1c55a0f3824843f
Author: Nico Kruber 
Date:   2017-01-23T14:57:09Z

[FLINK-5615][query] execute the QueryableStateITCase for all state back-ends






[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
Bit of background on how the error happens:
  - The test throws a `SuccessException`.
  - While the finally clause is shutting down the CuratorClient, the containing `Task` has not yet seen the exception.
  - When the commitOffsets() call fails, this overrides the `SuccessException` as the reason why the streaming program terminated.




[jira] [Commented] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834743#comment-15834743
 ] 

Jark Wu commented on FLINK-5592:


In the Table API, please use:

{code}
// Java Table API
tEnv.scan("rows").select("person.get('age')")
// Scala Table API
tEnv.scan("rows").select('person.get("age"))
{code}

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>Priority: Minor
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}





[jira] [Commented] (FLINK-5615) queryable state: execute the QueryableStateITCase for all three state back-ends

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834739#comment-15834739
 ] 

ASF GitHub Bot commented on FLINK-5615:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3194

[FLINK-5615][query] execute the QueryableStateITCase for all three state 
back-ends

This extends the `QueryableStateITCase` so that it is able to run with any 
selected state backend. Some optimisations reduce the total runtime of the test 
cases so that we are able to run the tests with all three currently available 
backends, i.e. `MemoryStateBackend`, `FsStateBackend`, and 
`RocksDBStateBackend`, at little extra cost.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5615

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3194


commit 9c1a247ada0015ff1b97c6017e8c1de874ba6d17
Author: Nico Kruber 
Date:   2017-01-17T13:26:16Z

[FLINK-5613][query] querying a non-existing key is inconsistent among state 
backends

Querying for a non-existing key for a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
> MemoryStateBackend or FsStateBackend is used but results in the default value
> if RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.

commit cccbc5f30e09f5ccbc61a75fc9519dacf91c5522
Author: Nico Kruber 
Date:   2017-01-23T14:17:46Z

[FLINK-5615][query] improve testQueryableStateWithTaskManagerFailure test 
duration

This is based on the following points:
* slow down QueryableStateITCase#TestKeyRangeSource for the rest of the
  program to make more progress (we do not need a full-speed source here!)
* reduce the checkpointing interval
* reduce the amount of progress before starting our own evaluation
* reduce the number of checkpoints to wait for before killing a TM
* reduce the thread waiting time when asking how many checkpoints exist

Note that by slowing down QueryableStateITCase#TestKeyRangeSource, the other
tests should only be affected positively, too, since they also did not really
need a full-speed source and thus have more CPU cycles for their own tasks.

This, among with

commit 3dd506dd65f6f9f9e8879e6bf6df0261435d5317
Author: Nico Kruber 
Date:   2017-01-23T14:47:40Z

[FLINK-5615][query] speed up some more tests in QueryableStateITCase

This is based on reducing the number of keys the source generates. We do not
really need 1024 different keys for the tests - go with 256 now.

commit ee6b78eb3a91f39386148e3ef1c55a0f3824843f
Author: Nico Kruber 
Date:   2017-01-23T14:57:09Z

[FLINK-5615][query] execute the QueryableStateITCase for all state back-ends




> queryable state: execute the QueryableStateITCase for all three state 
> back-ends
> ---
>
> Key: FLINK-5615
> URL: https://issues.apache.org/jira/browse/FLINK-5615
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The QueryableStateITCase currently is only tested with the MemoryStateBackend 
> but as has been seen in the past, some errors or inconsistent behaviour only 
> appeared with different state back-ends. It should thus be extended to be 
> tested with all three currently existing state back-ends.
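
The extension described above boils down to running the same test body once per back-end. A minimal, hypothetical Java sketch of that parameterization (the enum names mirror Flink's three backends; `AllBackendsSketch` and its lookup are stand-ins, not Flink API):

```java
import java.util.EnumMap;
import java.util.Map;

public class AllBackendsSketch {
    public enum Backend { MEMORY, FS, ROCKSDB }

    // Stand-in for a backend-specific state lookup (not Flink API).
    public static String query(Backend backend, String key) {
        Map<Backend, Map<String, String>> state = new EnumMap<>(Backend.class);
        for (Backend b : Backend.values()) {
            state.put(b, Map.of("k", "v"));
        }
        return state.get(backend).get(key);
    }

    // Runs the same check once per backend, as the extended ITCase does,
    // so behavioural differences between back-ends surface in one run.
    public static boolean allConsistent() {
        for (Backend b : Backend.values()) {
            if (!"v".equals(query(b, "k"))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("all backends consistent: " + allConsistent());
    }
}
```

The point of the pattern is that an inconsistency in any one back-end fails the shared assertion, instead of going unnoticed because only one back-end was exercised.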





[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834714#comment-15834714
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97337263
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -57,47 +57,51 @@
 
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
-   
-   private final SerializableObject stateMonitor = new SerializableObject();
+
+   public static final int VALUE_NOT_SET = -1;
+
+   private final Object stateMonitor = new Object();

private final ExecutionGraph graph;

private final JobVertex jobVertex;

private final ExecutionVertex[] taskVertices;
 
-   private IntermediateResult[] producedDataSets;
+   private final IntermediateResult[] producedDataSets;

private final List inputs;

private final int parallelism;
 
-   private final int maxParallelism;
-   
private final boolean[] finishedSubtasks;
-   
-   private volatile int numSubtasksInFinalState;
-   
+
private final SlotSharingGroup slotSharingGroup;
-   
+
private final CoLocationGroup coLocationGroup;
-   
+
private final InputSplit[] inputSplits;
 
+   private final int maxParallelismConfigured;
+
+   private int maxParallelismDerived;
+
+   private volatile int numSubtasksInFinalState;
+
/**
 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 * serialize the same information multiple times in order to create the
 * TaskDeploymentDescriptors.
 */
-   private final SerializedValue serializedTaskInformation;
+   private SerializedValue serializedTaskInformation;
 
private InputSplitAssigner splitAssigner;

public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
-   Time timeout) throws JobException, IOException {
+   Time timeout) throws JobException {
--- End diff --

You are right, but I kept the indentation to avoid formatting changes.


> setMaxParallelism() higher than 1 is possible on non-parallel operators
> ---
>
> Key: FLINK-5473
> URL: https://issues.apache.org/jira/browse/FLINK-5473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> While trying out Flink 1.2, I found out that you can set a maxParallelism 
> higher than 1 on a non-parallel operator.
> I think we should have the same semantics as the setParallelism() method.
> Also, when setting a global maxParallelism in the execution environment, it 
> will be set as a default value for the non-parallel operator.
> When restoring a savepoint from 1.1, you have to set the maxParallelism to 
> the parallelism of the 1.1 job. Non-parallel operators will then also get the 
> maxPar set to this value, leading to an error on restore.
> So currently, users restoring from 1.1 to 1.2 have to manually set the 
> maxParallelism to 1 for all non-parallel operators.
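
The semantics requested here can be sketched as a validation mirroring setParallelism(). This is a hypothetical model only, not Flink's actual Transformation/operator code; the class name and error message are illustrative:

```java
// Hedged sketch: reject maxParallelism > 1 on non-parallel operators,
// the same way setParallelism() rejects parallelism > 1 for them.
public class MaxParallelismSketch {
    private final boolean parallel;   // false for non-parallel operators
    private int maxParallelism = -1;  // -1 = not set

    public MaxParallelismSketch(boolean parallel) {
        this.parallel = parallel;
    }

    public void setMaxParallelism(int maxParallelism) {
        if (!parallel && maxParallelism > 1) {
            throw new IllegalArgumentException(
                "The maximum parallelism of non-parallel operators must be 1.");
        }
        this.maxParallelism = maxParallelism;
    }

    public int getMaxParallelism() {
        return maxParallelism;
    }

    public static void main(String[] args) {
        MaxParallelismSketch nonParallel = new MaxParallelismSketch(false);
        nonParallel.setMaxParallelism(1);          // allowed
        try {
            nonParallel.setMaxParallelism(128);    // must fail, like setParallelism
            throw new AssertionError("expected rejection");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected as expected");
        }
    }
}
```

With such a check in place, a global default maxParallelism could not silently attach to non-parallel operators and later break a 1.1-to-1.2 savepoint restore.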





[GitHub] flink pull request #3193: [FLINK-5527][query] querying a non-existing key is...

2017-01-23 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3193

[FLINK-5527][query] querying a non-existing key is inconsistent among state 
backends

Querying for a non-existing key for a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used but results in the default value
if RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5613

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3193


commit a767d4ec9e88c39d3902fc633b331fb64163
Author: Nico Kruber 
Date:   2017-01-17T13:26:16Z

[FLINK-5527][query] querying a non-existing key is inconsistent among state 
backends

Querying for a non-existing key for a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used but results in the default value
if RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.






[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...

2017-01-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3128
  
@uce I've addressed your comments.




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834724#comment-15834724
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3128
  
@uce I've addressed your comments.


> MetricQueryService throws NullPointerException on JobManager
> 
>
> Key: FLINK-5464
> URL: https://issues.apache.org/jira/browse/FLINK-5464
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> I'm using Flink 699f4b0.
> My JobManager log contains many of these log entries:
> {code}
> 2017-01-11 19:42:05,778 WARN  
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching 
> metrics failed.
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/MetricQueryService#-970662317]] after [1 ms]
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>   at java.lang.Thread.run(Thread.java:745)
> 2017-01-11 19:42:07,765 WARN  
> org.apache.flink.runtime.metrics.dump.MetricQueryService  - An exception 
> occurred while processing a message.
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162)
>   at 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47)
>   at 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90)
>   at 
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
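
One way to avoid such a dump-aborting NullPointerException is to guard each metric individually while serializing. The following is an illustrative sketch under that assumption; `GaugeDumpSketch` and its `Supplier`-based gauges are stand-ins, not the real `MetricDumpSerialization` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class GaugeDumpSketch {
    // Serializes each gauge value defensively: a null value is skipped and
    // a throwing gauge is swallowed, so one bad metric cannot abort the
    // whole dump the way the NPE in the stack trace above did.
    public static List<String> serialize(List<Supplier<Object>> gauges) {
        List<String> dump = new ArrayList<>();
        for (Supplier<Object> gauge : gauges) {
            try {
                Object value = gauge.get();
                if (value == null) {
                    continue; // skip null gauges instead of throwing an NPE
                }
                dump.add(value.toString());
            } catch (RuntimeException e) {
                // a single faulty gauge must not fail the whole dump
            }
        }
        return dump;
    }

    public static void main(String[] args) {
        List<Supplier<Object>> gauges = List.of(() -> 42, () -> null, () -> "ok");
        System.out.println(serialize(gauges)); // the null gauge is skipped
    }
}
```

The design choice is per-metric isolation: the serializer degrades gracefully instead of turning one unset gauge into a cluster-wide "Fetching metrics failed" symptom.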





[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

2017-01-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97337263
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -57,47 +57,51 @@
 
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
-   
-   private final SerializableObject stateMonitor = new SerializableObject();
+
+   public static final int VALUE_NOT_SET = -1;
+
+   private final Object stateMonitor = new Object();

private final ExecutionGraph graph;

private final JobVertex jobVertex;

private final ExecutionVertex[] taskVertices;
 
-   private IntermediateResult[] producedDataSets;
+   private final IntermediateResult[] producedDataSets;

private final List inputs;

private final int parallelism;
 
-   private final int maxParallelism;
-   
private final boolean[] finishedSubtasks;
-   
-   private volatile int numSubtasksInFinalState;
-   
+
private final SlotSharingGroup slotSharingGroup;
-   
+
private final CoLocationGroup coLocationGroup;
-   
+
private final InputSplit[] inputSplits;
 
+   private final int maxParallelismConfigured;
+
+   private int maxParallelismDerived;
+
+   private volatile int numSubtasksInFinalState;
+
/**
 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 * serialize the same information multiple times in order to create the
 * TaskDeploymentDescriptors.
 */
-   private final SerializedValue serializedTaskInformation;
+   private SerializedValue serializedTaskInformation;
 
private InputSplitAssigner splitAssigner;

public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
-   Time timeout) throws JobException, IOException {
+   Time timeout) throws JobException {
--- End diff --

You are right, but I kept the indentation to avoid formatting changes.




[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834718#comment-15834718
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
Thanks for the review, @tillrohrmann! I followed all of your suggestions, 
except for the indentation formatting.


> setMaxParallelism() higher than 1 is possible on non-parallel operators
> ---
>
> Key: FLINK-5473
> URL: https://issues.apache.org/jira/browse/FLINK-5473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> While trying out Flink 1.2, I found out that you can set a maxParallelism 
> higher than 1 on a non-parallel operator.
> I think we should have the same semantics as the setParallelism() method.
> Also, when setting a global maxParallelism in the execution environment, it 
> will be set as a default value for the non-parallel operator.
> When restoring a savepoint from 1.1, you have to set the maxParallelism to 
> the parallelism of the 1.1 job. Non-parallel operators will then also get the 
> maxPar set to this value, leading to an error on restore.
> So currently, users restoring from 1.1 to 1.2 have to manually set the 
> maxParallelism to 1 for all non-parallel operators.





[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

2017-01-23 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
Thanks for the review, @tillrohrmann! I followed all of your suggestions, 
except for the indentation formatting.




[jira] [Commented] (FLINK-5527) QueryableState: requesting a non-existing key in MemoryStateBackend or FsStateBackend does not return the default value

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834715#comment-15834715
 ] 

ASF GitHub Bot commented on FLINK-5527:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3193

[FLINK-5527][query] querying a non-existing key is inconsistent among state 
backends

Querying for a non-existing key for a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used but results in the default value
if RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5613

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3193


commit a767d4ec9e88c39d3902fc633b331fb64163
Author: Nico Kruber 
Date:   2017-01-17T13:26:16Z

[FLINK-5527][query] querying a non-existing key is inconsistent among state 
backends

Querying for a non-existing key for a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used but results in the default value
if RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.




> QueryableState: requesting a non-existing key in MemoryStateBackend or 
> FsStateBackend does not return the default value
> ---
>
> Key: FLINK-5527
> URL: https://issues.apache.org/jira/browse/FLINK-5527
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Querying for a non-existing key for a state that has a default value set 
> currently results in an UnknownKeyOrNamespace exception when the 
> MemoryStateBackend or FsStateBackend is used. It should return the default 
> value instead just like the RocksDBStateBackend.
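
The behaviour this issue description asks for — a missing key yielding the descriptor's default value rather than an UnknownKeyOrNamespace error — can be modelled in a few lines. Note that the PR text quoted earlier in this thread proposes the opposite direction (all back-ends throwing); this sketch only illustrates the default-value semantics described here, with hypothetical names, not Flink's state backend API:

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultValueLookupSketch {
    private final Map<String, String> state = new HashMap<>();
    private final String defaultValue;

    public DefaultValueLookupSketch(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    public void put(String key, String value) {
        state.put(key, value);
    }

    // A missing key yields the configured default value instead of an
    // UnknownKeyOrNamespace-style error.
    public String value(String key) {
        String v = state.get(key);
        return v != null ? v : defaultValue;
    }

    public static void main(String[] args) {
        DefaultValueLookupSketch s = new DefaultValueLookupSketch("fallback");
        s.put("present", "hit");
        System.out.println(s.value("present"));  // hit
        System.out.println(s.value("missing"));  // fallback
    }
}
```

Whichever direction is chosen, the key point of the issue is that all three back-ends must agree on it.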





[GitHub] flink pull request #3025: [FLINK-5365] Mesos AppMaster/TaskManager should ob...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3025




[jira] [Resolved] (FLINK-5365) Mesos AppMaster/TaskManager should obey sigterm

2017-01-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-5365.
--
       Resolution: Fixed
    Fix Version/s: 1.3.0
                   1.2.0

1.3.0: Fixed via 5d0d279dc5d82fb63bc6cbd7c8fac2324959a516
1.2.0: Fixed via e0a784197de6c6ff55d0d4e10d3316240706

> Mesos AppMaster/TaskManager should obey sigterm
> ---
>
> Key: FLINK-5365
> URL: https://issues.apache.org/jira/browse/FLINK-5365
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> The AppMaster and TaskManager are ignoring the sigterm sent by 
> Marathon/Mesos.   The reason is simply that the shell scripts used to start 
> them don't pass the signal to java.
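
The usual fix for this is to start the JVM with the shell's `exec` built-in, so the child replaces the wrapper shell process and receives SIGTERM directly. A self-contained sketch with hypothetical paths, using `sleep` as a stand-in for the JVM (not the actual Flink start scripts):

```shell
#!/bin/sh
# Write a launcher that uses 'exec': the child replaces the shell's process
# image, so a SIGTERM from Marathon/Mesos reaches the child directly instead
# of stopping at a wrapper shell that ignores it.
cat > /tmp/launcher-sketch.sh <<'EOF'
#!/bin/sh
exec sleep 30
EOF
chmod +x /tmp/launcher-sketch.sh

/tmp/launcher-sketch.sh &
PID=$!
sleep 1
# Because of 'exec', $PID now *is* the sleep process, not an sh wrapper.
CHILD_COMM=$(ps -o comm= -p "$PID")
echo "process image for $PID: $CHILD_COMM"
kill -TERM "$PID"    # terminates the (stand-in) JVM immediately
```

Without `exec`, `$PID` would be an `sh` process and the signal would never reach the child, which is exactly the symptom described in this issue.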





[jira] [Commented] (FLINK-5365) Mesos AppMaster/TaskManager should obey sigterm

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834710#comment-15834710
 ] 

ASF GitHub Bot commented on FLINK-5365:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3025


> Mesos AppMaster/TaskManager should obey sigterm
> ---
>
> Key: FLINK-5365
> URL: https://issues.apache.org/jira/browse/FLINK-5365
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Minor
>
> The AppMaster and TaskManager are ignoring the sigterm sent by 
> Marathon/Mesos.   The reason is simply that the shell scripts used to start 
> them don't pass the signal to java.





[jira] [Updated] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5592:
-
Priority: Minor  (was: Major)

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>Priority: Minor
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}
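
The quoted failure reduces to an arity check between a row and its declared field serializers. A minimal model of that check (not Flink's actual `RowSerializer`; the class name is illustrative):

```java
public class RowAritySketch {
    // Copying a row verifies that its arity matches the number of field
    // serializers the serializer was built with; a mismatch raises the kind
    // of "Row arity of from does not match serializers" error quoted above.
    public static Object[] copy(Object[] row, int serializerArity) {
        if (row.length != serializerArity) {
            throw new RuntimeException("Row arity of from does not match serializers");
        }
        return row.clone();
    }

    public static void main(String[] args) {
        // One top-level field ("person") whose value is itself a nested row.
        Object[] nested = { new Object[] {"data_1", "dob"} };
        copy(nested, 1); // matches the single declared field
        try {
            copy(nested, 2); // declaring two serializers for a 1-field row fails
            throw new AssertionError("expected arity mismatch");
        } catch (RuntimeException expected) {
            System.out.println("arity mismatch detected, as in the stack trace");
        }
    }
}
```

This matches the later discussion in the thread: the `getReturnType` in the test declared a different number of top-level fields than the rows actually carried.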





[jira] [Comment Edited] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834697#comment-15834697
 ] 

Anton Solovev edited comment on FLINK-5592 at 1/23/17 3:07 PM:
---

Hi [~jark], thank you for helping me. I want exactly a row of a number of
rows. You are right, it's a problem in my code:
{code} override def getReturnType: TypeInformation[Row] = {
  new RowTypeInfo(
Array[TypeInformation[_]](
  new RowTypeInfo(
Array[TypeInformation[_]](
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO),
Array("name", "age")),
  new RowTypeInfo(
Array[TypeInformation[_]](
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO),
Array("more_info", "and_so_on"))),
Array("person", "additional")
  )
} {code} {{getReturnType}} does what I want, but how can we get a nested
field via the Table API?
table.scan("rows").select("person.name") doesn't work.


was (Author: tonycox):
[ Hi [~jark], thank you for helping me. I want exactly a row of a number of 
rows. You are right, it's problem of my code 
{code} override def getReturnType: TypeInformation[Row] = {
  new RowTypeInfo(
Array[TypeInformation[_]](
  new RowTypeInfo(
Array[TypeInformation[_]](
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO),
Array("name", "age")),
  new RowTypeInfo(
Array[TypeInformation[_]](
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO),
Array("more_info", "and_so_on"))),
Array("person", "additional")
  )
} {code} does that thing I want

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> 

[jira] [Reopened] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reopened FLINK-5592:
--

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3025: [FLINK-5365] Mesos AppMaster/TaskManager should obey sigt...

2017-01-23 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3025
  
Thanks for your contribution @EronWright. Really good fix :-) Merging this 
PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5365) Mesos AppMaster/TaskManager should obey sigterm

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834707#comment-15834707
 ] 

ASF GitHub Bot commented on FLINK-5365:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3025
  
Thanks for your contribution @EronWright. Really good fix :-) Merging this 
PR.


> Mesos AppMaster/TaskManager should obey sigterm
> ---
>
> Key: FLINK-5365
> URL: https://issues.apache.org/jira/browse/FLINK-5365
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Minor
>
> The AppMaster and TaskManager are ignoring the sigterm sent by 
> Marathon/Mesos.   The reason is simply that the shell scripts used to start 
> them don't pass the signal to java.





[jira] [Closed] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev closed FLINK-5592.

Resolution: Not A Problem

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}





[jira] [Commented] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834697#comment-15834697
 ] 

Anton Solovev commented on FLINK-5592:
--

Hi [~jark], thank you for helping me. I want exactly a row of a number of 
rows. You are right, it was a problem in my code; the following does what I 
want:
{code}
override def getReturnType: TypeInformation[Row] = {
  new RowTypeInfo(
    Array[TypeInformation[_]](
      new RowTypeInfo(
        Array[TypeInformation[_]](
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO),
        Array("name", "age")),
      new RowTypeInfo(
        Array[TypeInformation[_]](
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO),
        Array("more_info", "and_so_on"))),
    Array("person", "additional")
  )
}
{code}

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}





[jira] [Issue Comment Deleted] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode

2017-01-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5592:
-
Comment: was deleted

(was: Hi [~jark], thank you for helping me. I want exactly a row of a number of 
rows; this case fails even with 
{code}
override def getReturnType: TypeInformation[Row] = {
  new RowTypeInfo(
Array[TypeInformation[_]](
  new RowTypeInfo(
Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO),
Array("name", "age")),
  new RowTypeInfo(
Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO),
Array("more_info", "and_so_on"))),
Array("person", "additional")
  )
}
{code})

> Wrong number of RowSerializers with nested Rows in Collection mode
> --
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
>  Issue Type: Bug
>Reporter: Anton Solovev
>
> {code}
>   @Test
>   def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
>   }
>   class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>   val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>   execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
>   new RowTypeInfo(
> Array[TypeInformation[_]](
>   new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
>   )
> }
>   }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match 
> serializers}}
> stacktrace 
> {code}
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
>   at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
>   at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}





[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834668#comment-15834668
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
I would like to pick this fix up. The exception has still occurred a few 
times for me in the past, and I prefer the above outlined solution, because it 
adds less locking on cancellation/shutdown, meaning there are fewer 
implications on deadlocks or long stalls on cancellation/shutdown.


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}





[jira] [Assigned] (FLINK-5078) Introduce annotations for classes copied from Calcite

2017-01-23 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-5078:
---

Assignee: Timo Walther

> Introduce annotations for classes copied from Calcite
> -
>
> Key: FLINK-5078
> URL: https://issues.apache.org/jira/browse/FLINK-5078
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> We have already copied several classes from Calcite because of missing 
> features or bugs. In order to track those classes, update them when bumping 
> up the version, or check if they became obsolete it might be useful to 
> introduce a special annotation to mark those classes. Maybe with an 
> additional standardized comment format which lines have been modified. 
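
As a sketch of the idea described in the issue, such an annotation could look
like the following. The name {{CopiedFromCalcite}}, its attributes, and the
version string are illustrative assumptions, not an existing Flink or Calcite
API:

```java
// Hypothetical marker annotation for classes copied from Calcite.
// Name, attributes, and retention policy are illustrative assumptions.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // RUNTIME lets build tooling scan for copies
@interface CopiedFromCalcite {
    /** Calcite version the class was copied from. */
    String version();
    /** Missing feature or bug that forced the copy, and which lines changed. */
    String reason() default "";
}

// Example usage on a copied class (class name is made up).
@CopiedFromCalcite(version = "1.11.0", reason = "planner bug not yet fixed upstream")
class SomeCopiedRule {
}
```

With runtime retention, a simple reflective check (or an ArchUnit-style build
rule) can enumerate all copied classes when bumping the Calcite version.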





[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
I would like to pick this fix up. The exception has still occurred a few 
times for me in the past, and I prefer the above outlined solution, because it 
adds less locking on cancellation/shutdown, meaning there are fewer 
implications on deadlocks or long stalls on cancellation/shutdown.




[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834660#comment-15834660
 ] 

ASF GitHub Bot commented on FLINK-4905:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
I think we can take a very simple approach here. Many other parts of the 
code follow the approach to tolerate exceptions thrown during cancellation, or 
during asynchronous calls on closed operators.

  - The client may be closed during shutting down of the Kafka08Fetcher, 
before even the surrounding KafkaConsumerBase knows that it is closing

  - The KafkaConsumerBase tries to commit offsets, sees the exception, and 
re-throws it, since it assumes the consumer is still in normal operation.

  - We can make the Kafka08Fetcher catch exceptions when committing 
offsets, and only re-throwing them if the fetcher is still running. That should 
do the trick.

BTW: There are plans to make the streaming API sources appear to be single 
threaded, to avoid that sources have to plan for such situations.
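
A minimal sketch of the tolerance pattern outlined above. The class and method 
names here ({{OffsetCommitter}}, {{commitToZooKeeper}}) are hypothetical stand-ins, 
not Flink's actual Kafka08Fetcher API; the point is only that commit failures 
propagate while running and are swallowed once shutdown has begun:

```java
// Sketch of the "rethrow only if still running" pattern (hypothetical names).
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

class OffsetCommitter {
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Commit offsets; failures are swallowed once shutdown has begun. */
    void commitOffsets(Map<Integer, Long> offsets) {
        try {
            commitToZooKeeper(offsets); // may fail once the client is closed
        } catch (Exception e) {
            if (running.get()) {
                // still in normal operation: the failure is real, propagate it
                throw new RuntimeException("Offset commit failed", e);
            }
            // shutting down: a closed client is expected, ignore the failure
        }
    }

    void shutdown() {
        running.set(false);
    }

    /** Stand-in for the real commit call; always fails like a closed client. */
    private void commitToZooKeeper(Map<Integer, Long> offsets) throws Exception {
        throw new IllegalStateException("Client is not started");
    }
}
```

After {{shutdown()}} the same commit call becomes a no-op instead of failing 
the task, which avoids extra locking on the cancellation path.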


> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Andrew Efimov
>  Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}





[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...

2017-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3035
  
I think we can take a very simple approach here. Many other parts of the 
code follow the approach to tolerate exceptions thrown during cancellation, or 
during asynchronous calls on closed operators.

  - The client may be closed during shutting down of the Kafka08Fetcher, 
before even the surrounding KafkaConsumerBase knows that it is closing

  - The KafkaConsumerBase tries to commit offsets, sees the exception, and 
re-throws it, since it assumes the consumer is still in normal operation.

  - We can make the Kafka08Fetcher catch exceptions when committing 
offsets, and only re-throwing them if the fetcher is still running. That should 
do the trick.

BTW: There are plans to make the streaming API sources appear to be single 
threaded, to avoid that sources have to plan for such situations.




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834655#comment-15834655
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97327675
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream 
dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List<MetricDump> deserialize(byte[] data) throws 
IOException {
-   ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List<MetricDump> 
deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+   DataInputView in = new DataInputDeserializer(data.data, 
0, data.data.length);
 
-   List<MetricDump> metrics = new ArrayList<>(numCounters 
+ numGauges + numHistograms);
+   List<MetricDump> metrics = new 
ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + 
data.numMeters);
 
-   for (int x = 0; x < numCounters; x++) {
-   metrics.add(deserializeCounter(dis));
+   for (int x = 0; x < data.numCounters; x++) {
+   try {
+   metrics.add(deserializeCounter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
 
-   for (int x = 0; x < numGauges; x++) {
-   metrics.add(deserializeGauge(dis));
+   for (int x = 0; x < data.numGauges; x++) {
+   try {
+   metrics.add(deserializeGauge(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
 
-   for (int x = 0; x < numHistograms; x++) {
-   metrics.add(deserializeHistogram(dis));
+   for (int x = 0; x < data.numHistograms; x++) {
+   try {
+   metrics.add(deserializeHistogram(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
 
-   for (int x = 0; x < numMeters; x++) {
-   metrics.add(deserializeMeter(dis));
+   for (int x = 0; x < data.numMeters; x++) {
+   try {
+   metrics.add(deserializeMeter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
-
-   return metrics;
}
}
 
-   private static String deserializeString(DataInputStream dis) throws 
IOException {
-   int stringLength = dis.readInt();
-   byte[] bytes = new byte[stringLength];
-   dis.readFully(bytes);
-   return new String(bytes);
-   }
 
-   private static MetricDump.CounterDump 
deserializeCounter(DataInputStream dis) throws IOException {
+   private static MetricDump.CounterDump deserializeCounter(DataInputView 
dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
-   String name = deserializeString(dis);
-   return new MetricDump.CounterDump(scope, name, dis.readLong());
+   String name = dis.readUTF();
+   long count = dis.readLong();
+   return new MetricDump.CounterDump(scope, name, count);
--- End diff --

Personally, for short methods I think it's overkill. I would do it for 
methods like `deserializeHistogram`, though.


> MetricQueryService throws NullPointerException on JobManager
> 

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

2017-01-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97327675
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream 
dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List<MetricDump> deserialize(byte[] data) throws 
IOException {
-   ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List<MetricDump> 
deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+   DataInputView in = new DataInputDeserializer(data.data, 
0, data.data.length);
 
-   List<MetricDump> metrics = new ArrayList<>(numCounters 
+ numGauges + numHistograms);
+   List<MetricDump> metrics = new 
ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + 
data.numMeters);
 
-   for (int x = 0; x < numCounters; x++) {
-   metrics.add(deserializeCounter(dis));
+   for (int x = 0; x < data.numCounters; x++) {
+   try {
+   metrics.add(deserializeCounter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
 
-   for (int x = 0; x < numGauges; x++) {
-   metrics.add(deserializeGauge(dis));
+   for (int x = 0; x < data.numGauges; x++) {
+   try {
+   metrics.add(deserializeGauge(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
 
-   for (int x = 0; x < numHistograms; x++) {
-   metrics.add(deserializeHistogram(dis));
+   for (int x = 0; x < data.numHistograms; x++) {
+   try {
+   metrics.add(deserializeHistogram(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
 
-   for (int x = 0; x < numMeters; x++) {
-   metrics.add(deserializeMeter(dis));
+   for (int x = 0; x < data.numMeters; x++) {
+   try {
+   metrics.add(deserializeMeter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize 
counter.", e);
+   }
}
-
-   return metrics;
}
}
 
-   private static String deserializeString(DataInputStream dis) throws 
IOException {
-   int stringLength = dis.readInt();
-   byte[] bytes = new byte[stringLength];
-   dis.readFully(bytes);
-   return new String(bytes);
-   }
 
-   private static MetricDump.CounterDump 
deserializeCounter(DataInputStream dis) throws IOException {
+   private static MetricDump.CounterDump deserializeCounter(DataInputView 
dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
-   String name = deserializeString(dis);
-   return new MetricDump.CounterDump(scope, name, dis.readLong());
+   String name = dis.readUTF();
+   long count = dis.readLong();
+   return new MetricDump.CounterDump(scope, name, count);
--- End diff --

Personally, for short methods I think it's overkill. I would do it for 
methods like `deserializeHistogram`, though.



