[jira] [Commented] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292164#comment-16292164
 ] 

ASF GitHub Bot commented on FLINK-7928:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4991


> Extend the field in ResourceProfile for precisely calculating the resource of 
> a task manager
> 
>
> Key: FLINK-7928
> URL: https://issues.apache.org/jira/browse/FLINK-7928
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> ResourceProfile records all the resource requirements for a slot. It is 
> generated by the JobMaster and then passed to the ResourceManager with the slot 
> request. 
> A task in the slot needs three parts of resources: 
> 1. The resources for the operators, as specified by the user-defined 
> ResourceSpec. 
> 2. The resources the operators need to communicate with their upstream tasks, 
> e.g., the resources for buffer pools.
> 3. The resources the operators need to communicate with their downstream tasks. 
> Same as above.
> So ResourceProfile should therefore contain three parts of resources: the first 
> comes from the ResourceSpec, and the other two parts are generated by the JobMaster.
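The three-part split described above can be sketched as follows. All class and field names here are illustrative assumptions for this digest, not Flink's actual `ResourceProfile` API:

```java
/**
 * Illustrative sketch of a three-part resource profile; names are
 * hypothetical and do not match Flink's real ResourceProfile class.
 */
class ResourceProfileSketch {

    // Part 1: operator resources, taken from the user-defined ResourceSpec.
    final double operatorCpuCores;
    final int operatorHeapMemoryMB;

    // Part 2: resources for communicating with upstream tasks
    // (e.g. input buffer pools), generated by the JobMaster.
    final int networkMemoryInMB;

    // Part 3: resources for communicating with downstream tasks
    // (e.g. output buffer pools), also generated by the JobMaster.
    final int networkMemoryOutMB;

    ResourceProfileSketch(double cpu, int heapMB, int netInMB, int netOutMB) {
        this.operatorCpuCores = cpu;
        this.operatorHeapMemoryMB = heapMB;
        this.networkMemoryInMB = netInMB;
        this.networkMemoryOutMB = netOutMB;
    }

    /** Total memory a ResourceManager would have to account for per slot. */
    int totalMemoryMB() {
        return operatorHeapMemoryMB + networkMemoryInMB + networkMemoryOutMB;
    }
}
```

With all three parts in one profile, the ResourceManager can size a task manager from the slot requests alone.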



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4991


---


[jira] [Resolved] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager

2017-12-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7928.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via 5643d156cea72314c2240119b30aa32a65a0aeb7

> Extend the field in ResourceProfile for precisely calculating the resource of 
> a task manager
> 
>
> Key: FLINK-7928
> URL: https://issues.apache.org/jira/browse/FLINK-7928
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> ResourceProfile records all the resource requirements for a slot. It is 
> generated by the JobMaster and then passed to the ResourceManager with the slot 
> request. 
> A task in the slot needs three parts of resources: 
> 1. The resources for the operators, as specified by the user-defined 
> ResourceSpec. 
> 2. The resources the operators need to communicate with their upstream tasks, 
> e.g., the resources for buffer pools.
> 3. The resources the operators need to communicate with their downstream tasks. 
> Same as above.
> So ResourceProfile should therefore contain three parts of resources: the first 
> comes from the ResourceSpec, and the other two parts are generated by the JobMaster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8258) Enable query configuration for batch queries

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291887#comment-16291887
 ] 

ASF GitHub Bot commented on FLINK-8258:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5169

[FLINK-8258] [table] Enable query configuration for batch queries

## What is the purpose of the change

This PR enables the query configuration for queries in batch table 
environment.

## Brief change log

  - Adds a `BatchQueryConfig` parameter to `DataSetRel.translateToPlan()` 
and adjusts the corresponding subclasses.
  - Adds new `toDataSet()` methods with `BatchQueryConfig` to 
`BatchTableEnvironment`.
  - Updates `TableConversions.scala`.


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8258

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5169.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5169


commit 9cb36b957fc4ed2ef6ad965304b759e9c7a53300
Author: Xingcan Cui 
Date:   2017-12-14T09:37:40Z

[FLINK-8258] [table] Enable query configuration for batch queries




> Enable query configuration for batch queries
> 
>
> Key: FLINK-8258
> URL: https://issues.apache.org/jira/browse/FLINK-8258
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Query configuration holds some parameters to configure the behavior of batch 
> queries. However, since there was nothing to set for batch queries before, 
> the configuration was not really passed. Due to FLINK-8236, we need to enable 
> it now.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5169: [FLINK-8258] [table] Enable query configuration fo...

2017-12-14 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5169

[FLINK-8258] [table] Enable query configuration for batch queries

## What is the purpose of the change

This PR enables the query configuration for queries in batch table 
environment.

## Brief change log

  - Adds a `BatchQueryConfig` parameter to `DataSetRel.translateToPlan()` 
and adjusts the corresponding subclasses.
  - Adds new `toDataSet()` methods with `BatchQueryConfig` to 
`BatchTableEnvironment`.
  - Updates `TableConversions.scala`.


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8258

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5169.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5169


commit 9cb36b957fc4ed2ef6ad965304b759e9c7a53300
Author: Xingcan Cui 
Date:   2017-12-14T09:37:40Z

[FLINK-8258] [table] Enable query configuration for batch queries




---


[GitHub] flink pull request #5165: [FLINK-8285] [table] Enable query configuration fo...

2017-12-14 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5165


---


[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos

2017-12-14 Thread Jared Stehler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291605#comment-16291605
 ] 

Jared Stehler commented on FLINK-8265:
--

There is a shaded jackson class with jackson2 in the package, but none with the 
path shown. It seems that an include for jackson is missing from the shade 
configuration in the flink-mesos pom:

{code}
<artifactSet>
    <includes>
        <include>com.google.protobuf:protobuf-java</include>
        <include>org.apache.mesos:mesos</include>
        <include>com.netflix.fenzo:fenzo-core</include>
    </includes>
</artifactSet>
<relocations>
    <relocation>
        <pattern>com.google.protobuf</pattern>
        <shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
    </relocation>
    <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
    </relocation>
</relocations>
{code}

As a workaround, I was able to build a simple shaded jackson deps jar relocated 
to "org.apache.flink.mesos.shaded.com.fasterxml.jackson" and add it to my 
/lib.
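For reference, a relocated jar like the one described can be produced with the maven-shade-plugin. The following pom fragment is a hypothetical sketch of that workaround, not the fix that later landed in flink-mesos:

```xml
<!-- Hypothetical sketch: bundle jackson and relocate it to the
     package path that the shaded Fenzo classes expect. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.fasterxml.jackson</pattern>
            <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Dropping the resulting jar into Flink's /lib makes the relocated ObjectMapper visible to Fenzo at runtime.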


> Missing jackson dependency for flink-mesos
> --
>
> Key: FLINK-8265
> URL: https://issues.apache.org/jira/browse/FLINK-8265
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Critical
> Fix For: 1.4.1
>
>
> The Jackson library that is required by Fenzo is missing from the Flink 
> distribution jar-file.
> This manifests as an exception in certain circumstances when a hard 
> constraint is configured ("mesos.constraints.hard.hostattribute").
> {code}
> NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
> at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8265) Missing jackson dependency for flink-mesos

2017-12-14 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8265:
---

 Summary: Missing jackson dependency for flink-mesos
 Key: FLINK-8265
 URL: https://issues.apache.org/jira/browse/FLINK-8265
 Project: Flink
  Issue Type: Bug
  Components: Mesos
Affects Versions: 1.4.0
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Critical
 Fix For: 1.4.1


The Jackson library that is required by Fenzo is missing from the Flink 
distribution jar-file.

This manifests as an exception in certain circumstances when a hard constraint 
is configured ("mesos.constraints.hard.hostattribute").

{code}
NoClassDefFoundError: 
org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
at 
com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
at 
com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()

2017-12-14 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291536#comment-16291536
 ] 

Eron Wright  commented on FLINK-8257:
-

Related: in some cases the parallelism must be equal to that of the preceding 
operator. 
e.g. 
[DataStream::assignTimestampsAndWatermarks|https://github.com/apache/flink/blob/release-1.4.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L833]

> Unify the value checks for setParallelism()
> ---
>
> Key: FLINK-8257
> URL: https://issues.apache.org/jira/browse/FLINK-8257
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Xingcan Cui
>
> The {{setParallelism()}} method exists in many components at different 
> levels. Some of the methods require the input value to be greater than {{1}} 
> (e.g., {{StreamTransformation.setParallelism()}}), while some also allow 
> the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is {{-1}} by 
> default (e.g., {{DataSink.setParallelism()}}). We need to unify the value 
> checks for these methods.
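A unified check could look like the following sketch. The class and method names are assumptions for illustration; only the `PARALLELISM_DEFAULT` sentinel value mirrors Flink's `ExecutionConfig`:

```java
/** Hypothetical unified parallelism check; not Flink's actual code. */
class ParallelismValidator {

    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT in Flink.
    static final int PARALLELISM_DEFAULT = -1;

    /**
     * Accepts any positive parallelism; callers that support the default
     * sentinel pass allowDefault = true to also accept -1.
     */
    static int checkParallelism(int parallelism, boolean allowDefault) {
        boolean valid = parallelism > 0
                || (allowDefault && parallelism == PARALLELISM_DEFAULT);
        if (!valid) {
            throw new IllegalArgumentException(
                "Parallelism must be a positive number"
                    + (allowDefault ? " or PARALLELISM_DEFAULT (-1)." : "."));
        }
        return parallelism;
    }
}
```

Routing every `setParallelism()` implementation through one such helper would make the accepted value range consistent across components.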



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5157: [hotfix] [docs] Consistent capitalization in Mesos docume...

2017-12-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5157
  
@joerg84 could you file a PR for this change in `docs/ops/config.md` and 
elsewhere in `docs/ops/deployment/mesos.md`?


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291336#comment-16291336
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027881
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
+   jobExecutionResultCache.put(result.getJobId(), 
Either.Right(result));
+   }
+
+   public void put(final JobID jobId, Throwable throwable) {
+   assertJobExecutionResultNotCached(jobId);
+   jobExecutionResultCache.put(jobId, Either.Left(throwable));
+   }
+
+   public boolean contains(final JobID jobId) {
+   return jobExecutionResultCache.getIfPresent(jobId) != null;
+   }
+
+   @Nullable
+   public Either<Throwable, SerializedJobExecutionResult> get(final JobID 
jobId) {
--- End diff --

Not sure if I am abusing Flink's `Either` here.
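For context, the Left-for-failure / Right-for-success convention used in the diff above can be illustrated with a minimal, self-contained Either. Flink's `org.apache.flink.types.Either` has a similar shape, but this sketch is not its actual implementation:

```java
// Minimal Either sketch: by convention, Left carries the failure
// (e.g. a Throwable) and Right carries the successful result.
abstract class EitherSketch<L, R> {

    static <L, R> EitherSketch<L, R> left(L value) { return new Left<>(value); }

    static <L, R> EitherSketch<L, R> right(R value) { return new Right<>(value); }

    abstract boolean isLeft();

    static final class Left<L, R> extends EitherSketch<L, R> {
        final L value;
        Left(L value) { this.value = value; }
        boolean isLeft() { return true; }
    }

    static final class Right<L, R> extends EitherSketch<L, R> {
        final R value;
        Right(R value) { this.value = value; }
        boolean isLeft() { return false; }
    }
}
```

Caching `Either.Left(throwable)` next to `Either.Right(result)` simply stores both outcomes of a job under one value type.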


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.
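The cache described above (bounded size plus expiry of stale entries) can be sketched with the JDK alone. The actual PR uses a shaded Guava `CacheBuilder`, so this class and its names are only an illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative LRU cache with a configurable capacity and per-entry TTL. */
class ExpiringResultCache<K, V> {

    private final long ttlMillis;
    private final Map<K, Entry<V>> entries;

    ExpiringResultCache(int maxSize, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // Access-ordered LinkedHashMap evicts the eldest entry once
        // the configured maximum size is exceeded.
        this.entries = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, System.currentTimeMillis() + ttlMillis));
    }

    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() > e.expiresAt) {
            entries.remove(key); // lazily drop stale entries to avoid leaks
            return null;
        }
        return e.value;
    }

    private static final class Entry<V> {
        final V value;
        final long expiresAt;

        Entry(V value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }
}
```

A Dispatcher-side cache of this shape lets clients poll for a `JobExecutionResult` for a bounded time after the `JobManagerRunner` has finished.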



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027881
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
+   jobExecutionResultCache.put(result.getJobId(), 
Either.Right(result));
+   }
+
+   public void put(final JobID jobId, Throwable throwable) {
+   assertJobExecutionResultNotCached(jobId);
+   jobExecutionResultCache.put(jobId, Either.Left(throwable));
+   }
+
+   public boolean contains(final JobID jobId) {
+   return jobExecutionResultCache.getIfPresent(jobId) != null;
+   }
+
+   @Nullable
+   public Either<Throwable, SerializedJobExecutionResult> get(final JobID 
jobId) {
--- End diff --

Not sure if I am abusing Flink's `Either` here.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291334#comment-16291334
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given 
{@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
+
+   private final JobID jobId;
+
+   private static final long serialVersionUID = 1L;
--- End diff --

Should be on top.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291333#comment-16291333
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
--- End diff --

Javadocs are missing.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given 
{@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
+
+   private final JobID jobId;
+
+   private static final long serialVersionUID = 1L;
--- End diff --

Should be on top.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
--- End diff --

Javadocs are missing.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291332#comment-16291332
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027178
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields 
are not always represented in the JSON. The `RestClient` would otherwise run 
into problems.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027178
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields 
are not always represented in the JSON. The `RestClient` would otherwise run 
into problems.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291328#comment-16291328
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157026590
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResult;
+import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for 
a given {@link JobID}.
+ */
+public class JobExecutionResultHandler
--- End diff --

Sample response after running batch WordCount example:
```
{
  "status": {
"id": "CREATED"
  },
  "job-execution-result": {
"id": "533a165a6de7f70919a54b1d6f36d3b3",
"net-runtime": 0,
"accumulator-results": {
  "94a58184eb17398571f35da42b714517": 

[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291325#comment-16291325
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157026313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
objectMapper.disable(
SerializationFeature.FAIL_ON_EMPTY_BEANS);
+
+   final SimpleModule jacksonFlinkModule = new SimpleModule();
+
+   final JavaType serializedValueWildcardType = objectMapper
+   .getTypeFactory()
+   .constructType(new TypeReference() {
+   });
+
+   jacksonFlinkModule.addSerializer(new 
SerializedValueSerializer(serializedValueWildcardType));
--- End diff --

Could also be done using the `@JsonSerialize` annotation
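For comparison, the annotation-based approach mentioned here would look roughly like this (a hedged sketch: `Wrapper` and `WrapperSerializer` are hypothetical stand-ins, not Flink's `SerializedValue` types):

```java
import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

// Instead of registering the serializer with a SimpleModule on the mapper,
// the target type declares its serializer directly via the annotation.
@JsonSerialize(using = WrapperSerializer.class)
class Wrapper {
    final byte[] payload;

    Wrapper(byte[] payload) {
        this.payload = payload;
    }
}

class WrapperSerializer extends JsonSerializer<Wrapper> {
    @Override
    public void serialize(Wrapper value, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
        gen.writeBinary(value.payload); // writes the bytes as a Base64 string
    }
}
```

The trade-off: the annotation couples the POJO to Jackson, while the `SimpleModule` keeps all the wiring in one place on the mapper.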


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291320#comment-16291320
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157025791
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache>
--- End diff --

Cache isn't size limited.
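A size bound would, for instance, come from Guava's `CacheBuilder.maximumSize(...)` alongside the existing expiry. The underlying idea of a size-limited (LRU) cache can be sketched with the JDK alone (illustrative only, not the Flink code under review):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** A tiny LRU cache: once maxSize is exceeded, the least recently used entry is evicted. */
class BoundedResultCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxSize;

    BoundedResultCache(int maxSize) {
        super(16, 0.75f, true); // access-order iteration gives LRU semantics
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // evict the eldest entry when over capacity
    }
}
```

With a capacity of 2, inserting three results evicts the least recently used one, so the cache can never grow without bound.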


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291255#comment-16291255
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4911


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, Flink only lets users define how much CPU and MEM an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need GPUs, while others may need FPGAs. 
> CPU and MEM alone are not enough, and the number of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
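The extensibility requested in FLINK-7878 above can be illustrated by pairing the fixed CPU/MEM fields with an open map of named resources (a hypothetical sketch, not Flink's actual `ResourceSpec` API):

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of an extensible resource spec: fixed fields for the common
// resources, plus an open map for new types such as "GPU" or "FPGA".
final class SimpleResourceSpec {

    private final double cpuCores;
    private final int memoryMb;
    private final Map<String, Double> extendedResources;

    SimpleResourceSpec(double cpuCores, int memoryMb, Map<String, Double> extended) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
        this.extendedResources = new HashMap<>(extended);
    }

    /** Merging specs adds up each resource dimension, unknown types included. */
    SimpleResourceSpec merge(SimpleResourceSpec other) {
        Map<String, Double> merged = new HashMap<>(extendedResources);
        other.extendedResources.forEach((k, v) -> merged.merge(k, v, Double::sum));
        return new SimpleResourceSpec(
                cpuCores + other.cpuCores, memoryMb + other.memoryMb, merged);
    }

    double getExtended(String name) {
        return extendedResources.getOrDefault(name, 0.0);
    }

    double getCpuCores() {
        return cpuCores;
    }

    int getMemoryMb() {
        return memoryMb;
    }
}
```

New resource types then need no new fields: a "TPU" entry would merge the same way as "GPU" or "FPGA".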


[jira] [Resolved] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7878.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via 5b9ac9508b5d16f85b76a6de940458d385e23f0d

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, Flink only lets users define how much CPU and MEM an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need GPUs, while others may need FPGAs. 
> CPU and MEM alone are not enough, and the number of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291245#comment-16291245
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5168

[FLINK-8234][flip6] WIP

WIP

@tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5168


commit cc969846791bf818fbc81feb241a188410431ae5
Author: gyao 
Date:   2017-12-14T16:27:16Z

[FLINK-8234][flip6] WIP




> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291158#comment-16291158
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5091


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7956.

   Resolution: Done
Fix Version/s: 1.5.0

Added via 0ef7fddeff8430fd40d2d7a1b8a6454fd9416ced

> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291153#comment-16291153
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5091
  
Thanks for the review @GJL and @ifndef-SleePy. Travis passed locally. 
Merging this PR.


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-14 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8234:
---

Assignee: Gary Yao

> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156958542
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 
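The `TypeHint` alternative mentioned in the javadoc above is based on the "super type token" trick: an anonymous subclass preserves its generic supertype at runtime, so the full parameterized type survives erasure. A minimal standalone sketch (not Flink's actual implementation):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Super type token: subclassing captures T in the generic superclass
// signature, which reflection can read back at runtime.
abstract class TypeHint<T> {

    private final Type type;

    protected TypeHint() {
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}
```

`new TypeHint<java.util.List<String>>() {}.getType()` yields the full parameterized type `java.util.List<java.lang.String>`; a plain `Class`-based lookup cannot recover the `String` argument because of erasure, which is why the hint form exists.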

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156958975
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where type 
information has to be
+  * supplied manually or would result in an inefficient type.
+  *
+  * Scala macros allow to determine type information of classes and type 
parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+* Generates type information based on the given class and/or its type 
parameters.
+*
+* The definition is similar to a 
[[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+* not require to implement anonymous classes.
+*
+* If the class could not be analyzed by the Scala type analyzer, the 
Java analyzer
+* will be used.
+*
+* Example use:
+*
+* `Types.of[(Int, String, String)]` for Scala tuples
+* `Types.of[Unit]` for Scala specific types
+*
+* @tparam T class to be analyzed
+*/
+  def of[T: TypeInformation]: TypeInformation[T] = {
+val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+typeInfo
+  }
+
+  /**
+* Returns type information for Scala [[Nothing]]. Does not support a 
null value.
+*/
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+* Returns type information for Scala [[Unit]]. Does not support a null 
value.
+*/
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+* Returns type information for [[String]] and [[java.lang.String]]. 
Supports a null value.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for primitive [[Byte]] and 
[[java.lang.Byte]]. Does not
+* support a null value.
+*/
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for primitive [[Boolean]] and 
[[java.lang.Boolean]]. Does not
+* support a null value.
+*/
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for primitive [[Short]] and 
[[java.lang.Short]]. Does not
+* support a null value.
+*/
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for primitive [[Int]] and 
[[java.lang.Integer]]. Does not
+* support a null value.
+*/
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+* Returns type information for primitive [[Long]] and 
[[java.lang.Long]]. Does not
+* support a null value.
+*/
+  val LONG: TypeInformation[java.lang.Long] = JTypes.LONG
+
+  /**
+* Returns type information for primitive [[Float]] and 
[[java.lang.Float]]. Does not

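The Scala `Types.of` shown above uses a macro to derive `TypeInformation`, whereas the Java API relies on anonymous `TypeHint` subclasses. The capture trick behind `TypeHint` — an anonymous subclass "freezes" its generic argument in the class file so it survives erasure — can be sketched with plain JDK reflection. This is an illustration of the mechanism only, not Flink's actual `TypeHint` implementation:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Minimal re-creation of the TypeHint trick: the generic argument of an
// anonymous subclass is recorded in its class metadata and can be read
// back via getGenericSuperclass(). Hypothetical class name; JDK only.
abstract class CapturedType<T> {
    // Returns the actual type argument of the direct superclass,
    // e.g. List<String> for `new CapturedType<List<String>>() {}`.
    Type captured() {
        ParameterizedType superType =
            (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

public class TypeHintSketch {
    public static void main(String[] args) {
        Type t = new CapturedType<List<String>>() {}.captured();
        System.out.println(((ParameterizedType) t).getRawType());
    }
}
```

The Scala macro avoids this boilerplate because the compiler materializes the implicit `TypeInformation[T]` at the call site, so no runtime class needs to be generated to carry the type argument.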
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156951752
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156938437
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
--- End diff --

"provide more specialized" -> "have dedicated"?


---
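The Javadoc in the quoted diff repeatedly distinguishes types that "support a null value" (e.g. `STRING`) from primitive-backed types that do not. The usual reason: a serializer that writes a raw primitive has no spare bit pattern left for null, while a nullable scheme spends an extra flag byte per value. A minimal sketch of such a flag-byte scheme, using only the JDK (this illustrates the trade-off and is not Flink's actual serialization format):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Nullable int serializer: one flag byte, then the 4-byte payload if
// the value is present. A raw writeInt() could never round-trip null.
public class NullableIntSerde {
    static byte[] serialize(Integer value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        if (value == null) {
            out.writeBoolean(true);   // null flag set, no payload
        } else {
            out.writeBoolean(false);  // flag clear, payload follows
            out.writeInt(value);
        }
        return bos.toByteArray();
    }

    static Integer deserialize(byte[] bytes) throws IOException {
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes));
        return in.readBoolean() ? null : in.readInt();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(deserialize(serialize(42)));   // 42
        System.out.println(deserialize(serialize(null))); // null
        System.out.println(serialize(42).length);         // 5: flag + int
    }
}
```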


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156942641
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291076#comment-16291076
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156945371
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156951434
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156949716
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156945371
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156955846
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -25,55 +27,125 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+* Returns type information for a Table API string or SQL VARCHAR type.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for a Table API boolean or SQL BOOLEAN type.
+*/
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for a Table API byte or SQL TINYINT type.
+*/
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for a Table API short or SQL SMALLINT type.
+*/
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for a Table API integer or SQL INT/INTEGER 
type.
+*/
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+* Returns type information for a Table API long or SQL BIGINT type.
+*/
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+* Returns type information for a Table API float or SQL FLOAT/REAL 
type.
+*/
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+* Returns type information for a Table API double or SQL DOUBLE type.
+*/
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-* Generates row type information.
+* Returns type information for a Table API big decimal or SQL DECIMAL 
type.
+*/
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+* Returns type information for a Table API SQL date or SQL DATE type.
+*/
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
--- End diff --

The [Table API 
docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/tableApi.html#data-types)
 and [SQL 
docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types)
 need to be updated for `SQL_DATE`, `SQL_TIME`, and `SQL_TIMESTAMP`.


---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291064#comment-16291064
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156938239
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,417 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
--- End diff --

rephrase as suggested?
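The sentence under review concerns cases where analyzing generic signatures fails. That limitation comes from type erasure, which can be seen with plain JDK reflection; the following is a minimal stdlib-only sketch (class and method names are illustrative, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates why generic return types cannot always be recovered at
// runtime: after erasure, List<String> and List<Integer> share a single
// Class object, so the element type must be supplied separately
// (e.g. via a Types helper or a TypeHint).
public class ErasureDemo {

    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        // Both lists report java.util.ArrayList; the type arguments are gone.
        return strings.getClass() == ints.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // prints "true"
    }
}
```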


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, e.g., when
> some type extraction fails or is not available. {{TypeHint}}s should be the
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested
> type and, especially in the case of POJOs, should help with creating {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.
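The {{TypeHint}} mechanism preferred above works because the generic argument of an anonymous subclass survives erasure in the class file's generic superclass signature. A minimal stdlib-only sketch of that capture trick follows; the class and method names are illustrative, not Flink's actual implementation:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Sketch of the anonymous-subclass trick that TypeHint-style APIs rely on:
// subclassing with a concrete type argument pins that argument into the
// subclass's generic superclass metadata, where reflection can read it back.
public class CapturedHint<T> {

    private final Type captured;

    protected CapturedHint() {
        // getGenericSuperclass() preserves the actual type argument
        // that the anonymous subclass was created with.
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        captured = superType.getActualTypeArguments()[0];
    }

    public Type getCapturedType() {
        return captured;
    }

    public static void main(String[] args) {
        // The trailing "{}" creates an anonymous subclass; without it,
        // the type argument List<String> would be erased.
        CapturedHint<List<String>> hint = new CapturedHint<List<String>>() {};
        System.out.println(hint.getCapturedType());
    }
}
```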



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291075#comment-16291075
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156937819
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where type 
information has to be
+  * supplied manually or would result in an inefficient type.
+  *
+  * Scala macros allow to determine type information of classes and type 
parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+* Generates type information based on the given class and/or its type 
parameters.
+*
+* The definition is similar to a 
[[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+* not require to implement anonymous classes.
+*
+* If the class could not be analyzed by the Scala type analyzer, the 
Java analyzer
+* will be used.
+*
+* Example use:
+*
+* `Types.of[(Int, String, String)]` for Scala tuples
+* `Types.of[Unit]` for Scala specific types
+*
+* @tparam T class to be analyzed
+*/
+  def of[T: TypeInformation]: TypeInformation[T] = {
+val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+typeInfo
+  }
+
+  /**
+* Returns type information for Scala [[Nothing]]. Does not support a 
null value.
+*/
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+* Returns type information for Scala [[Unit]]. Does not support a null 
value.
+*/
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+* Returns type information for [[String]] and [[java.lang.String]]. 
Supports a null value.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for primitive [[Byte]] and 
[[java.lang.Byte]]. Does not
+* support a null value.
+*/
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for primitive [[Boolean]] and 
[[java.lang.Boolean]]. Does not
+* support a null value.
+*/
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for primitive [[Short]] and 
[[java.lang.Short]]. Does not
+* support a null value.
+*/
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for primitive [[Int]] and 
[[java.lang.Integer]]. Does not
+* support a null value.
+*/
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+* Returns type information for primitive [[Long]] and 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291081#comment-16291081
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156949716
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
   

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291073#comment-16291073
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156951434
  

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940963
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291065#comment-16291065
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940309
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -25,55 +27,125 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+* Returns type information for a Table API string or SQL VARCHAR type.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for a Table API boolean or SQL BOOLEAN type.
+*/
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for a Table API byte or SQL TINYINT type.
+*/
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for a Table API short or SQL SMALLINT type.
+*/
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for a Table API integer or SQL INT/INTEGER 
type.
+*/
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+* Returns type information for a Table API long or SQL BIGINT type.
+*/
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+* Returns type information for a Table API float or SQL FLOAT/REAL 
type.
+*/
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+* Returns type information for a Table API double or SQL DOUBLE type.
+*/
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-* Generates row type information.
+* Returns type information for a Table API big decimal or SQL DECIMAL 
type.
+*/
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+* Returns type information for a Table API SQL date or SQL DATE type.
+*/
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+  /**
+* Returns type information for a Table API SQL time or SQL TIME type.
+*/
+  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+  /**
+* Returns type information for a Table API SQL timestamp or SQL 
TIMESTAMP type.
+*/
+  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+* Returns type information for a Table API interval of months.
+*/
+  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = 
TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+  /**
+* Returns type information for a Table API interval milliseconds.
+*/
+  val INTERVAL_MILLIS: TypeInformation[lang.Long] = 
TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  /**
+* Returns type information for [[org.apache.flink.types.Row]] with 
fields of the given types.
+*
+* A row is a variable-length, null-aware composite type for storing 
multiple values in a
+* deterministic field order. Every field can be null independent of 
the field's type.
+* The type of row fields cannot be automatically inferred; therefore, 
it is required to pass
+* type information whenever a row is used.
 *
-* A row type consists of zero or more fields with a field name and a 
corresponding type.
+* The schema of rows can have up to Integer.MAX_VALUE 
fields, however, all row instances
+* must have the same length otherwise serialization fails or 
information is lost.
 *
-* The fields have the default names (f0, f1, f2 ..).
+* This method generates type information with fields of the given 
types; the fields have
+* the default names (f0, f1, f2 ..).
 *
-* @param types types of row fields; e.g. Types.STRING, Types.INT
+* @param types The types of the row fields, e.g., 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291067#comment-16291067
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156937828
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where type 
information has to be
+  * supplied manually or would result in an inefficient type.
+  *
+  * Scala macros allow to determine type information of classes and type 
parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+* Generates type information based on the given class and/or its type 
parameters.
+*
+* The definition is similar to a 
[[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+* not require to implement anonymous classes.
+*
+* If the class could not be analyzed by the Scala type analyzer, the 
Java analyzer
+* will be used.
+*
+* Example use:
+*
+* `Types.of[(Int, String, String)]` for Scala tuples
+* `Types.of[Unit]` for Scala specific types
+*
+* @tparam T class to be analyzed
+*/
+  def of[T: TypeInformation]: TypeInformation[T] = {
+val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+typeInfo
+  }
+
+  /**
+* Returns type information for Scala [[Nothing]]. Does not support a 
null value.
+*/
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+* Returns type information for Scala [[Unit]]. Does not support a null 
value.
+*/
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+* Returns type information for [[String]] and [[java.lang.String]]. 
Supports a null value.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for primitive [[Byte]] and 
[[java.lang.Byte]]. Does not
+* support a null value.
+*/
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for primitive [[Boolean]] and 
[[java.lang.Boolean]]. Does not
+* support a null value.
+*/
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for primitive [[Short]] and 
[[java.lang.Short]]. Does not
+* support a null value.
+*/
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for primitive [[Int]] and 
[[java.lang.Integer]]. Does not
+* support a null value.
+*/
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+* Returns type information for primitive [[Long]] and 
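The quoted Scaladoc above notes that `Types.of` is similar to a `TypeHint` "but does not require to implement anonymous classes". For context, the anonymous-subclass mechanism that TypeHint-style classes rely on can be sketched in plain Java with no Flink dependency (the `Hint` class below is an illustrative stand-in, not Flink's actual `TypeHint`):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Minimal sketch of the reflection trick behind TypeHint-style classes:
// an anonymous subclass preserves its generic superclass signature at runtime,
// so the type argument T survives erasure and can be read back reflectively.
abstract class Hint<T> {
    final Type captured;

    Hint() {
        ParameterizedType superType =
            (ParameterizedType) getClass().getGenericSuperclass();
        captured = superType.getActualTypeArguments()[0];
    }
}

public class HintDemo {
    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass the trick requires.
        Hint<List<String>> hint = new Hint<List<String>>() {};
        System.out.println(hint.captured); // typically java.util.List<java.lang.String>
    }
}
```

The Scala `Types.of[T]` above achieves the same effect at compile time via the implicit `TypeInformation[T]` generated by macros, which is why no anonymous class is needed there.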

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291078#comment-16291078
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156956706
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -116,7 +191,7 @@ object Types {
 *
--- End diff --

Add information about how a Multiset is represented and the nullability of 
the set and its entries?


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case
> some extraction fails or is not available. {{TypeHint}}s should be the
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested
> type and, especially in the case of POJOs, should help to create {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.
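The factory-method API shape proposed in the ticket can be sketched without any Flink dependency as follows. Note that `TypeInfo` and its methods here are illustrative stand-ins, not Flink's actual `TypeInformation` hierarchy:

```java
import java.util.Arrays;

// Illustrative stand-in for a type-information descriptor:
// it records the described class plus any type parameters.
class TypeInfo<T> {
    private final Class<T> clazz;
    private final TypeInfo<?>[] parameters;

    TypeInfo(Class<T> clazz, TypeInfo<?>... parameters) {
        this.clazz = clazz;
        this.parameters = parameters;
    }

    public Class<T> typeClass() { return clazz; }

    @Override
    public String toString() {
        return parameters.length == 0
            ? clazz.getSimpleName()
            : clazz.getSimpleName() + Arrays.toString(parameters);
    }
}

// Sketch of the proposed Types helper: constants for basic types,
// factory methods for parameterized ones.
public class Types {
    public static final TypeInfo<String> STRING = new TypeInfo<>(String.class);
    public static final TypeInfo<Integer> INT = new TypeInfo<>(Integer.class);

    public static <T> TypeInfo<T> GENERIC(Class<T> clazz) {
        return new TypeInfo<>(clazz);
    }

    public static TypeInfo<?> TUPLE(TypeInfo<?>... fieldTypes) {
        return new TypeInfo<>(Object[].class, fieldTypes);
    }

    public static TypeInfo<?> MAP(TypeInfo<?> keyType, TypeInfo<?> valueType) {
        return new TypeInfo<>(java.util.Map.class, keyType, valueType);
    }

    public static void main(String[] args) {
        TypeInfo<?> mapInfo = Types.MAP(Types.STRING, Types.INT);
        System.out.println(mapInfo); // prints Map[String, Integer]
    }
}
```

The point of the pattern is that each factory call yields a fully specified type descriptor up front, so nothing needs to be parsed from a string later; this is what makes deprecating the string-based `TypeInfoParser` possible.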



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156956706
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -116,7 +191,7 @@ object Types {
 *
--- End diff --

Add information about how a Multiset is represented and the nullability of 
the set and its entries?


---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291063#comment-16291063
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940557
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
 ---
@@ -34,7 +34,7 @@ class CorrelateStringExpressionTest extends TableTestBase 
{
 
 val util = streamTestUtil()
 val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
-val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, 
Types.STRING): _*)
+val typeInfo = new RowTypeInfo(Seq(typeutils.Types.INT, 
typeutils.Types.LONG, typeutils.Types.STRING): _*)
--- End diff --

line exceeds 100 characters.




[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291070#comment-16291070
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156956540
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -107,7 +180,9 @@ object Types {
 * @param keyType type of the keys of the map e.g. Types.STRING
--- End diff --

Add information about nullability of Map and entries?




[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291072#comment-16291072
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940963
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
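The repeated "Does not support a null value" notes in the quoted Javadoc reflect a general property of primitive serializers: a fixed-width primitive encoding has no spare bit pattern for null, so nullability needs an explicit marker. A small illustrative (non-Flink) contrast:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative sketch: a plain int serializer cannot encode null,
// while a nullable variant prefixes every value with a marker byte.
public class NullableIntDemo {

    static byte[] writeNullable(Integer value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (value == null) {
                out.writeBoolean(false);  // marker: no value follows
            } else {
                out.writeBoolean(true);   // marker: an int follows
                out.writeInt(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Integer readNullable(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return in.readBoolean() ? in.readInt() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readNullable(writeNullable(7)));    // prints 7
        System.out.println(readNullable(writeNullable(null))); // prints null
    }
}
```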
   

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291079#comment-16291079
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156955846
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -25,55 +27,125 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+* Returns type information for a Table API string or SQL VARCHAR type.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for a Table API boolean or SQL BOOLEAN type.
+*/
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for a Table API byte or SQL TINYINT type.
+*/
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for a Table API short or SQL SMALLINT type.
+*/
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for a Table API integer or SQL INT/INTEGER 
type.
+*/
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+* Returns type information for a Table API long or SQL BIGINT type.
+*/
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+* Returns type information for a Table API float or SQL FLOAT/REAL 
type.
+*/
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+* Returns type information for a Table API integer or SQL DOUBLE type.
+*/
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-* Generates row type information.
+* Returns type information for a Table API big decimal or SQL DECIMAL 
type.
+*/
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+* Returns type information for a Table API SQL date or SQL DATE type.
+*/
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
--- End diff --

The [Table API 
docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/tableApi.html#data-types)
 and [SQL 
docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types)
 need to be updated for `SQL_DATE`, `SQL_TIME`, and `SQL_TIMESTAMP`.




[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291069#comment-16291069
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -25,55 +27,125 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+* Returns type information for a Table API string or SQL VARCHAR type.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for a Table API boolean or SQL BOOLEAN type.
+*/
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for a Table API byte or SQL TINYINT type.
+*/
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for a Table API short or SQL SMALLINT type.
+*/
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for a Table API integer or SQL INT/INTEGER 
type.
+*/
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+* Returns type information for a Table API long or SQL BIGINT type.
+*/
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+* Returns type information for a Table API float or SQL FLOAT/REAL 
type.
+*/
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+* Returns type information for a Table API integer or SQL DOUBLE type.
+*/
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-* Generates row type information.
+* Returns type information for a Table API big decimal or SQL DECIMAL 
type.
+*/
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+* Returns type information for a Table API SQL date or SQL DATE type.
+*/
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+  /**
+* Returns type information for a Table API SQL time or SQL TIME type.
+*/
+  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+  /**
+* Returns type information for a Table API SQL timestamp or SQL 
TIMESTAMP type.
+*/
+  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+* Returns type information for a Table API interval of months.
+*/
+  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = 
TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+  /**
+* Returns type information for a Table API interval milliseconds.
+*/
+  val INTERVAL_MILLIS: TypeInformation[lang.Long] = 
TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  /**
+* Returns type information for [[org.apache.flink.types.Row]] with 
fields of the given types.
+*
+* A row is a variable-length, null-aware composite type for storing 
multiple values in a
+* deterministic field order. Every field can be null independent of 
the field's type.
+* The type of row fields cannot be automatically inferred; therefore, 
it is required to pass
+* type information whenever a row is used.
 *
-* A row type consists of zero or more fields with a field name and a 
corresponding type.
+* The schema of rows can have up to Integer.MAX_VALUE 
fields, however, all row instances
--- End diff --

line exceeds 100 characters.



[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291071#comment-16291071
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156958975
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where type 
information has to be
+  * supplied manually or would result in an inefficient type.
+  *
+  * Scala macros allow to determine type information of classes and type 
parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+* Generates type information based on the given class and/or its type 
parameters.
+*
+* The definition is similar to a 
[[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+* not require to implement anonymous classes.
+*
+* If the class could not be analyzed by the Scala type analyzer, the 
Java analyzer
+* will be used.
+*
+* Example use:
+*
+* `Types.of[(Int, String, String)]` for Scala tuples
+* `Types.of[Unit]` for Scala specific types
+*
+* @tparam T class to be analyzed
+*/
+  def of[T: TypeInformation]: TypeInformation[T] = {
+val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+typeInfo
+  }
+
+  /**
+* Returns type information for Scala [[Nothing]]. Does not support a 
null value.
+*/
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+* Returns type information for Scala [[Unit]]. Does not support a null 
value.
+*/
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+* Returns type information for [[String]] and [[java.lang.String]]. 
Supports a null value.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for primitive [[Byte]] and 
[[java.lang.Byte]]. Does not
+* support a null value.
+*/
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for primitive [[Boolean]] and 
[[java.lang.Boolean]]. Does not
+* support a null value.
+*/
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for primitive [[Short]] and 
[[java.lang.Short]]. Does not
+* support a null value.
+*/
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for primitive [[Int]] and 
[[java.lang.Integer]]. Does not
+* support a null value.
+*/
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+* Returns type information for primitive [[Long]] and 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291068#comment-16291068
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156958542
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
   

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291074#comment-16291074
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156938437
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
--- End diff --

"provide more specialized" -> "have dedicated"?




[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156956540
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -107,7 +180,9 @@ object Types {
 * @param keyType type of the keys of the map e.g. Types.STRING
--- End diff --

Add information about nullability of Map and entries?


---
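A note on the nullability question above: one common scheme for making map values nullable in a binary format is a per-entry null flag. The sketch below (plain JDK I/O; illustrative only, not Flink's actual MapSerializer; it also assumes non-null keys, which is an assumption of this sketch) shows the idea:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Null-aware serialization of a Map<String, Integer>: each value is preceded
// by a boolean flag so that null values round-trip correctly.
public class NullAwareMapDemo {

    static byte[] serialize(Map<String, Integer> map) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(map.size());
        for (Map.Entry<String, Integer> e : map.entrySet()) {
            out.writeUTF(e.getKey());          // keys assumed non-null here
            boolean isNull = e.getValue() == null;
            out.writeBoolean(isNull);          // per-entry null flag
            if (!isNull) {
                out.writeInt(e.getValue());
            }
        }
        return bytes.toByteArray();
    }

    static Map<String, Integer> deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();
        Map<String, Integer> map = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            Integer value = in.readBoolean() ? null : in.readInt();
            map.put(key, value);
        }
        return map;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("a", 1);
        m.put("b", null);   // null value survives the round trip
        System.out.println(deserialize(serialize(m)));
        // prints: {a=1, b=null}
    }
}
```

With the flag in place, null values round-trip losslessly; whether null keys should also be allowed is exactly the kind of contract the review asks to document.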


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156937819
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where type 
information has to be
+  * supplied manually or would result in an inefficient type.
+  *
+  * Scala macros allow to determine type information of classes and type 
parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+* Generates type information based on the given class and/or its type 
parameters.
+*
+* The definition is similar to a 
[[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+* not require to implement anonymous classes.
+*
+* If the class could not be analyzed by the Scala type analyzer, the 
Java analyzer
+* will be used.
+*
+* Example use:
+*
+* `Types.of[(Int, String, String)]` for Scala tuples
+* `Types.of[Unit]` for Scala specific types
+*
+* @tparam T class to be analyzed
+*/
+  def of[T: TypeInformation]: TypeInformation[T] = {
+val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+typeInfo
+  }
+
+  /**
+* Returns type information for Scala [[Nothing]]. Does not support a 
null value.
+*/
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+* Returns type information for Scala [[Unit]]. Does not support a null 
value.
+*/
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+* Returns type information for [[String]] and [[java.lang.String]]. 
Supports a null value.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for primitive [[Byte]] and 
[[java.lang.Byte]]. Does not
+* support a null value.
+*/
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for primitive [[Boolean]] and 
[[java.lang.Boolean]]. Does not
+* support a null value.
+*/
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for primitive [[Short]] and 
[[java.lang.Short]]. Does not
+* support a null value.
+*/
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for primitive [[Int]] and 
[[java.lang.Integer]]. Does not
+* support a null value.
+*/
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+* Returns type information for primitive [[Long]] and 
[[java.lang.Long]]. Does not
+* support a null value.
+*/
+  val LONG: TypeInformation[java.lang.Long] = JTypes.LONG
+
+  /**
+* Returns type information for primitive [[Float]] and 
[[java.lang.Float]]. Does not
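The recurring "Does not support a null value" notes for the primitive-backed types have a simple cause: a primitive slot has no representation for null, so a null boxed value fails at the unboxing boundary. A quick stand-alone illustration (plain Java, independent of Flink):

```java
// Primitive-backed handling of a boxed value: unboxing null throws, which is
// why type infos backed by primitive serializers cannot support null values.
public class NullUnboxDemo {
    static int store(Integer value) {
        return value;  // auto-unboxing: throws NullPointerException if value == null
    }

    public static void main(String[] args) {
        System.out.println(store(7));  // prints: 7
        try {
            store(null);
        } catch (NullPointerException e) {
            System.out.println("null cannot be stored as a primitive int");
        }
    }
}
```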

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291077#comment-16291077
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156951752
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
   

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291066#comment-16291066
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156941632
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
   

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291080#comment-16291080
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156942641
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156938239
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,417 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
--- End diff --

rephrase as suggested?


---


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156941632
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,422 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -25,55 +27,125 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+* Returns type information for a Table API string or SQL VARCHAR type.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for a Table API boolean or SQL BOOLEAN type.
+*/
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for a Table API byte or SQL TINYINT type.
+*/
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for a Table API short or SQL SMALLINT type.
+*/
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for a Table API integer or SQL INT/INTEGER 
type.
+*/
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+* Returns type information for a Table API long or SQL BIGINT type.
+*/
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+* Returns type information for a Table API float or SQL FLOAT/REAL 
type.
+*/
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+* Returns type information for a Table API double or SQL DOUBLE type.
+*/
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-* Generates row type information.
+* Returns type information for a Table API big decimal or SQL DECIMAL 
type.
+*/
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+* Returns type information for a Table API SQL date or SQL DATE type.
+*/
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+  /**
+* Returns type information for a Table API SQL time or SQL TIME type.
+*/
+  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+  /**
+* Returns type information for a Table API SQL timestamp or SQL 
TIMESTAMP type.
+*/
+  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+* Returns type information for a Table API interval of months.
+*/
+  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = 
TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+  /**
+* Returns type information for a Table API interval milliseconds.
+*/
+  val INTERVAL_MILLIS: TypeInformation[lang.Long] = 
TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  /**
+* Returns type information for [[org.apache.flink.types.Row]] with 
fields of the given types.
+*
+* A row is a variable-length, null-aware composite type for storing 
multiple values in a
+* deterministic field order. Every field can be null independent of 
the field's type.
+* The type of row fields cannot be automatically inferred; therefore, 
it is required to pass
+* type information whenever a row is used.
 *
-* A row type consists of zero or more fields with a field name and a 
corresponding type.
+* The schema of rows can have up to Integer.MAX_VALUE 
fields, however, all row instances
--- End diff --

line exceeds 100 characters.


---
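The Row contract quoted above (fixed arity per instance, every field independently nullable, no automatic type inference) can be sketched with a plain object array. This `MiniRow` is illustrative only; Flink's actual class is `org.apache.flink.types.Row`:

```java
import java.util.Arrays;

// Minimal null-aware, fixed-arity row: the arity is fixed at construction and
// every field may independently be null, mirroring the ROW contract above.
public class MiniRow {
    private final Object[] fields;

    public MiniRow(int arity) {
        this.fields = new Object[arity];
    }

    public int getArity() {
        return fields.length;
    }

    public void setField(int pos, Object value) {
        fields[pos] = value;  // null is a legal value for any field
    }

    public Object getField(int pos) {
        return fields[pos];
    }

    @Override
    public String toString() {
        return Arrays.toString(fields);
    }

    public static void main(String[] args) {
        MiniRow row = new MiniRow(3);  // arity fixed at 3
        row.setField(0, "flink");
        row.setField(1, 42);           // field 2 stays null
        System.out.println(row);       // prints: [flink, 42, null]
    }
}
```

Because the field types cannot be inferred from such an untyped container, callers must supply type information explicitly, which is what `Types.ROW(...)` is for.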


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940309
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -25,55 +27,125 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+* Returns type information for a Table API string or SQL VARCHAR type.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for a Table API boolean or SQL BOOLEAN type.
+*/
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for a Table API byte or SQL TINYINT type.
+*/
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for a Table API short or SQL SMALLINT type.
+*/
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for a Table API integer or SQL INT/INTEGER 
type.
+*/
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+* Returns type information for a Table API long or SQL BIGINT type.
+*/
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+* Returns type information for a Table API float or SQL FLOAT/REAL 
type.
+*/
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+* Returns type information for a Table API double or SQL DOUBLE type.
+*/
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-* Generates row type information.
+* Returns type information for a Table API big decimal or SQL DECIMAL 
type.
+*/
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+* Returns type information for a Table API SQL date or SQL DATE type.
+*/
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+  /**
+* Returns type information for a Table API SQL time or SQL TIME type.
+*/
+  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+  /**
+* Returns type information for a Table API SQL timestamp or SQL 
TIMESTAMP type.
+*/
+  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+* Returns type information for a Table API interval of months.
+*/
+  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = 
TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+  /**
+* Returns type information for a Table API interval of milliseconds.
+*/
+  val INTERVAL_MILLIS: TypeInformation[lang.Long] = 
TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  /**
+* Returns type information for [[org.apache.flink.types.Row]] with 
fields of the given types.
+*
+* A row is a variable-length, null-aware composite type for storing 
multiple values in a
+* deterministic field order. Every field can be null independent of 
the field's type.
+* The type of row fields cannot be automatically inferred; therefore, 
it is required to pass
+* type information whenever a row is used.
 *
-* A row type consists of zero or more fields with a field name and a 
corresponding type.
+* The schema of rows can have up to Integer.MAX_VALUE 
fields; however, all row instances
+* must have the same length, otherwise serialization fails or 
information is lost.
 *
-* The fields have the default names (f0, f1, f2 ..).
+* This method generates type information with fields of the given 
types; the fields have
+* the default names (f0, f1, f2 ..).
 *
-* @param types types of row fields; e.g. Types.STRING, Types.INT
+* @param types The types of the row fields, e.g., Types.STRING, 
Types.INT
 */
   @varargs
   def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
 JTypes.ROW(types: _*)
   }
 
   /**
-* Generates row type information.
+* Returns 
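The `ROW(types: TypeInformation[_]*)` helper in the diff above assigns default field names f0, f1, f2, ... in declaration order. A self-contained Java sketch of that naming scheme (the `RowType` class below is purely illustrative, not Flink's `RowTypeInfo`):

```java
import java.util.ArrayList;
import java.util.List;

public class RowType {
    private final List<String> fields = new ArrayList<>();

    /** Mirrors Types.ROW: fields receive the default names f0, f1, f2, ... in order. */
    static RowType of(String... types) {
        RowType row = new RowType();
        for (int i = 0; i < types.length; i++) {
            row.fields.add("f" + i + ": " + types[i]);
        }
        return row;
    }

    @Override
    public String toString() {
        return "Row(" + String.join(", ", fields) + ")";
    }

    public static void main(String[] args) {
        // Default naming, same order as the type arguments.
        System.out.println(RowType.of("String", "Integer", "Boolean"));
        // prints Row(f0: String, f1: Integer, f2: Boolean)
    }
}
```

With Flink itself, the equivalent call would be Types.ROW(Types.STRING, Types.INT, Types.BOOLEAN).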

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156937828
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where type 
information has to be
+  * supplied manually or would result in an inefficient type.
+  *
+  * Scala macros make it possible to determine the type information of classes 
and type parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+* Generates type information based on the given class and/or its type 
parameters.
+*
+* The definition is similar to a 
[[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+* not require implementing anonymous classes.
+*
+* If the class cannot be analyzed by the Scala type analyzer, the 
Java analyzer
+* will be used.
+*
+* Example use:
+*
+* `Types.of[(Int, String, String)]` for Scala tuples
+* `Types.of[Unit]` for Scala specific types
+*
+* @tparam T class to be analyzed
+*/
+  def of[T: TypeInformation]: TypeInformation[T] = {
+val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+typeInfo
+  }
+
+  /**
+* Returns type information for Scala [[Nothing]]. Does not support a 
null value.
+*/
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+* Returns type information for Scala [[Unit]]. Does not support a null 
value.
+*/
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+* Returns type information for [[String]] and [[java.lang.String]]. 
Supports a null value.
+*/
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+* Returns type information for primitive [[Byte]] and 
[[java.lang.Byte]]. Does not
+* support a null value.
+*/
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+* Returns type information for primitive [[Boolean]] and 
[[java.lang.Boolean]]. Does not
+* support a null value.
+*/
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+* Returns type information for primitive [[Short]] and 
[[java.lang.Short]]. Does not
+* support a null value.
+*/
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+* Returns type information for primitive [[Int]] and 
[[java.lang.Integer]]. Does not
+* support a null value.
+*/
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+* Returns type information for primitive [[Long]] and 
[[java.lang.Long]]. Does not
+* support a null value.
+*/
+  val LONG: TypeInformation[java.lang.Long] = JTypes.LONG
+
+  /**
+* Returns type information for primitive [[Float]] and 
[[java.lang.Float]]. Does not
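The Scaladoc above contrasts Types.of with Java's TypeHint, which relies on the "super type token" idiom: an anonymous subclass fixes the generic argument, making it readable via reflection. A self-contained sketch of that idiom (the general pattern, not Flink's actual TypeHint class):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public abstract class TypeHint<T> {
    private final Type type;

    protected TypeHint() {
        // The anonymous subclass reifies T: its generic superclass is TypeHint<T>
        // with the actual type argument filled in, recoverable via reflection.
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    public Type getType() {
        return type;
    }

    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass that captures the type.
        TypeHint<List<String>> hint = new TypeHint<List<String>>() {};
        System.out.println(hint.getType()); // java.util.List<java.lang.String>
    }
}
```

Scala's Types.of[T] avoids this boilerplate because the implicit TypeInformation[T] evidence is materialized by a macro at the call site.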

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r156940557
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
 ---
@@ -34,7 +34,7 @@ class CorrelateStringExpressionTest extends TableTestBase 
{
 
 val util = streamTestUtil()
 val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
-val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, 
Types.STRING): _*)
+val typeInfo = new RowTypeInfo(Seq(typeutils.Types.INT, 
typeutils.Types.LONG, typeutils.Types.STRING): _*)
--- End diff --

line exceeds 100 characters.


---


[jira] [Commented] (FLINK-8087) Decouple Slot from SlotPool

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291016#comment-16291016
 ] 

ASF GitHub Bot commented on FLINK-8087:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5088


> Decouple Slot from SlotPool
> ---
>
> Key: FLINK-8087
> URL: https://issues.apache.org/jira/browse/FLINK-8087
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to let the {{SlotPool}} return a different {{LogicalSlot}} 
> implementation than {{SimpleSlot}}, we should not store the {{Slot}} inside 
> the {{SlotPool}}. Moreover, we should introduce an abstraction for the 
> {{AllocatedSlot}} which contains the information required by the 
> {{SimpleSlot}}. That way we decouple the {{SimpleSlot}} from the 
> {{AllocatedSlot}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291014#comment-16291014
 ] 

ASF GitHub Bot commented on FLINK-8089:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5090


> Fulfil slot requests with unused offered slots
> --
>
> Key: FLINK-8089
> URL: https://issues.apache.org/jira/browse/FLINK-8089
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{SlotPool}} adds unused offered slots to the list of available slots 
> without checking whether another pending slot request could be fulfilled with 
> this slot. This should be changed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
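The change described above boils down to an ordering rule: when a slot is offered back, first try to fulfil a pending request, and only park the slot as available if nobody is waiting. A self-contained sketch of that rule (all names are illustrative; this is not the SlotPool implementation):

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SlotOfferSketch {
    static final Queue<String> pendingRequests = new ArrayDeque<>();
    static final Queue<String> availableSlots = new ArrayDeque<>();

    /** On a slot offer: fulfil the oldest pending request first;
     *  only park the slot as available if no request is waiting. */
    static String offerSlot(String slot) {
        String request = pendingRequests.poll();
        if (request != null) {
            return request; // slot immediately fulfils this pending request
        }
        availableSlots.add(slot);
        return null; // parked as available
    }

    public static void main(String[] args) {
        pendingRequests.add("request-1");
        System.out.println(offerSlot("slot-A")); // request-1 (fulfilled directly)
        System.out.println(offerSlot("slot-B")); // null (parked as available)
        System.out.println(availableSlots);      // [slot-B]
    }
}
```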


[jira] [Commented] (FLINK-8120) Cannot access Web UI from YARN application overview in FLIP-6 mode

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291015#comment-16291015
 ] 

ASF GitHub Bot commented on FLINK-8120:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5128


> Cannot access Web UI from YARN application overview in FLIP-6 mode
> --
>
> Key: FLINK-8120
> URL: https://issues.apache.org/jira/browse/FLINK-8120
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The Web UI cannot be accessed through YARN's application overview (_Tracking 
> UI_ link). The proxy displays a stacktrace.
> {noformat}
> Caused by:
> org.apache.http.client.ClientProtocolException
>   at 
> org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:888)
>   at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>   at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:242)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.methodAction(WebAppProxyServlet.java:461)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:290)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
>   at 
> org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
>   at 
> com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
>   at 
> com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
>   at 
> com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:178)
>   at 
> com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
>   at 
> com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
>   at 
> com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
>   at 
> com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
>   at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:636)
>   at 
> org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter.doFilter(DelegationTokenAuthenticationFilter.java:294)
>   at 
> org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:588)
>   at 
> org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter.doFilter(RMAuthenticationFilter.java:82)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1353)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
>   at 
> org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
>   at 
> org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
>   at 
> org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
>   at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
>   at 
> org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
>   at 
> 

[jira] [Commented] (FLINK-8088) Bind logical slots to their request id instead of the slot allocation id

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291017#comment-16291017
 ] 

ASF GitHub Bot commented on FLINK-8088:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5089


> Bind logical slots to their request id instead of the slot allocation id
> 
>
> Key: FLINK-8088
> URL: https://issues.apache.org/jira/browse/FLINK-8088
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Since allocated slots can be reused to fulfil multiple slot requests, we 
> should bind the resulting logical slots to their slot request id instead of 
> the allocation id of the underlying allocated slot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5128: [FLINK-8120] [flip6] Register Yarn application wit...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5128


---


[GitHub] flink pull request #5089: [FLINK-8088] Associate logical slots with the slot...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5089


---


[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5088


---


[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5090


---


[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291010#comment-16291010
 ] 

ASF GitHub Bot commented on FLINK-8089:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5090
  
Thanks for the review @GJL. Merging this PR.


> Fulfil slot requests with unused offered slots
> --
>
> Key: FLINK-8089
> URL: https://issues.apache.org/jira/browse/FLINK-8089
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{SlotPool}} adds unused offered slots to the list of available slots 
> without checking whether another pending slot request could be fulfilled with 
> this slot. This should be changed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8087) Decouple Slot from SlotPool

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291007#comment-16291007
 ] 

ASF GitHub Bot commented on FLINK-8087:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5088
  
Thanks for the review @GJL. Merging this PR.


> Decouple Slot from SlotPool
> ---
>
> Key: FLINK-8087
> URL: https://issues.apache.org/jira/browse/FLINK-8087
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to let the {{SlotPool}} return a different {{LogicalSlot}} 
> implementation than {{SimpleSlot}}, we should not store the {{Slot}} inside 
> the {{SlotPool}}. Moreover, we should introduce an abstraction for the 
> {{AllocatedSlot}} which contains the information required by the 
> {{SimpleSlot}}. That way we decouple the {{SimpleSlot}} from the 
> {{AllocatedSlot}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8088) Bind logical slots to their request id instead of the slot allocation id

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291009#comment-16291009
 ] 

ASF GitHub Bot commented on FLINK-8088:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5089
  
Merging this PR.


> Bind logical slots to their request id instead of the slot allocation id
> 
>
> Key: FLINK-8088
> URL: https://issues.apache.org/jira/browse/FLINK-8088
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Since allocated slots can be reused to fulfil multiple slot requests, we 
> should bind the resulting logical slots to their slot request id instead of 
> the allocation id of the underlying allocated slot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5090: [FLINK-8089] Also check for other pending slot requests i...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5090
  
Thanks for the review @GJL. Merging this PR.


---


[GitHub] flink issue #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5088
  
Thanks for the review @GJL. Merging this PR.


---


[GitHub] flink issue #5089: [FLINK-8088] Associate logical slots with the slot reques...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5089
  
Merging this PR.


---


[jira] [Commented] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

2017-12-14 Thread Ryan Brideau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290978#comment-16290978
 ] 

Ryan Brideau commented on FLINK-8256:
-

That works for me! Thanks again.

> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --
>
> Key: FLINK-8256
> URL: https://issues.apache.org/jira/browse/FLINK-8256
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
> Environment: macOS, Local Flink v1.4.0, Scala 2.11
>Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream 
> using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object TestFunction {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val someArray = Array(1,2,3)
> val stream = env.fromCollection(someArray).filter(_ => true)
> stream.print().setParallelism(1)
> env.execute("Testing Function")
>   }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
> instantiate user function.
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> 12/13/2017 15:10:01 Job execution switched to status FAILED.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> 

[jira] [Commented] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

2017-12-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290970#comment-16290970
 ] 

Stephan Ewen commented on FLINK-8256:
-

Great to hear!

Do you think we should close the issue, with a reference to FLINK-8264?
(which should make sure this works even when the project setup uses an older 
quickstart template)

> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --
>
> Key: FLINK-8256
> URL: https://issues.apache.org/jira/browse/FLINK-8256
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
> Environment: macOS, Local Flink v1.4.0, Scala 2.11
>Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream 
> using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object TestFunction {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val someArray = Array(1,2,3)
> val stream = env.fromCollection(someArray).filter(_ => true)
> stream.print().setParallelism(1)
> env.execute("Testing Function")
>   }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
> instantiate user function.
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> 12/13/2017 15:10:01 Job execution switched to status FAILED.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> at 

[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290961#comment-16290961
 ] 

ASF GitHub Bot commented on FLINK-8264:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5166
  
@aljoscha I think you may have an opinion on this one


> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> A confusing failure occurs when users accidentally package the Scala 
> library into their jar file: the reversed (child-first) class loading then 
> duplicates Scala's classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve 
> the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
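The fix amounts to adding the "scala." prefix to the set of class-name patterns that are always delegated to the parent classloader. A self-contained sketch of the decision logic (the pattern list here is illustrative; Flink's actual defaults come from its configuration):

```java
import java.util.Arrays;
import java.util.List;

public class ParentFirstPatterns {
    // Hypothetical pattern list; Flink's real defaults live in its configuration.
    static final List<String> PATTERNS =
            Arrays.asList("java.", "scala.", "org.apache.flink.", "javax.annotation.");

    /** A class is loaded parent-first iff its fully qualified name matches a prefix. */
    static boolean isParentFirst(String className) {
        return PATTERNS.stream().anyMatch(className::startsWith);
    }

    public static void main(String[] args) {
        System.out.println(isParentFirst("scala.Function1"));           // true
        System.out.println(isParentFirst("com.example.MyJobFunction")); // false
    }
}
```

With "scala." in the list, scala.Function1 always comes from the framework's classloader, so a duplicate copy in the user jar can no longer cause the ClassCastException above.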


[GitHub] flink issue #5166: [FLINK-8264] [core] Add 'scala.' to the 'parent-first' cl...

2017-12-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5166
  
@aljoscha I think you may have an opinion on this one


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290867#comment-16290867
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156948978
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows 
running different
+ * tasks in the same slot and realizing co-location constraints.
+ *
+ * The SlotSharingManager allows creating a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
{@link MultiTaskSlot} to the same root node. All co-located tasks 
will then be added to the co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+   // Root nodes which have not been completed because 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156948978
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows 
running different
+ * tasks in the same slot and realizing co-location constraints.
+ *
+ * The SlotSharingManager allows creating a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
{@link MultiTaskSlot} to the same root node. All co-located tasks 
will then be added to the co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final 

[jira] [Comment Edited] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

2017-12-14 Thread Ryan Brideau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290846#comment-16290846
 ] 

Ryan Brideau edited comment on FLINK-8256 at 12/14/17 1:45 PM:
---

Thanks for looking into this so quickly. I managed to track down the root of 
the issue on my end. I had built my project previously using the snapshot 
archetype, and not the newest 1.4.0 one:

{code:java}
mvn archetype:generate   \
  -DarchetypeGroupId=org.apache.flink  \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.4-SNAPSHOT
{code}

To fix the problem I just built a new empty project using the 1.4.0 archetype 
version and did a diff of the pom.xml of the two, updating my existing one to 
match the new one, and now everything works perfectly. I suspect that anybody 
who made a project recently might find themselves in the same situation.


was (Author: brideau):
Thanks for looking into this so quickly. I managed to track down the root of 
the issue on my end. I had built my project previously using the snapshot 
archetype, and not the newest 1.4.0 one:

{code:java}
mvn archetype:generate   \
  -DarchetypeGroupId=org.apache.flink  \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.4-SNAPSHOT
{code}

To fix the problem I just build a new empty project using the 1.4.0 archetype 
version and did a diff of the pom.xml of the two, updating my existing one to 
match the new one, and now everything works perfectly. I suspect that anybody 
who made a project recently might find themselves in the same situation.

> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --
>
> Key: FLINK-8256
> URL: https://issues.apache.org/jira/browse/FLINK-8256
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
> Environment: macOS, Local Flink v1.4.0, Scala 2.11
>Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream 
> using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object TestFunction {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val someArray = Array(1,2,3)
> val stream = env.fromCollection(someArray).filter(_ => true)
> stream.print().setParallelism(1)
> env.execute("Testing Function")
>   }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
> instantiate user function.
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 

[jira] [Commented] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

2017-12-14 Thread Ryan Brideau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290846#comment-16290846
 ] 

Ryan Brideau commented on FLINK-8256:
-

Thanks for looking into this so quickly. I managed to track down the root of 
the issue on my end. I had built my project previously using the snapshot 
archetype, and not the newest 1.4.0 one:

{code:java}
mvn archetype:generate   \
  -DarchetypeGroupId=org.apache.flink  \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.4-SNAPSHOT
{code}

To fix the problem I just built a new empty project using the 1.4.0 archetype 
version and did a diff of the pom.xml of the two, updating my existing one to 
match the new one, and now everything works perfectly. I suspect that anybody 
who made a project recently might find themselves in the same situation.
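For reference, the corresponding invocation against the released archetype would presumably look like the following (the version value `1.4.0` is assumed from the release being discussed, not quoted from the comment):

```shell
mvn archetype:generate   \
  -DarchetypeGroupId=org.apache.flink  \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.4.0
```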

> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --
>
> Key: FLINK-8256
> URL: https://issues.apache.org/jira/browse/FLINK-8256
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
> Environment: macOS, Local Flink v1.4.0, Scala 2.11
>Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream 
> using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object TestFunction {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val someArray = Array(1,2,3)
> val stream = env.fromCollection(someArray).filter(_ => true)
> stream.print().setParallelism(1)
> env.execute("Testing Function")
>   }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
> instantiate user function.
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> 12/13/2017 15:10:01 Job execution switched to status FAILED.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at 
> 

[jira] [Commented] (FLINK-8200) RocksDBAsyncSnapshotTest should use temp fold instead of fold with fixed name

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290830#comment-16290830
 ] 

ASF GitHub Bot commented on FLINK-8200:
---

Github user wenlong88 closed the pull request at:

https://github.com/apache/flink/pull/5122


> RocksDBAsyncSnapshotTest should use temp fold instead of fold with fixed name
> -
>
> Key: FLINK-8200
> URL: https://issues.apache.org/jira/browse/FLINK-8200
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The following case failed when different users run the test on the same 
> machine.
> Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< 
> FAILURE! - in 
> org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest
> testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest)
>   Time elapsed: 0.023 sec  <<< ERROR!
> java.io.IOException: No local storage directories available. Local DB files 
> directory 'file:/tmp/foobar' does not exist and cannot be created.
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338)
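The fix the issue title calls for, replacing the hard-coded `file:/tmp/foobar` with a per-run temporary directory, can be sketched with plain JDK facilities (a generic sketch of the technique, not the actual patch):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirSketch {

    // Creates a unique, user-owned directory instead of a fixed name like
    // /tmp/foobar, so concurrent users on one machine cannot collide on the
    // same path or hit permission errors on a directory another user owns.
    public static Path createLocalDbDir() throws IOException {
        return Files.createTempDirectory("rocksdb-test-");
    }

    public static void main(String[] args) throws IOException {
        Path dir = createLocalDbDir();
        System.out.println(Files.isDirectory(dir)); // true
        Files.delete(dir); // clean up after the test
    }
}
```

In a JUnit test the same effect is usually achieved with the `TemporaryFolder` rule, which also handles cleanup automatically.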



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5122: [FLINK-8200] RocksDBAsyncSnapshotTest should use t...

2017-12-14 Thread wenlong88
Github user wenlong88 closed the pull request at:

https://github.com/apache/flink/pull/5122


---


[jira] [Closed] (FLINK-8262) IndividualRestartsConcurrencyTest.testLocalFailureFailsPendingCheckpoints fails on Travis

2017-12-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8262.

Resolution: Fixed

Fixed via e80dd8ea3fef0398048a40c3ffd5136bef204b80

> IndividualRestartsConcurrencyTest.testLocalFailureFailsPendingCheckpoints 
> fails on Travis
> -
>
> Key: FLINK-8262
> URL: https://issues.apache.org/jira/browse/FLINK-8262
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The 
> {{IndividualRestartsConcurrencyTest.testLocalFailureFailsPendingCheckpoints}} 
> fails on Travis. The reason is a concurrent restart attempt which fails and 
> thus discards all pending checkpoints.
> https://travis-ci.org/tillrohrmann/flink/jobs/316300683



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290810#comment-16290810
 ] 

ASF GitHub Bot commented on FLINK-8264:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5167

[FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading patterns

**BACKPORT of #5166 to release-1.4**

## What is the purpose of the change

Adding `scala.` to the "parent-first-patterns" makes sure that Scala 
classes are not duplicated through "child-first" classloading when users 
accidentally package the Scala Library into the application jar.

Since Scala classes traverse the boundary between core and user space, they 
should never be duplicated.

## Brief change log

  - Adds `scala.` to the default value of 
`classloader.parent-first-patterns`.
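Explicitly setting the option in `flink-conf.yaml` would presumably look like the following; the non-`scala.` entries shown are an assumption about the surrounding default list, so check the `ConfigOption` definition for the authoritative value:

```yaml
# flink-conf.yaml -- classes matching these prefixes are always loaded
# parent-first (from the Flink classloader), never duplicated child-first.
classloader.parent-first-patterns: "java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback"
```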

## Verifying this change

This change can be verified as follows:
  - Create a very simple quickstart Scala project using a Scala lambda for 
a filter function (`_ => true`).
  - Package it such that the Scala library is in the user code jar
  - Without the fix, you get a weird class cast exception during 
deserialization, with this fix, everything is fine.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no)**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 8264_backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5167.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5167


commit 8bd70c1e3d31f5c285ac5995504e52e39063e90b
Author: Stephan Ewen 
Date:   2017-12-14T12:50:39Z

[FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading 
patterns.




> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> A confusing experience happens when users accidentally package the Scala 
> Library into their jar file. The reversed class loading duplicates Scala's 
> classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve 
> the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at 
> 

[GitHub] flink pull request #5167: [FLINK-8264] [core] Add 'scala.' to the 'parent-fi...

2017-12-14 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5167

[FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading patterns

**BACKPORT of #5166 to release-1.4**

## What is the purpose of the change

Adding `scala.` to the "parent-first-patterns" makes sure that Scala 
classes are not duplicated through "child-first" classloading when users 
accidentally package the Scala Library into the application jar.

Since Scala classes traverse the boundary between core and user space, they 
should never be duplicated.

## Brief change log

  - Adds `scala.` to the default value of 
`classloader.parent-first-patterns`.

## Verifying this change

This change can be verified as follows:
  - Create a very simple quickstart Scala project using a Scala lambda for 
a filter function (`_ => true`).
  - Package it such that the Scala library is in the user code jar
  - Without the fix, you get a weird class cast exception during 
deserialization, with this fix, everything is fine.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no)**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 8264_backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5167.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5167


commit 8bd70c1e3d31f5c285ac5995504e52e39063e90b
Author: Stephan Ewen 
Date:   2017-12-14T12:50:39Z

[FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading 
patterns.




---

