[jira] [Created] (FLINK-9717) Flush state of one side of the join if other side is bounded

2018-07-03 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9717:
-

 Summary: Flush state of one side of the join if other side is 
bounded
 Key: FLINK-9717
 URL: https://issues.apache.org/jira/browse/FLINK-9717
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Piotr Nowojski


Whenever one side of join receives {{MAX_WATERMARK}}, other side in joins (both 
normal and versioned joins) could flush the state from other side.

This highly useful optimisation that would speed up versioned joins and would 
allow normal joins of large unbounded streams with bounded tables (for example 
some static data).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-9712:
-

Assignee: Piotr Nowojski

> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9713) Support versioned joins in planning phase

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-9713:
-

Assignee: Piotr Nowojski

> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to valid plan with versioned joins plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...

2018-07-03 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199750956
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -255,14 +257,25 @@ object FlinkShell {
 yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", 
queue.toString))
 yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", 
slots.toString))
 
+val customCommandLines = CliFrontend.loadCustomCommandLines(
+  configuration,configurationDirectory)
+val commandOptions = CliFrontendParser.getRunCommandOptions
+val customCommandLineOptions = new Options()
+customCommandLines.asScala.foreach(cmd => {
--- End diff --

@zentol  I have fixed that, but I have to make `customCommandLineOptions` 
of `CliFrontend` to be static (a little ugly)


---


[GitHub] flink issue #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --running...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6242
  
-1

The PR description is outdated, the CLI only lists jobs in non-terminal 
states as running, i.e. CANCELED, FAILED and FINISHED jobs are not included. 
For all intents and purposes this is the correct behavior, as from a 
user-perspective a job that is currently restarting is effectively still 
running.

Finally, this change would list `SCHEDULED` jobs as terminated, which 
doesn't make any sense.


---


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531142#comment-16531142
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6242
  
-1

The PR description is outdated, the CLI only lists jobs in non-terminal 
states as running, i.e. CANCELED, FAILED and FINISHED jobs are not included. 
For all intents and purposes this is the correct behavior, as from a 
user-perspective a job that is currently restarting is effectively still 
running.

Finally, this change would list `SCHEDULED` jobs as terminated, which 
doesn't make any sense.


> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In Flink CLI there's a command list with option --running that according to 
> descriptions "Show only running programs and their JobIDs". However, in 
> practice, it also shows jobs that are in the *CANCELED* state, which is a 
> completed job.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal it to extend CLI program to show jobs only in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531138#comment-16531138
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199750956
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -255,14 +257,25 @@ object FlinkShell {
 yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", 
queue.toString))
 yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", 
slots.toString))
 
+val customCommandLines = CliFrontend.loadCustomCommandLines(
+  configuration,configurationDirectory)
+val commandOptions = CliFrontendParser.getRunCommandOptions
+val customCommandLineOptions = new Options()
+customCommandLines.asScala.foreach(cmd => {
--- End diff --

@zentol  I have fixed that, but I have to make `customCommandLineOptions` 
of `CliFrontend` to be static (a little ugly)


> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> It still try to use StandaloneCluster even I specify it using yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9674.
---
   Resolution: Fixed
Fix Version/s: 1.6.0

master: 41277f6b7447f3542e439b7ae82b99be862df7c2

> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM 
> is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9674:

Affects Version/s: (was: 1.5.0)

> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM 
> is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5899: Klink

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5899


---


[GitHub] flink pull request #5297: [FLINK-8434] Take over the running task manager af...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5297


---


[GitHub] flink pull request #6025: Release 1.4

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6025


---


[GitHub] flink pull request #6216: [FLINK-9674][tests] Replace hard-coded sleeps in Q...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6216


---


[GitHub] flink pull request #5901: [FLINK-9235][Security] Add integration tests for Y...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5901


---


[GitHub] flink pull request #6211: [FLINK-9665] PrometheusReporter does not properly ...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6211


---


[GitHub] flink pull request #5888: [FLINK-9194] [histroyserver] Finished jobs are not...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5888


---


[jira] [Updated] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9235:
--
Labels: pull-request-available  (was: )

> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9665) PrometheusReporter does not properly unregister metrics

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531147#comment-16531147
 ] 

ASF GitHub Bot commented on FLINK-9665:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6211


> PrometheusReporter does not properly unregister metrics
> ---
>
> Key: FLINK-9665
> URL: https://issues.apache.org/jira/browse/FLINK-9665
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Jelmer Kuperus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
>
> The {{PrometheusReporter}} groups metrics with the same logical scope in a 
> single {{Collector}} which are periodically polled by Prometheus.
> New metrics are added to an existing collector, and a reference count is 
> maintained so we can eventually cleanup the {{Collector}} itself.
> For removed metrics we decrease the reference count, do not however remove 
> the metrics that were added. As a result the collector will continue to 
> expose metrics, as long as at least 1 metric exists with the same logical 
> scope.
> If the collector is a {{io.prometheus.client.Gauge}} we can use the 
> {{#remove()}} method. For histograms we will have to modify our 
> {{HistogramSummaryProxy}} class to allow removing individual histograms.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9194) Finished jobs are not archived to HistoryServer

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531148#comment-16531148
 ] 

ASF GitHub Bot commented on FLINK-9194:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5888


> Finished jobs are not archived to HistoryServer
> ---
>
> Key: FLINK-9194
> URL: https://issues.apache.org/jira/browse/FLINK-9194
> Project: Flink
>  Issue Type: Bug
>  Components: History Server, JobManager
>Affects Versions: 1.5.0
> Environment: Flink 2af481a
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
> Fix For: 1.5.0
>
>
> In flip6 mode, jobs are not archived to the HistoryServer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531149#comment-16531149
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6216


> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM 
> is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531150#comment-16531150
 ] 

ASF GitHub Bot commented on FLINK-9235:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5901


> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9194) Finished jobs are not archived to HistoryServer

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9194:
--
Labels: flip-6 pull-request-available  (was: flip-6)

> Finished jobs are not archived to HistoryServer
> ---
>
> Key: FLINK-9194
> URL: https://issues.apache.org/jira/browse/FLINK-9194
> Project: Flink
>  Issue Type: Bug
>  Components: History Server, JobManager
>Affects Versions: 1.5.0
> Environment: Flink 2af481a
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
> Fix For: 1.5.0
>
>
> In flip6 mode, jobs are not archived to the HistoryServer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531146#comment-16531146
 ] 

ASF GitHub Bot commented on FLINK-8434:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5297


> The new yarn resource manager should take over the running task managers 
> after failover
> ---
>
> Key: FLINK-8434
> URL: https://issues.apache.org/jira/browse/FLINK-8434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The app master which container the job master and yarn resource manager may 
> failover during running on yarn. The new resource manager should take over 
> the running task managers after started. But now the YarnResourceManager does 
> not record the running container to workerNodeMap, so when task managers 
> register to it, it will reject them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199752165
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -166,6 +166,10 @@ public Configuration getConfiguration() {
return copiedConfiguration;
}
 
+   public static Options getCustomCommandLineOptions() {
--- End diff --

so.why does this getter have to be static?


---


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531151#comment-16531151
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199752165
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -166,6 +166,10 @@ public Configuration getConfiguration() {
return copiedConfiguration;
}
 
+   public static Options getCustomCommandLineOptions() {
--- End diff --

so.why does this getter have to be static?


> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> It still try to use StandaloneCluster even I specify it using yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9707:
-
Description: 
The {{LocalFileSystem}} does not support concurrent directory creations. The 
consequence is that file system operations fail.

I think the culprit is the following line: 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257

  was:The {{LocalFileSystem}} does not support concurrent directory creations. 
The consequence is that file system operations fail.


> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

2018-07-03 Thread azagrebin
Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6196
  
@StefanRRichter 
I added more precomputed fields to `CompositeSerializer` constructor and 
included `TtlStateFactory` in TTL tests.


---


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531159#comment-16531159
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6196
  
@StefanRRichter 
I added more precomputed fields to `CompositeSerializer` constructor and 
included `TtlStateFactory` in TTL tests.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvier timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc) {
>  serializer = new TtlValueSerializer(valueDesc.getSerializer);
>  ttlValueDesc = new ValueDesc(serializer, ...);
>  // or implement custom TypeInfo
>  originalStateWithTtl = binder.createValueState(valueDesc);
>      return new TtlValueState(originalStateWithTtl, timeProvider);
> }
>   // List, Map, ...
> }
> {code}
> TTL serializer should add expiration timestamp



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --running...

2018-07-03 Thread satybald
Github user satybald commented on the issue:

https://github.com/apache/flink/pull/6242
  
I'd agree with you that CREATED, RESTARTING and RUNNING is part of the 
definition of the running job. 

However, CLI does include CANCELED jobs as RUNNING one if --running options 
is provided.

```
flink list --running -m host:8081 
Waiting for response...
-- Running/Restarting Jobs ---
03.07.2018 12:07:26 : b14b50d6b6160035d7c62f135a05b5ea : Enriched 
TrackingClick (CANCELED)
03.07.2018 12:13:39 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
TrackingClick (RUNNING)
--

```



---


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531165#comment-16531165
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

Github user satybald commented on the issue:

https://github.com/apache/flink/pull/6242
  
I'd agree with you that CREATED, RESTARTING and RUNNING is part of the 
definition of the running job. 

However, CLI does include CANCELED jobs as RUNNING one if --running options 
is provided.

```
flink list --running -m host:8081 
Waiting for response...
-- Running/Restarting Jobs ---
03.07.2018 12:07:26 : b14b50d6b6160035d7c62f135a05b5ea : Enriched 
TrackingClick (CANCELED)
03.07.2018 12:13:39 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
TrackingClick (RUNNING)
--

```



> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In Flink CLI there's a command list with option --running that according to 
> descriptions "Show only running programs and their JobIDs". However, in 
> practice, it also shows jobs that are in the *CANCELED* state, which is a 
> completed job.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal it to extend CLI program to show jobs only in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531171#comment-16531171
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6242
  
I take it that happens in 1.5.0? If so, this was already fixed for master 
and 1.5.1.


> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In Flink CLI there's a command list with option --running that according to 
> descriptions "Show only running programs and their JobIDs". However, in 
> practice, it also shows jobs that are in the *CANCELED* state, which is a 
> completed job.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal it to extend CLI program to show jobs only in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --running...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6242
  
I take it that happens in 1.5.0? If so, this was already fixed for master 
and 1.5.1.


---


[GitHub] flink issue #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --running...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6242
  
see https://issues.apache.org/jira/browse/FLINK-9398


---


[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded

2018-07-03 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531172#comment-16531172
 ] 

Fabian Hueske commented on FLINK-9717:
--

There is another aspect to consider when improving the support for joins with 
bounded input.
We might want to ensure that the operator emits an append-only stream (and that 
the planner is also aware of this).
For example, we might need a dedicated operator to enable append-only record 
emission for outer joins.


> Flush state of one side of the join if other side is bounded
> 
>
> Key: FLINK-9717
> URL: https://issues.apache.org/jira/browse/FLINK-9717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Whenever one side of join receives {{MAX_WATERMARK}}, other side in joins 
> (both normal and versioned joins) could flush the state from other side.
> This highly useful optimisation that would speed up versioned joins and would 
> allow normal joins of large unbounded streams with bounded tables (for 
> example some static data).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531174#comment-16531174
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6242
  
see https://issues.apache.org/jira/browse/FLINK-9398


> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In Flink CLI there's a command list with option --running that according to 
> descriptions "Show only running programs and their JobIDs". However, in 
> practice, it also shows jobs that are in the *CANCELED* state, which is a 
> completed job.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal it to extend CLI program to show jobs only in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199758405
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531176#comment-16531176
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199758405
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private trans

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199759148
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531177#comment-16531177
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199759148
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private trans

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199759847
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531178#comment-16531178
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199759847
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private trans

[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199760506
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -166,6 +166,10 @@ public Configuration getConfiguration() {
return copiedConfiguration;
}
 
+   public static Options getCustomCommandLineOptions() {
--- End diff --

it should be possible for this getter to be non-static. Then the changes to 
the `customCommandLineOptions` field are unnecessary, and the in `FlinkShell` 
replace `CliFrontend.getCustomCommandLineOptions()` with 
`frontend.getCustomCommandLineOptions`


---


[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199760246
  
--- Diff: flink-scala-shell/start-script/start-scala-shell.sh ---
@@ -19,6 +19,9 @@
 
 # from scala-lang 2.10.4
 
+# Uncomment the following line to enable remote debug
+# export 
FLINK_SCALA_SHELL_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
--- End diff --

please move this into a separate commit


---


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531182#comment-16531182
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199760506
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -166,6 +166,10 @@ public Configuration getConfiguration() {
return copiedConfiguration;
}
 
+   public static Options getCustomCommandLineOptions() {
--- End diff --

it should be possible for this getter to be non-static. Then the changes to 
the `customCommandLineOptions` field are unnecessary, and the in `FlinkShell` 
replace `CliFrontend.getCustomCommandLineOptions()` with 
`frontend.getCustomCommandLineOptions`


> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> It still try to use StandaloneCluster even I specify it using yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531181#comment-16531181
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199760246
  
--- Diff: flink-scala-shell/start-script/start-scala-shell.sh ---
@@ -19,6 +19,9 @@
 
 # from scala-lang 2.10.4
 
+# Uncomment the following line to enable remote debug
+# export 
FLINK_SCALA_SHELL_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
--- End diff --

please move this into a separate commit


> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> It still try to use StandaloneCluster even I specify it using yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199761646
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+   @Test
+   public void testAvroClassConversion() {
+   validateUserSchema(AvroSchemaConverter.convert(User.class));
+   }
+
+   @Test
+   public void testAvroSchemaConversion() {
+   final String schema = User.getClassSchema().toString(true);
+   validateUserSchema(AvroSchemaConverter.convert(schema));
+   }
+
+   private void validateUserSchema(TypeInformation actual) {
+   final TypeInformation address = Types.ROW_NAMED(
+   new String[]{"num", "street", "city", "state", "zip"},
+   Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING);
+
+   final TypeInformation user = Types.ROW_NAMED(
+   new String[] {"name", "favorite_number", 
"favorite_color", "type_long_test",
--- End diff --

Actually, I'm a big fan of per line fields but it also blows up the code.


---


[GitHub] flink pull request #6084: [FLINK-8654][Docs] Extend quickstart docs on how t...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6084#discussion_r199761621
  
--- Diff: docs/quickstart/java_api_quickstart.md ---
@@ -111,7 +111,7 @@ In IntelliJ IDEA recommended way to change JVM options 
is from the `Help | Edit
 ## Build Project
 
 If you want to __build/package your project__, go to your project 
directory and
-run the '`mvn clean package`' command.
--- End diff --

I think it actually looks nicer with the ticks. 
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html#build-project

Will revert this while merging.


---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531189#comment-16531189
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199761646
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+   @Test
+   public void testAvroClassConversion() {
+   validateUserSchema(AvroSchemaConverter.convert(User.class));
+   }
+
+   @Test
+   public void testAvroSchemaConversion() {
+   final String schema = User.getClassSchema().toString(true);
+   validateUserSchema(AvroSchemaConverter.convert(schema));
+   }
+
+   private void validateUserSchema(TypeInformation actual) {
+   final TypeInformation address = Types.ROW_NAMED(
+   new String[]{"num", "street", "city", "state", "zip"},
+   Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING);
+
+   final TypeInformation user = Types.ROW_NAMED(
+   new String[] {"name", "favorite_number", 
"favorite_color", "type_long_test",
--- End diff --

Actually, I'm a big fan of per line fields but it also blows up the code.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#7

[jira] [Commented] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531190#comment-16531190
 ] 

ASF GitHub Bot commented on FLINK-8654:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6084#discussion_r199761621
  
--- Diff: docs/quickstart/java_api_quickstart.md ---
@@ -111,7 +111,7 @@ In IntelliJ IDEA recommended way to change JVM options 
is from the `Help | Edit
 ## Build Project
 
 If you want to __build/package your project__, go to your project 
directory and
-run the '`mvn clean package`' command.
--- End diff --

I think it actually looks nicer with the ticks. 
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html#build-project

Will revert this while merging.


> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>  Labels: pull-request-available
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6047: [FLINK-8160]Extend OperatorHarness to expose...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6047


---


[jira] [Updated] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8654:

Affects Version/s: 1.6.0
   1.4.2

> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>  Labels: pull-request-available
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6084: [FLINK-8654][Docs] Extend quickstart docs on how t...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6084


---


[jira] [Updated] (FLINK-8160) Extend OperatorHarness to expose metrics

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8160:
--
Labels: pull-request-available  (was: )

> Extend OperatorHarness to expose metrics
> 
>
> Key: FLINK-8160
> URL: https://issues.apache.org/jira/browse/FLINK-8160
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Streaming
>Reporter: Chesnay Schepler
>Assignee: Tuo Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To better test interactions between operators and metrics the harness should 
> expose the metrics registered by the operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8160) Extend OperatorHarness to expose metrics

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531194#comment-16531194
 ] 

ASF GitHub Bot commented on FLINK-8160:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6047


> Extend OperatorHarness to expose metrics
> 
>
> Key: FLINK-8160
> URL: https://issues.apache.org/jira/browse/FLINK-8160
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Streaming
>Reporter: Chesnay Schepler
>Assignee: Tuo Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To better test interactions between operators and metrics the harness should 
> expose the metrics registered by the operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531193#comment-16531193
 ] 

ASF GitHub Bot commented on FLINK-8654:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6084


> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>  Labels: pull-request-available
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8654.
---
   Resolution: Fixed
Fix Version/s: 1.5.1
   1.4.3
   1.6.0

master: 8c15d379dcc1dd1b0c4e22a83b0ce169b54feec0

1.5: e0a4c3591865bcb8ed41edefbcae37c4528e4130

1.4: db14fe13e6cb648e1c27019b0d63ee216be3ccab

> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...

2018-07-03 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6243

[FLINK-9707] Support concurrent directory creations in LocalFileSystem

## What is the purpose of the change

Support concurrent directory creations by accepting directories which have 
been
created by a different thread/process in LocalFileSystem#mkdirs.


## Verifying this change

- Added `LocalFileSystemTest#testConcurrentMkdirs`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixLocalFileSystem

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6243.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6243


commit 82cd571b12f15fc99293173502d21370802aea55
Author: Till Rohrmann 
Date:   2018-07-03T11:36:33Z

[FLINK-9707] Support concurrent directory creations in LocalFileSystem

Support concurrent directory creations by accepting directories which have 
been
created by a different thread/process in LocalFileSystem#mkdirs.




---


[jira] [Updated] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9707:
--
Labels: pull-request-available  (was: )

> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531233#comment-16531233
 ] 

ASF GitHub Bot commented on FLINK-9707:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6243

[FLINK-9707] Support concurrent directory creations in LocalFileSystem

## What is the purpose of the change

Support concurrent directory creations by accepting directories which have 
been
created by a different thread/process in LocalFileSystem#mkdirs.


## Verifying this change

- Added `LocalFileSystemTest#testConcurrentMkdirs`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixLocalFileSystem

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6243.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6243


commit 82cd571b12f15fc99293173502d21370802aea55
Author: Till Rohrmann 
Date:   2018-07-03T11:36:33Z

[FLINK-9707] Support concurrent directory creations in LocalFileSystem

Support concurrent directory creations by accepting directories which have 
been
created by a different thread/process in LocalFileSystem#mkdirs.




> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...

2018-07-03 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6201
  
I agree with @fhueske. Let's do the `from-source` in a follow-up issue. I 
will open a PR soon for FLINK-8558 which separates connector and format. For 
this I also introduced a method `KafkaTableSource#supportsKafkaTimestamps`. The 
`KafkaTableFactory` can read this property and throw an exception accordingly.


---


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531237#comment-16531237
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6201
  
I agree with @fhueske. Let's do the `from-source` in a follow-up issue. I 
will open a PR soon for FLINK-8558 which separates connector and format. For 
this I also introduced a method `KafkaTableSource#supportsKafkaTimestamps`. The 
`KafkaTableFactory` can read this property and throw an exception accordingly.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6102: [FLINK-9091][build] Dependency convergence run aga...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6102#discussion_r199776329
  
--- Diff: tools/check_dependency_convergence.sh ---
@@ -0,0 +1,67 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+HERE="`dirname \"$0\"`"# relative
+HERE="`( cd \"$HERE\" && pwd )`"   # absolutized and normalized
+if [ -z "$HERE" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+
+FLINK_DIR=HERE
+
+if [[ $(basename ${HERE}) == "tools" ]] ; then
+  FLINK_DIR="${HERE}/.."
+fi
+
+FLINK_DIR="`( cd \"${FLINK_DIR}\" && pwd )`" 
+
+echo ${FLINK_DIR}
+
+# get list of all flink modules
+# searches for directories containing a pom.xml file
+# sorts the list alphabetically
+# only accepts directories starting with "flink" to filter force-shading
+modules=$(find -maxdepth 3 -name 'pom.xml' -printf '%h\n' | sort -u | grep 
"flink")
+
+for module in ${modules}
+do
+# we are only interested in child modules
+for other_module in ${modules}
+do 
+if [[ "${other_module}" != "${module}" && "${other_module}" = 
"${module}"/* ]]; then
+echo "excluding ${module} since it is not a leaf module"
+continue 2
+fi
+done
+
+cd "${module}"
+echo "checking ${module}"
+output=$(mvn validate -nsu -Dcheckstyle.skip=true -Dcheck-convergence)
--- End diff --

both plugins are not executed in the `validate` phase.


---


[GitHub] flink pull request #6244: [FLINK-9622] Do not swallow exceptions in FileUtil...

2018-07-03 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6244

[FLINK-9622] Do not swallow exceptions in FileUtils#copy

## What is the purpose of the change

This commit lets `IOExceptions` originating from `FileUtil#copy` bubble up 
instead of swallowing them.

This PR is based on #6243.

## Verifying this change

- Trivial fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
cleanUpDistributedCacheDfsTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6244


commit 1ba98b2dbd2f91c815761d3b2e382a78230d021e
Author: Till Rohrmann 
Date:   2018-07-02T15:44:52Z

[FLINK-9622] Do not swallow exceptions in FileUtils#copy

commit 82cd571b12f15fc99293173502d21370802aea55
Author: Till Rohrmann 
Date:   2018-07-03T11:36:33Z

[FLINK-9707] Support concurrent directory creations in LocalFileSystem

Support concurrent directory creations by accepting directories which have 
been
created by a different thread/process in LocalFileSystem#mkdirs.




---


[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531240#comment-16531240
 ] 

ASF GitHub Bot commented on FLINK-9622:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6244

[FLINK-9622] Do not swallow exceptions in FileUtils#copy

## What is the purpose of the change

This commit lets `IOExceptions` originating from `FileUtil#copy` bubble up 
instead of swallowing them.

This PR is based on #6243.

## Verifying this change

- Trivial fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
cleanUpDistributedCacheDfsTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6244


commit 1ba98b2dbd2f91c815761d3b2e382a78230d021e
Author: Till Rohrmann 
Date:   2018-07-02T15:44:52Z

[FLINK-9622] Do not swallow exceptions in FileUtils#copy

commit 82cd571b12f15fc99293173502d21370802aea55
Author: Till Rohrmann 
Date:   2018-07-03T11:36:33Z

[FLINK-9707] Support concurrent directory creations in LocalFileSystem

Support concurrent directory creations by accepting directories which have 
been
created by a different thread/process in LocalFileSystem#mkdirs.




> DistributedCacheDfsTest failed on travis
> 
>
> Key: FLINK-9622
> URL: https://issues.apache.org/jira/browse/FLINK-9622
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis.
> instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531239#comment-16531239
 ] 

ASF GitHub Bot commented on FLINK-9091:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6102#discussion_r199776329
  
--- Diff: tools/check_dependency_convergence.sh ---
@@ -0,0 +1,67 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+HERE="`dirname \"$0\"`"# relative
+HERE="`( cd \"$HERE\" && pwd )`"   # absolutized and normalized
+if [ -z "$HERE" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+
+FLINK_DIR=HERE
+
+if [[ $(basename ${HERE}) == "tools" ]] ; then
+  FLINK_DIR="${HERE}/.."
+fi
+
+FLINK_DIR="`( cd \"${FLINK_DIR}\" && pwd )`" 
+
+echo ${FLINK_DIR}
+
+# get list of all flink modules
+# searches for directories containing a pom.xml file
+# sorts the list alphabetically
+# only accepts directories starting with "flink" to filter force-shading
+modules=$(find -maxdepth 3 -name 'pom.xml' -printf '%h\n' | sort -u | grep 
"flink")
+
+for module in ${modules}
+do
+# we are only interested in child modules
+for other_module in ${modules}
+do 
+if [[ "${other_module}" != "${module}" && "${other_module}" = 
"${module}"/* ]]; then
+echo "excluding ${module} since it is not a leaf module"
+continue 2
+fi
+done
+
+cd "${module}"
+echo "checking ${module}"
+output=$(mvn validate -nsu -Dcheckstyle.skip=true -Dcheck-convergence)
--- End diff --

both plugins are not executed in the `validate` phase.


> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
>  Labels: pull-request-available
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9622:
--
Labels: pull-request-available  (was: )

> DistributedCacheDfsTest failed on travis
> 
>
> Key: FLINK-9622
> URL: https://issues.apache.org/jira/browse/FLINK-9622
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis.
> instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6102: [FLINK-9091][build] Dependency convergence run aga...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6102#discussion_r199776816
  
--- Diff: tools/check_dependency_convergence.sh ---
@@ -0,0 +1,67 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+HERE="`dirname \"$0\"`"# relative
+HERE="`( cd \"$HERE\" && pwd )`"   # absolutized and normalized
+if [ -z "$HERE" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+
+FLINK_DIR=HERE
+
+if [[ $(basename ${HERE}) == "tools" ]] ; then
+  FLINK_DIR="${HERE}/.."
+fi
+
+FLINK_DIR="`( cd \"${FLINK_DIR}\" && pwd )`" 
+
+echo ${FLINK_DIR}
+
+# get list of all flink modules
+# searches for directories containing a pom.xml file
+# sorts the list alphabetically
+# only accepts directories starting with "flink" to filter force-shading
+modules=$(find -maxdepth 3 -name 'pom.xml' -printf '%h\n' | sort -u | grep 
"flink")
+
+for module in ${modules}
+do
+# we are only interested in child modules
+for other_module in ${modules}
+do 
+if [[ "${other_module}" != "${module}" && "${other_module}" = 
"${module}"/* ]]; then
+echo "excluding ${module} since it is not a leaf module"
+continue 2
+fi
+done
+
+cd "${module}"
+echo "checking ${module}"
+output=$(mvn validate -nsu -Dcheckstyle.skip=true -Dcheck-convergence)
--- End diff --

We explicitly run `rat` in the `verify`, which is also the one `scalastyle` 
runs in by default. 


---


[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531242#comment-16531242
 ] 

ASF GitHub Bot commented on FLINK-9091:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6102#discussion_r199776816
  
--- Diff: tools/check_dependency_convergence.sh ---
@@ -0,0 +1,67 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+HERE="`dirname \"$0\"`"# relative
+HERE="`( cd \"$HERE\" && pwd )`"   # absolutized and normalized
+if [ -z "$HERE" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+
+FLINK_DIR=HERE
+
+if [[ $(basename ${HERE}) == "tools" ]] ; then
+  FLINK_DIR="${HERE}/.."
+fi
+
+FLINK_DIR="`( cd \"${FLINK_DIR}\" && pwd )`" 
+
+echo ${FLINK_DIR}
+
+# get list of all flink modules
+# searches for directories containing a pom.xml file
+# sorts the list alphabetically
+# only accepts directories starting with "flink" to filter force-shading
+modules=$(find -maxdepth 3 -name 'pom.xml' -printf '%h\n' | sort -u | grep 
"flink")
+
+for module in ${modules}
+do
+# we are only interested in child modules
+for other_module in ${modules}
+do 
+if [[ "${other_module}" != "${module}" && "${other_module}" = 
"${module}"/* ]]; then
+echo "excluding ${module} since it is not a leaf module"
+continue 2
+fi
+done
+
+cd "${module}"
+echo "checking ${module}"
+output=$(mvn validate -nsu -Dcheckstyle.skip=true -Dcheck-convergence)
--- End diff --

We explicitly run `rat` in the `verify`, which is also the one `scalastyle` 
runs in by default. 


> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
>  Labels: pull-request-available
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9578) Allow to define an auto watermark interval in SQL Client

2018-07-03 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-9578.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.6.0: 19040a632c66af64d49707cdf07adb25af04d92b

> Allow to define an auto watermark interval in SQL Client
> 
>
> Key: FLINK-9578
> URL: https://issues.apache.org/jira/browse/FLINK-9578
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently it is not possible to define an auto watermark interval in a 
> non-programmatic way for the SQL Client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6160: [FLINK-9578] [sql-client] Allow to define an auto ...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6160


---


[jira] [Commented] (FLINK-9578) Allow to define an auto watermark interval in SQL Client

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531249#comment-16531249
 ] 

ASF GitHub Bot commented on FLINK-9578:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6160


> Allow to define an auto watermark interval in SQL Client
> 
>
> Key: FLINK-9578
> URL: https://issues.apache.org/jira/browse/FLINK-9578
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently it is not possible to define an auto watermark interval in a 
> non-programmatic way for the SQL Client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9578) Allow to define an auto watermark interval in SQL Client

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9578:
--
Labels: pull-request-available  (was: )

> Allow to define an auto watermark interval in SQL Client
> 
>
> Key: FLINK-9578
> URL: https://issues.apache.org/jira/browse/FLINK-9578
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently it is not possible to define an auto watermark interval in a 
> non-programmatic way for the SQL Client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199778602
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -148,25 +148,26 @@ public AvroRowDeserializationSchema(Class recordClazz)
public AvroRowDeserializationSchema(String avroSchemaString) {
Preconditions.checkNotNull(avroSchemaString, "Avro schema must 
not be null.");
recordClazz = null;
-   typeInfo = AvroSchemaConverter.convert(avroSchemaString);
+   final TypeInformation typeInfo = 
AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
+   // check for a schema that describes a record
+   if (!(typeInfo instanceof RowTypeInfo)) {
+   throw new IllegalArgumentException("Row type 
information expected.");
--- End diff --

`Preconditions.checkArgument`?


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199780249
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStr

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531252#comment-16531252
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199780249
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private tra

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531251#comment-16531251
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199778602
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -148,25 +148,26 @@ public AvroRowDeserializationSchema(Class recordClazz)
public AvroRowDeserializationSchema(String avroSchemaString) {
Preconditions.checkNotNull(avroSchemaString, "Avro schema must 
not be null.");
recordClazz = null;
-   typeInfo = AvroSchemaConverter.convert(avroSchemaString);
+   final TypeInformation typeInfo = 
AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
+   // check for a schema that describes a record
+   if (!(typeInfo instanceof RowTypeInfo)) {
+   throw new IllegalArgumentException("Row type 
information expected.");
--- End diff --

`Preconditions.checkArgument`?


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --...

2018-07-03 Thread satybald
Github user satybald closed the pull request at:

https://github.com/apache/flink/pull/6242


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new 
CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

This is a good point.


---


[GitHub] flink issue #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --running...

2018-07-03 Thread satybald
Github user satybald commented on the issue:

https://github.com/apache/flink/pull/6242
  
yep, it's flink 1.5.0. I'm closing PR then.


---


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531255#comment-16531255
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

Github user satybald closed the pull request at:

https://github.com/apache/flink/pull/6242


> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In Flink CLI there's a command list with option --running that according to 
> descriptions "Show only running programs and their JobIDs". However, in 
> practice, it also shows jobs that are in the *CANCELED* state, which is a 
> completed job.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal it to extend CLI program to show jobs only in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531254#comment-16531254
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

Github user satybald commented on the issue:

https://github.com/apache/flink/pull/6242
  
yep, it's flink 1.5.0. I'm closing PR then.


> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In Flink CLI there's a command list with option --running that according to 
> descriptions "Show only running programs and their JobIDs". However, in 
> practice, it also shows jobs that are in the *CANCELED* state, which is a 
> completed job.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal it to extend CLI program to show jobs only in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531256#comment-16531256
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new 
CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

This is a good point.


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -105,7 +106,8 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
messageHeaders.getResponseStatusCode(),
responseHeaders);
}
-   });
+   }).whenComplete((P resp, Throwable throwable) -> 
processingFinishedFuture.complete(null));
--- End diff --

I think we are swallowing potential exceptions here. I think it would be 
better to do something like
```
return response.whenComplete(...).thenApply(ignored -> null)
```

That way we would also get rid of the `processingFinishedFuture`.


---


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531258#comment-16531258
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -105,7 +106,8 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
messageHeaders.getResponseStatusCode(),
responseHeaders);
}
-   });
+   }).whenComplete((P resp, Throwable throwable) -> 
processingFinishedFuture.complete(null));
--- End diff --

I think we are swallowing potential exceptions here. I think it would be 
better to do something like
```
return response.whenComplete(...).thenApply(ignored -> null)
```

That way we would also get rid of the `processingFinishedFuture`.


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199780422
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -201,71 +202,69 @@ private Object convert(Schema schema, 
TypeInformation info, Object object) {
switch (schema.getType()) {
case RECORD:
if (object instanceof IndexedRecord) {
-   return convertRecord(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
+   return convertAvroRecordToRow(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
}
throw new IllegalStateException("IndexedRecord 
expected but was: " + object.getClass());
case ENUM:
case STRING:
return object.toString();
case ARRAY:
if (info instanceof BasicArrayTypeInfo) {
-   final BasicArrayTypeInfo bati = 
(BasicArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
bati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((BasicArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
} else {
-   final ObjectArrayTypeInfo oati = 
(ObjectArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
oati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((ObjectArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
}
case MAP:
-   final MapTypeInfo mti = (MapTypeInfo) info;
+   final MapTypeInfo mapTypeInfo = 
(MapTypeInfo) info;
final Map convertedMap = new 
HashMap<>();
final Map map = (Map) object;
for (Map.Entry entry : map.entrySet()) {
convertedMap.put(
entry.getKey().toString(),
-   convert(schema.getValueType(), 
mti.getValueTypeInfo(), entry.getValue()));
+   
convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), 
entry.getValue()));
}
return convertedMap;
case UNION:
final List types = schema.getTypes();
final int size = types.size();
final Schema actualSchema;
if (size == 2 && types.get(0).getType() == 
Schema.Type.NULL) {
-   return convert(types.get(1), info, 
object);
+   return convertAvroType(types.get(1), 
info, object);
} else if (size == 2 && types.get(1).getType() 
== Schema.Type.NULL) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else if (size == 1) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else {
// generic type
return object;
}
case FIXED:
final byte[] fixedBytes = ((GenericFixed) 
object).bytes();
if (info == Types.BIG_DEC) {
-   return convertDecimal(schema, 
fixedBytes);
+   return convertToDecimal(schema, 
fixedBytes);
}
return fixedBytes;
case BYTES:
-   final ByteBuffer bb = (ByteBuffer) object;
- 

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199781443
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+   @Test
+   public void testAvroClassConversion() {
+   validateUserSchema(AvroSchemaConverter.convert(User.class));
+   }
+
+   @Test
+   public void testAvroSchemaConversion() {
+   final String schema = User.getClassSchema().toString(true);
+   validateUserSchema(AvroSchemaConverter.convert(schema));
+   }
+
+   private void validateUserSchema(TypeInformation actual) {
+   final TypeInformation address = Types.ROW_NAMED(
+   new String[]{"num", "street", "city", "state", "zip"},
+   Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING);
+
+   final TypeInformation user = Types.ROW_NAMED(
+   new String[] {"name", "favorite_number", 
"favorite_color", "type_long_test",
--- End diff --

I would argue that in that case one entry per line is more readable. The 
problem with such lines is that whenever someone modifies one entry or add an 
entry in the middle, diffs are unreadable. Also any conflicts (if two commits 
added an entry) with multiple entries per line are nasty, while with one entry 
per line usually there are no conflicts - or they are easy to solve.


---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531263#comment-16531263
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199780422
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -201,71 +202,69 @@ private Object convert(Schema schema, 
TypeInformation info, Object object) {
switch (schema.getType()) {
case RECORD:
if (object instanceof IndexedRecord) {
-   return convertRecord(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
+   return convertAvroRecordToRow(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
}
throw new IllegalStateException("IndexedRecord 
expected but was: " + object.getClass());
case ENUM:
case STRING:
return object.toString();
case ARRAY:
if (info instanceof BasicArrayTypeInfo) {
-   final BasicArrayTypeInfo bati = 
(BasicArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
bati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((BasicArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
} else {
-   final ObjectArrayTypeInfo oati = 
(ObjectArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
oati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((ObjectArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
}
case MAP:
-   final MapTypeInfo mti = (MapTypeInfo) info;
+   final MapTypeInfo mapTypeInfo = 
(MapTypeInfo) info;
final Map convertedMap = new 
HashMap<>();
final Map map = (Map) object;
for (Map.Entry entry : map.entrySet()) {
convertedMap.put(
entry.getKey().toString(),
-   convert(schema.getValueType(), 
mti.getValueTypeInfo(), entry.getValue()));
+   
convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), 
entry.getValue()));
}
return convertedMap;
case UNION:
final List types = schema.getTypes();
final int size = types.size();
final Schema actualSchema;
if (size == 2 && types.get(0).getType() == 
Schema.Type.NULL) {
-   return convert(types.get(1), info, 
object);
+   return convertAvroType(types.get(1), 
info, object);
} else if (size == 2 && types.get(1).getType() 
== Schema.Type.NULL) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else if (size == 1) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else {
// generic type
return object;
}
case FIXED:
final byte[] fixedBytes = ((GenericFixed) 
object).bytes();
if (info == Types.BIG_DEC) {
-   return convertDecimal(schema, 
fixedBytes);
+  

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531264#comment-16531264
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199781443
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+   @Test
+   public void testAvroClassConversion() {
+   validateUserSchema(AvroSchemaConverter.convert(User.class));
+   }
+
+   @Test
+   public void testAvroSchemaConversion() {
+   final String schema = User.getClassSchema().toString(true);
+   validateUserSchema(AvroSchemaConverter.convert(schema));
+   }
+
+   private void validateUserSchema(TypeInformation actual) {
+   final TypeInformation address = Types.ROW_NAMED(
+   new String[]{"num", "street", "city", "state", "zip"},
+   Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING);
+
+   final TypeInformation user = Types.ROW_NAMED(
+   new String[] {"name", "favorite_number", 
"favorite_color", "type_long_test",
--- End diff --

I would argue that in that case one entry per line is more readable. The 
problem with such lines is that whenever someone modifies one entry or add an 
entry in the middle, diffs are unreadable. Also any conflicts (if two commits 
added an entry) with multiple entries per line are nasty, while with one entry 
per line usually there are no conflicts - or they are easy to solve.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(S

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
--- End diff --

I think up to this point, the code is iterating `fieldSerializers` 5 times 
(null checks, immutable check, length calc, stateful check, and hash code 
computation. It could be done in one iteration, but since this method should 
typically not be called in hot loops, this is an optional improvement.


---


[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6206
  
@zsolt-donca I actually added some test to my branch 
https://github.com/tillrohrmann/flink/commits/FLINK-9654. If this should cover 
your fix, then I could merge your PR with this test. What do you think?


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199782571
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+   
+   joda-time
+   joda-time
+   
--- End diff --

I do not feel competent enough here to make final call. Maybe you could ask 
@zentol (or someone else) about it?


---


[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531266#comment-16531266
 ] 

ASF GitHub Bot commented on FLINK-9654:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6206
  
@zsolt-donca I actually added some test to my branch 
https://github.com/tillrohrmann/flink/commits/FLINK-9654. If this should cover 
your fix, then I could merge your PR with this test. What do you think?


> Internal error while deserializing custom Scala TypeSerializer instances
> 
>
> Key: FLINK-9654
> URL: https://issues.apache.org/jira/browse/FLINK-9654
> Project: Flink
>  Issue Type: Bug
>Reporter: Zsolt Donca
>Assignee: Zsolt Donca
>Priority: Major
>  Labels: pull-request-available
>
> When you are using custom `TypeSerializer` instances implemented in Scala, 
> the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can 
> manifest itself when a Flink job is restored from checkpoint or started with 
> a savepoint.
> The reason is that in such a restore from checkpoint or savepoint, Flink uses 
> `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type 
> serializers and their configurations. The deserialization walks through the 
> entire object graph corresponding, and for each class it calls 
> `isAnonymousClass`, which, in turn, calls `getSimpleName` (mechanism in place 
> for FLINK-6869). If there is an internal class defined in a Scala object for 
> which `getSimpleName` fails (see the Scala issue), then a 
> `java.lang.InternalError` is thrown which causes the task manager to restart. 
> In this case, Flink tries to restart the job on another task manager, causing 
> all the task managers to restart, wreaking havoc on the entire Flink cluster.
> There are some alternative type information derivation mechanisms that rely 
> on anonymous classes and, most importantly, classes generated by macros, that 
> can easily trigger the above problem. I am personally working on 
> [https://github.com/zsolt-donca/flink-alt], and there is also 
> [https://github.com/joroKr21/flink-shapeless]
> I prepared a pull request that fixes the issue. 
>  
> Edit: added a stack trace to help demonstrate the issue.
> 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR 
> org.apache.flink.runtime.taskmanager.Task  - Encountered fatal error 
> java.lang.InternalError - terminating the JVM
>  java.lang.InternalError: Malformed class name
>          at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171]
>          at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171]
>          at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) 
> ~[na:1.8.0_171]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>  ~[flink-dist_2.11-1.

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531265#comment-16531265
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
--- End diff --

I think up to this point, the code is iterating `fieldSerializers` 5 times 
(null checks, immutable check, length calc, stateful check, and hash code 
computation. It could be done in one iteration, but since this method should 
typically not be called in hot loops, this is an optional improvement.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531269#comment-16531269
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199782571
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+   
+   joda-time
+   joda-time
+   
--- End diff --

I do not feel competent enough here to make final call. Maybe you could ask 
@zentol (or someone else) about it?


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782860
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
+   }
+
+   private boolean isStateful() {
+   TypeSerializer[] duplicatedSerializers = 
duplicateFieldSerializers();
--- End diff --

The flag for `isStateful()` is the only one that I suggested as a candidate 
for lazy init when `duplicate()` is called for the first time. Reason is that 
duplicating some types of inner serializers can sometimes be a bit expensive. 
But again, I feel that this can also be changed in followup work, if needed.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199783102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

I agree, we should use `quarter` for expressing a time and not for a 
special extraction function that can also be achieved with `.extract(...)`.


---


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531270#comment-16531270
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782860
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
+   }
+
+   private boolean isStateful() {
+   TypeSerializer[] duplicatedSerializers = 
duplicateFieldSerializers();
--- End diff --

The flag for `isStateful()` is the only one that I suggested as a candidate 
for lazy init when `duplicate()` is called for the first time. Reason is that 
duplicating some types of inner serializers can sometimes be a bit expensive. 
But again, I feel that this can also be changed in followup work, if needed.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerSe

[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531274#comment-16531274
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199783102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

I agree, we should use `quarter` for expressing a time and not for a 
special extraction function that can also be achieved with `.extract(...)`.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199783587
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
+   }
+
+   private boolean isStateful() {
+   TypeSerializer[] duplicatedSerializers = 
duplicateFieldSerializers();
+   return IntStream.range(0, fieldSerializers.length)
+   .anyMatch(i -> fieldSerializers[i] != 
duplicatedSerializers[i]);
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return stateful ? 
createSerializerInstance(duplicateFieldSerializers()) : this;
--- End diff --

Another small point here for `createSerializerInstance(...)`: we have no 
(non-public) constructor that can also take all boolean flags, length, and 
(maybe) hash directly. So if we copy the serializer, I guess it always goes 
through the whole process again to figure this out, but we could just copy it 
from the previous instance.


---


[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199783421
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new 
RestHandlerException("Job submission failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

I would be in favor of approach 3 because we are doing something similar 
for the `JobExecutionResult`/`JobResult`. We could then throw the exception on 
the `RestClusterClient`. And I also agree that this is something we can add as 
a follow up. Can you please create a JIRA issue for this @zentol.


---


<    1   2   3   4   5   >