[GitHub] flink pull request #6238: [FLINK-9636][network] fix inconsistency with faile...

2018-07-03 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6238#discussion_r199702444
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   redistributeBuffers();
+   try {
+   redistributeBuffers();
+   } catch (Throwable t) {
+   this.numTotalRequiredBuffers -= numRequiredBuffers;
+   ExceptionUtils.rethrowIOException(t);
+   }
}
 
final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
--- End diff --

ah, true, thanks for pointing this out, I must have been blind in one eye yesterday

I'll integrate this change as well
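
For context, here is a minimal self-contained sketch of the rollback-on-failure pattern this diff introduces (the class and method names below are illustrative only, not the real {{NetworkBufferPool}}):

{code:java}
import java.io.IOException;

class BufferBookkeepingSketch {

    private int numTotalRequiredBuffers;

    // Bookkeeping is incremented optimistically and rolled back if the
    // follow-up step throws, so the counter stays consistent for later calls.
    synchronized void requestBuffers(int numRequiredBuffers) throws IOException {
        this.numTotalRequiredBuffers += numRequiredBuffers;
        try {
            redistributeBuffers();
        } catch (Throwable t) {
            // undo the optimistic increment before propagating the failure
            this.numTotalRequiredBuffers -= numRequiredBuffers;
            if (t instanceof IOException) {
                throw (IOException) t;
            }
            throw new IOException(t);
        }
    }

    private void redistributeBuffers() throws IOException {
        // placeholder for the real redistribution logic
    }
}
{code}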


---


[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-07-03 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530928#comment-16530928
 ] 

Till Rohrmann commented on FLINK-9622:
--

Well, the actual problem is that there are two concurrent directory creation 
operations happening, and {{LocalFileSystem#create}} fails if it cannot create 
the directory.
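
For illustration, a minimal sketch of that kind of race using plain {{java.io.File}} (this is not the actual {{LocalFileSystem}} code): {{mkdirs()}} returns {{true}} only if the calling invocation created the directory, so the loser of the race sees {{false}} even though the directory now exists.

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

public class MkdirsRaceSketch {
    public static void main(String[] args) throws InterruptedException {
        File dir = new File(System.getProperty("java.io.tmpdir"), "mkdirs-race-demo");

        Runnable create = () -> {
            // Fragile pattern: between exists() and mkdirs() another thread
            // may create the directory, making mkdirs() return false.
            if (!dir.exists() && !dir.mkdirs()) {
                // A tolerant version would re-check dir.exists() here
                // instead of failing.
                throw new UncheckedIOException(
                        new IOException("could not create directory " + dir));
            }
        };

        Thread t1 = new Thread(create);
        Thread t2 = new Thread(create);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
{code}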

> DistributedCacheDfsTest failed on travis
> 
>
> Key: FLINK-9622
> URL: https://issues.apache.org/jira/browse/FLINK-9622
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.6.0
>
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() fails flakily on Travis.
> instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9706) DispatcherTest#testSubmittedJobGraphListener fails on Travis

2018-07-03 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9706:
---

 Summary: DispatcherTest#testSubmittedJobGraphListener fails on 
Travis
 Key: FLINK-9706
 URL: https://issues.apache.org/jira/browse/FLINK-9706
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination, Tests
Affects Versions: 1.5.0, 1.6.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/jobs/399331775
{code:java}
testSubmittedJobGraphListener(org.apache.flink.runtime.dispatcher.DispatcherTest)  Time elapsed: 0.103 sec  <<< FAILURE!
java.lang.AssertionError: 
Expected: a collection with size <1>
 but: collection size was <0>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.junit.Assert.assertThat(Assert.java:956)
at org.junit.Assert.assertThat(Assert.java:923)
at org.apache.flink.runtime.dispatcher.DispatcherTest.testSubmittedJobGraphListener(DispatcherTest.java:294)

testSubmittedJobGraphListener(org.apache.flink.runtime.dispatcher.DispatcherTest)  Time elapsed: 0.11 sec  <<< ERROR!
org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: org.apache.flink.runtime.dispatcher.DispatcherException: Could not start the added job b8ab3b7fa8a929bf608a5b65896a2b17
at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
at org.apache.flink.runtime.dispatcher.DispatcherTest.tearDown(DispatcherTest.java:219)
Caused by: org.apache.flink.runtime.dispatcher.DispatcherException: Could not start the added job b8ab3b7fa8a929bf608a5b65896a2b17
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$onAddedJobGraph$28(Dispatcher.java:845)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Failed to submit job b8ab3b7fa8a929bf608a5b65896a2b17.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$onAddedJobGraph$27(Dispatcher.java:836)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at 

[jira] [Commented] (FLINK-9178) Add rate control for kafka source

2018-07-03 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530933#comment-16530933
 ] 

buptljy commented on FLINK-9178:


[~tzulitai] I'd like to add something about this issue, which is very similar 
to a problem I've met recently.
The program receives realtime data, counts distinct IPs within a 10-minute 
event-time window, and sinks the aggregated data into HBase. Now something has 
gone wrong and we want to re-consume all data from Kafka's earliest offset, but 
that doesn't work well because too many event-time windows pile up in memory.
I think it would be okay if we used processing time instead, because there 
would be only a single window even when consuming from the earliest offset. So 
I wonder if we can add a parameter to control the rate of receiving data, like 
an upper bound on the consuming rate?
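
To make the idea concrete, here is a hedged sketch of one way such a bound could be approximated from the user side today; the wrapper below is hypothetical (it is not part of the Kafka connector API) and simply blocks in the deserializer using a Guava {{RateLimiter}}:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.google.common.util.concurrent.RateLimiter;

// Hypothetical wrapper: throttles how fast records leave the Kafka source
// by blocking in the deserializer.
public class ThrottledDeserializationSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> inner;
    private final double recordsPerSecond;
    private transient RateLimiter rateLimiter;

    public ThrottledDeserializationSchema(DeserializationSchema<T> inner, double recordsPerSecond) {
        this.inner = inner;
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        if (rateLimiter == null) {
            // created lazily because RateLimiter is not serializable
            rateLimiter = RateLimiter.create(recordsPerSecond);
        }
        rateLimiter.acquire(); // blocks until a permit is available
        return inner.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
{code}

Note that the limit applies per parallel source instance, so the effective global rate would be roughly {{recordsPerSecond * parallelism}}.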

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: buptljy
>Assignee: Tarush Grover
>Priority: Major
>
> When I want to run the Flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM if there is too much data, because of too many 
> HeapMemorySegment instances in the NetworkBufferPool.
> Maybe we should have some settings to control the rate of receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530977#comment-16530977
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6239
  
As I understand it, we don't want this to be part of the source release, so 
we need an exclusion in 
[create_source_release.sh](https://github.com/apache/flink/blob/master/tools/releasing/create_source_release.sh).


> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530983#comment-16530983
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6239


> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6211: [FLINK-9665] PrometheusReporter does not properly unregis...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6211
  
I've merged the PR, @jelmerk could you close it? Thanks!


---


[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6239
  
whoops, sorry I accidentally closed this PR, added the wrong PR ID to a 
commit :(
I'm so sorry.



---


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530985#comment-16530985
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6239
  
whoops, sorry I accidentally closed this PR, added the wrong PR ID to a 
commit :(
I'm so sorry.



> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9665) PrometheusReporter does not properly unregister metrics

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530986#comment-16530986
 ] 

ASF GitHub Bot commented on FLINK-9665:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6211
  
I've merged the PR, @jelmerk could you close it? Thanks!


> PrometheusReporter does not properly unregister metrics
> ---
>
> Key: FLINK-9665
> URL: https://issues.apache.org/jira/browse/FLINK-9665
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Jelmer Kuperus
>Priority: Major
>  Labels: pull-request-available
>
> The {{PrometheusReporter}} groups metrics with the same logical scope in a 
> single {{Collector}}, which is periodically polled by Prometheus.
> New metrics are added to an existing collector, and a reference count is 
> maintained so we can eventually clean up the {{Collector}} itself.
> For removed metrics we decrease the reference count but do not remove the 
> metrics that were added. As a result, the collector will continue to expose 
> metrics as long as at least one metric with the same logical scope exists.
> If the collector is a {{io.prometheus.client.Gauge}} we can use the 
> {{#remove()}} method. For histograms we will have to modify our 
> {{HistogramSummaryProxy}} class to allow removing individual histograms.
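
To illustrate the mechanism described above, here is a hedged sketch (not the reporter's actual code) of reference-counted collectors where child metrics are removed eagerly via {{Gauge#remove(...)}} and the {{Collector}} itself is unregistered once the count reaches zero; it assumes the scope string is a valid Prometheus metric name:

{code:java}
import java.util.HashMap;
import java.util.Map;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;

public class RefCountedCollectorsSketch {

    private static final class Entry {
        final Gauge gauge;
        int refCount;

        Entry(Gauge gauge) {
            this.gauge = gauge;
        }
    }

    private final Map<String, Entry> collectorsByScope = new HashMap<>();

    void addMetric(String scope, String metricName) {
        Entry entry = collectorsByScope.computeIfAbsent(scope, s ->
                new Entry(Gauge.build()
                        .name(s)
                        .help("metrics for scope " + s)
                        .labelNames("name")
                        .register(CollectorRegistry.defaultRegistry)));
        entry.refCount++;
        entry.gauge.labels(metricName).set(0.0);
    }

    void removeMetric(String scope, String metricName) {
        Entry entry = collectorsByScope.get(scope);
        if (entry == null) {
            return;
        }
        // the missing piece before the fix: drop the child metric itself
        entry.gauge.remove(metricName);
        if (--entry.refCount == 0) {
            CollectorRegistry.defaultRegistry.unregister(entry.gauge);
            collectorsByScope.remove(scope);
        }
    }
}
{code}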



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9665) PrometheusReporter does not properly unregister metrics

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9665.
---
   Resolution: Fixed
Fix Version/s: 1.5.1
   1.4.3
   1.6.0

master: 75d12f967ccef5df3c6c513765bb8db1106a7c87

1.5: 903a323366b91a6f2f471f067462df9dd20cd3f4

1.4: 7719fddbf570075465a7a81722f9412735e83e6e

> PrometheusReporter does not properly unregister metrics
> ---
>
> Key: FLINK-9665
> URL: https://issues.apache.org/jira/browse/FLINK-9665
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Jelmer Kuperus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
>
> The {{PrometheusReporter}} groups metrics with the same logical scope in a 
> single {{Collector}}, which is periodically polled by Prometheus.
> New metrics are added to an existing collector, and a reference count is 
> maintained so we can eventually clean up the {{Collector}} itself.
> For removed metrics we decrease the reference count but do not remove the 
> metrics that were added. As a result, the collector will continue to expose 
> metrics as long as at least one metric with the same logical scope exists.
> If the collector is a {{io.prometheus.client.Gauge}} we can use the 
> {{#remove()}} method. For histograms we will have to modify our 
> {{HistogramSummaryProxy}} class to allow removing individual histograms.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups

2018-07-03 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530995#comment-16530995
 ] 

Fabian Hueske commented on FLINK-7593:
--

Thanks for the feedback and the effort to reproduce the problem [~cshi]!
I'm curious how that bug was fixed and would like to have a look before closing 
the issue.

Thanks, Fabian

> Generated plan does not create correct groups
> -
>
> Key: FLINK-7593
> URL: https://issues.apache.org/jira/browse/FLINK-7593
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.3.2
> Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2
>Reporter: Steffen Dienst
>Priority: Critical
> Attachments: flink-good-plan.json
>
>
> Under specific circumstances Flink seems to generate an execution plan that 
> is incorrect. I was using `groupBy(0).sum(1)` but the resulting csv files 
> contained multiple entries per group; the grouping did not occur. After some 
> work I managed to reduce the relevant part of our code to the minimal test 
> case below. Be careful: all parts need to be present, even the irrelevant 
> secondary output. If I remove anything else, Flink generates correct code 
> (either by introducing a combiner node prior to the reducer or by using "Sum 
> (combine)" on the edge before the reducer).
> {code:java}
> import java.util.ArrayList;
> import java.util.Collection;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.types.LongValue;
> import org.apache.flink.util.LongValueSequenceIterator;
> public class FlinkOptimizerBug {
>   public static void main(String[] args) throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
> DataSet<Tuple2<Long, Long>> x = 
> env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
> .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
> .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
> .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
> .where(0).equalTo(0).with((t1,t2) -> t1)
> .union(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
> .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
> .map(l -> l)
> .withForwardedFields("f0;f1");
> 
> Collection<Tuple2<Long, Long>> out = new ArrayList<>();
> x.output(new LocalCollectionOutputFormat<>(out));
> 
> x.groupBy(0)
> .sum(1) //BUG: this will not be grouped correctly, so there will be 
> multiple outputs per group!
> .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
> .setParallelism(1);
> env.setParallelism(4);
> 
> System.out.println(env.getExecutionPlan());
> env.execute();
>   }
> }
> {code}
> Invalid execution plan generated:
> {code:javascript}
> {
>   "nodes": [
>   {
>   "id": 5,
>   "type": "source",
>   "pact": "Data Source",
>   "contents": "at 
> fromParallelCollection(ExecutionEnvironment.java:870) 
> (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>   "parallelism": "4",
>   "global_properties": [
>   { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>   { "name": "Partitioning Order", "value": "(none)" },
>   { "name": "Uniqueness", "value": "not unique" }
>   ],
>   "local_properties": [
>   { "name": "Order", "value": "(none)" },
>   { "name": "Grouping", "value": "not grouped" },
>   { "name": "Uniqueness", "value": "not unique" }
>   ],
>   "estimates": [
>   { "name": "Est. Output Size", "value": "(unknown)" },
>   { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
>   "costs": [
>   { "name": "Network", "value": "0.0" },
>   { "name": "Disk I/O", "value": "0.0" },
>   { "name": "CPU", "value": "0.0" },
>   { "name": "Cumulative Network", "value": "0.0" },
>   { "name": "Cumulative Disk I/O", "value": "0.0" },
>   { "name": "Cumulative CPU", "value": "0.0" }
>   ],
>   "compiler_hints": [
>   { "name": "Output Size (bytes)", "value": "(none)" },
>   { "name": "Output Cardinality", 

[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...

2018-07-03 Thread florianschmidt1994
Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/6216
  
Thanks @zentol, I have one remark (see above); besides that, this looks good to me!

Additionally, I had some ideas that I think we could discuss:
- We have a common pattern of `wait_for_sth` functions that either
  - get stuck in a loop forever if the desired event doesn't happen (I think `wait_for_job_state_transition` also behaves like that, right?)
  - or iterate a fixed number of times and then continue execution, whereas instead they should fail.

I think we should open an issue to refactor this across all the tests so they have consistent and useful behaviour.
- Also, I think we could make backing up and reverting the config part of the test runner and always do that, so we avoid running into a corrupted flink-dist if tests don't behave correctly.

What do you think?



---


[jira] [Commented] (FLINK-9699) Add api to replace registered table

2018-07-03 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531004#comment-16531004
 ] 

Fabian Hueske commented on FLINK-9699:
--

It might be possible to change the translation behavior, i.e., to immediately 
create a logical TableScan that references the correct DataSet / TableSource 
when the {{tEnv.scan()}} method is called.
That would require a bit of a redesign of the Table API translation phase. I'm 
not opposed to that, especially if we can improve the support for 
notebook-style environments. 

However, it might be a larger effort that would need to be carefully planned. 
For example, the Table API handles its own validation.
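
For reference, a small hedged sketch of the notebook-style pain point (using the 1.5-era Java batch Table API; re-registering a name is rejected today, which is what the ticket wants to change):

{code:java}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class ReplaceTableSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Table v1 = tEnv.fromDataSet(env.fromElements(1, 2, 3), "x");
        tEnv.registerTable("t", v1);

        // in a notebook the user re-runs the cell with new data ...
        Table v2 = tEnv.fromDataSet(env.fromElements(4, 5, 6), "x");
        try {
            tEnv.registerTable("t", v2); // ... but the name is already taken
        } catch (Exception e) {
            System.out.println("re-registration rejected: " + e.getMessage());
        }
    }
}
{code}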

> Add api to replace registered table
> ---
>
> Key: FLINK-9699
> URL: https://issues.apache.org/jira/browse/FLINK-9699
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531006#comment-16531006
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/6216
  
Thanks @zentol, I have one remark (see above); besides that, this looks good to me!

Additionally, I had some ideas that I think we could discuss:
- We have a common pattern of `wait_for_sth` functions that either
  - get stuck in a loop forever if the desired event doesn't happen (I think `wait_for_job_state_transition` also behaves like that, right?)
  - or iterate a fixed number of times and then continue execution, whereas instead they should fail.

I think we should open an issue to refactor this across all the tests so they have consistent and useful behaviour.
- Also, I think we could make backing up and reverting the config part of the test runner and always do that, so we avoid running into a corrupted flink-dist if tests don't behave correctly.

What do you think?



> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.
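
As an illustration of event-based waiting, here is a hedged sketch in Java (it assumes the REST endpoint {{http://localhost:8081/jobs/<jobId>}} reports a {{"state"}} field and does a crude string match instead of real JSON parsing); returning {{false}} on timeout lets the caller fail instead of continuing silently:

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WaitForJobState {

    static boolean waitForState(String jobId, String expectedState, long timeoutMillis)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            HttpURLConnection conn = (HttpURLConnection)
                    new URL("http://localhost:8081/jobs/" + jobId).openConnection();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()))) {
                StringBuilder body = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line);
                }
                // crude check; a real implementation would parse the JSON
                if (body.toString().contains("\"state\":\"" + expectedState + "\"")) {
                    return true;
                }
            } finally {
                conn.disconnect();
            }
            Thread.sleep(1000); // short poll interval instead of one long sleep
        }
        return false; // caller should treat this as a test failure
    }
}
{code}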



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9554:
--
Labels: pull-request-available  (was: )

> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>  
> Command I use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199724031
  
--- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -255,14 +257,25 @@ object FlinkShell {
 yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
 yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))
 
+val customCommandLines = CliFrontend.loadCustomCommandLines(
+  configuration,configurationDirectory)
+val commandOptions = CliFrontendParser.getRunCommandOptions
+val customCommandLineOptions = new Options()
+customCommandLines.asScala.foreach(cmd => {
--- End diff --

this is already done in the `CliFrontend` constructor. It may be sufficient 
to switch the initialization of `frontend` and `commandLine`.


---


[jira] [Created] (FLINK-9709) Docs: Unnecessary transition in flink job lifecycle visualization

2018-07-03 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9709:
--

 Summary: Docs: Unnecessary transition in flink job lifecycle 
visualization
 Key: FLINK-9709
 URL: https://issues.apache.org/jira/browse/FLINK-9709
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Florian Schmidt






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531011#comment-16531011
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6140#discussion_r199724031
  
--- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -255,14 +257,25 @@ object FlinkShell {
 yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
 yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))
 
+val customCommandLines = CliFrontend.loadCustomCommandLines(
+  configuration,configurationDirectory)
+val commandOptions = CliFrontendParser.getRunCommandOptions
+val customCommandLineOptions = new Options()
+customCommandLines.asScala.foreach(cmd => {
--- End diff --

this is already done in the `CliFrontend` constructor. It may be sufficient 
to switch the initialization of `frontend` and `commandLine`.


> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>  
> Command I use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9709) Docs: Unnecessary transition in flink job lifecycle visualization

2018-07-03 Thread Florian Schmidt (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt updated FLINK-9709:
---
Description: The docs at 
[https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html]
 show the state transitions of a flink job. There are two actions called "Fail 
job" and two transitions called "Cancel job" that cause a transition from 
Running --> Failed or Running --> Cancelled, where there should only be one of each.

> Docs: Unnecessary transition in flink job lifecycle visualization
> -
>
> Key: FLINK-9709
> URL: https://issues.apache.org/jira/browse/FLINK-9709
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Florian Schmidt
>Priority: Trivial
>
> The docs at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html]
>  show the state transitions of a flink job. There are two actions called 
> "Fail job" and two transitions called "Cancel job" that cause a transition 
> from Running --> Failed or Running --> Cancelled, where there should only be 
> one of each.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9709) Docs: Unnecessary transition in flink job state visualization

2018-07-03 Thread Florian Schmidt (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt updated FLINK-9709:
---
Summary: Docs: Unnecessary transition in flink job state visualization  
(was: Docs: Unnecessary transition in flink job lifecycle visualization)

> Docs: Unnecessary transition in flink job state visualization
> -
>
> Key: FLINK-9709
> URL: https://issues.apache.org/jira/browse/FLINK-9709
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Florian Schmidt
>Priority: Trivial
>
> The docs at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html]
>  show the state transitions of a flink job. There are two actions called 
> "Fail job" and two transitions called "Cancel job" that cause a transition 
> from Running --> Failed or Running --> Cancelled, where there should only be 
> one of each.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531018#comment-16531018
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6240

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

## What is the purpose of the change

*Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.*

Previous PR got closed accidentally: 
https://github.com/apache/flink/pull/6239

## Brief change log

  - *Implement Jepsen tests.*


## Verifying this change

This change added tests and can be verified as follows:

  - *The changes themselves are tests.*
  - *Run Jepsen tests in docker containers.*
  - *Run unit tests with `lein test`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no** (at 
least not to Flink))
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will 
as soon as test failures appear) / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann @cewood @zentol @aljoscha 




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6240


commit 063e4621a5982b55ee7f7b0935290bbc717a5a45
Author: gyao 
Date:   2018-03-05T21:23:33Z

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.

commit 46f0ea7b14c9c59d6cc40903486978f4fd8354d3
Author: gyao 
Date:   2018-07-02T12:21:18Z

fixup! [FLINK-9004][tests] Implement Jepsen tests to test job availability.




> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-03 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6240

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

## What is the purpose of the change

*Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.*

Previous PR got closed accidentally: 
https://github.com/apache/flink/pull/6239

## Brief change log

  - *Implement Jepsen tests.*


## Verifying this change

This change added tests and can be verified as follows:

  - *The changes themselves are tests.*
  - *Run Jepsen tests in docker containers.*
  - *Run unit tests with `lein test`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no** (at 
least not to Flink))
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will 
as soon as test failures appear) / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann @cewood @zentol @aljoscha 




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6240


commit 063e4621a5982b55ee7f7b0935290bbc717a5a45
Author: gyao 
Date:   2018-03-05T21:23:33Z

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, 
network
partitions, etc. The Flink cluster under test is automatically deployed on 
YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.

commit 46f0ea7b14c9c59d6cc40903486978f4fd8354d3
Author: gyao 
Date:   2018-07-02T12:21:18Z

fixup! [FLINK-9004][tests] Implement Jepsen tests to test job availability.




---


[jira] [Created] (FLINK-9710) Make ClusterClient be used as multiple instances in a single jvm process

2018-07-03 Thread Chuanlei Ni (JIRA)
Chuanlei Ni created FLINK-9710:
--

 Summary: Make ClusterClient be used as multiple instances in a 
single jvm process
 Key: FLINK-9710
 URL: https://issues.apache.org/jira/browse/FLINK-9710
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.4.2
Reporter: Chuanlei Ni


We can use `ClusterClient` to submit jobs, but it is designed for the command 
line. So we cannot use this class in a long-running JVM process that creates 
multiple cluster clients concurrently.

This Jira aims to make it possible to use multiple `ClusterClient` instances 
in a single JVM process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531019#comment-16531019
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6239
  
@zentol No problem, I opened a new one.


> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-03 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6239
  
@zentol No problem, I opened a new one.


---


[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-03 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
As @zentol mentioned, we might want to exclude this from 
create_source_release.sh.


---


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531021#comment-16531021
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
As @zentol mentioned, we might want to exclude this from 
create_source_release.sh.


> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9699) Add api to replace registered table

2018-07-03 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531027#comment-16531027
 ] 

Jeff Zhang commented on FLINK-9699:
---

Thanks [~fhueske], let's hold this ticket for now. I will try to look at the 
flink sql component and see how we do such redesign in a simple way. 

> Add api to replace registered table
> ---
>
> Key: FLINK-9699
> URL: https://issues.apache.org/jira/browse/FLINK-9699
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531030#comment-16531030
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6216
  
@florianschmidt1994 I don't see the comment you're referring to :(
(Did you start a review, but wrote your comment separately? Then your 
review would still be in progress!)

The issues you identified are certainly valid, but I'm wondering if it 
really makes sense to invest time into this now.
There've been discussions about writing a python/java based framework for 
the tests (and I've already started tinkering on a java version); so any 
_significant_ changes we make to the bash-scripts might be subsumed soon.


> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6216
  
@florianschmidt1994 I don't see the comment you're referring to :(
(Did you start a review, but wrote your comment separately? Then your 
review would still be in progress!)

The issues you identified are certainly valid, but I'm wondering if it 
really makes sense to invest time into this now.
There've been discussions about writing a python/java based framework for 
the tests (and I've already started tinkering on a java version); so any 
_significant_ changes we make to the bash-scripts might be subsumed soon.


---


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531034#comment-16531034
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6216#discussion_r199719951
  
--- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -85,20 +90,23 @@ function run_test() {
 exit 1
 fi
 
-local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID})
-
 kill_random_taskmanager
 
 latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
 echo "Latest snapshot count was ${latest_snapshot_count}"
 
-sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+# wait until the TM loss was detected
+wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED"
 
 start_and_wait_for_tm
 
+wait_job_running ${JOB_ID}
+
+local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})"
--- End diff --

Why did you move this from `before killing the TM` to `after having the TM restarted again`?


> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6216: [FLINK-9674][tests] Replace hard-coded sleeps in Q...

2018-07-03 Thread florianschmidt1994
Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6216#discussion_r199719951
  
--- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -85,20 +90,23 @@ function run_test() {
 exit 1
 fi
 
-local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID})
-
 kill_random_taskmanager
 
 latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
 echo "Latest snapshot count was ${latest_snapshot_count}"
 
-sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+# wait until the TM loss was detected
+wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED"
 
 start_and_wait_for_tm
 
+wait_job_running ${JOB_ID}
+
+local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})"
--- End diff --

Why did you move this from `before killing the TM` to `after having the TM restarted again`?


---


[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...

2018-07-03 Thread florianschmidt1994
Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/6216
  
> (Did you start a review, but wrote your comment separately? Then your 
review would still be in progress!)

Yes :D I submitted it now

>The issues you identified are certainly valid, but I'm wondering if it 
really makes sense to invest time into this now.
>There've been discussions about writing a python/java based framework for 
the tests (and I've already started tinkering on a java version); so any 
significant changes we make to the bash-scripts might be subsumed soon™.

Alright, then let's leave it this way and we can still take care of it 
should we run into problems with this in the future



---


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531040#comment-16531040
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/6216
  
> (Did you start a review, but wrote your comment separately? Then your 
review would still be in progress!)

Yes :D I submitted it now

>The issues you identified are certainly valid, but I'm wondering if it 
really makes sense to invest time into this now.
>There've been discussions about writing a python/java based framework for 
the tests (and I've already started tinkering on a java version); so any 
significant changes we make to the bash-scripts might be subsumed soon™.

Alright, then let's leave it this way and we can still take care of it 
should we run into problems with this in the future



> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531044#comment-16531044
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199731327
  
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---
@@ -123,4 +125,24 @@ public void testNestedRowTypeInfo() {
 		assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
 	}
 
+	@Test
+	public void testSchemaEquals() {
+		final RowTypeInfo row1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		final RowTypeInfo row2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertTrue(row1.schemaEquals(row2));
--- End diff --

This is covered by the test base. But I added another test data entry with 
different field names.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199731327
  
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---
@@ -123,4 +125,24 @@ public void testNestedRowTypeInfo() {
 		assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
 	}
 
+	@Test
+	public void testSchemaEquals() {
+		final RowTypeInfo row1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		final RowTypeInfo row2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertTrue(row1.schemaEquals(row2));
--- End diff --

This is covered by the test base. But I added another test data entry with 
different field names.


---


[jira] [Commented] (FLINK-9699) Add api to replace registered table

2018-07-03 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530922#comment-16530922
 ] 

Jeff Zhang commented on FLINK-9699:
---

[~fhueske] I don't know much about the internals of Flink SQL, but is it 
possible to resolve which DataSet MyT points to beforehand, instead of delaying 
that until the table is converted into a DataStream/DataSet or emitted through 
a TableSink?

> Add api to replace registered table
> ---
>
> Key: FLINK-9699
> URL: https://issues.apache.org/jira/browse/FLINK-9699
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530927#comment-16530927
 ] 

ASF GitHub Bot commented on FLINK-9636:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6238#discussion_r199702444
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) {
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;
 
-			redistributeBuffers();
+			try {
+				redistributeBuffers();
+			} catch (Throwable t) {
+				this.numTotalRequiredBuffers -= numRequiredBuffers;
+				ExceptionUtils.rethrowIOException(t);
+			}
 		}
 
 		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
--- End diff --

ah, true, thanks for pointing this out, I must have been blind in one eye 
yesterday

I'll integrate this change as well


> Network buffer leaks in requesting a batch of segments during canceling
> ---
>
> Key: FLINK-9636
> URL: https://issues.apache.org/jira/browse/FLINK-9636
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} 
> is increased by {{numRequiredBuffers}} first.
> If {{InterruptedException}} is thrown during polling segments from the 
> available queue, the requested segments will be recycled back to 
> {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number 
> of polled segments which is now inconsistent with {{numRequiredBuffers}}. So 
> {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and 
> we can also decrease {{numRequiredBuffers}} to fix this bug.
>  
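
A minimal, self-contained sketch of the compensating-rollback pattern applied 
here (class and method names are illustrative, not Flink's actual 
NetworkBufferPool): bump the counter first, and undo the bump if the follow-up 
step throws, so a failed request does not leak "required" buffers.

{code:java}
import java.io.IOException;

// Illustrative stand-in for the pool's bookkeeping; not Flink's NetworkBufferPool.
final class BufferAccounting {
    private int numTotalRequiredBuffers;

    synchronized void request(int numRequiredBuffers) throws IOException {
        numTotalRequiredBuffers += numRequiredBuffers;
        try {
            redistribute(); // may fail, e.g. during cancellation
        } catch (Throwable t) {
            // roll back the increase so the accounting stays consistent
            // for future uses of this pool
            numTotalRequiredBuffers -= numRequiredBuffers;
            throw new IOException("Buffer redistribution failed", t);
        }
    }

    private void redistribute() {
        // redistribution logic elided in this sketch
    }
}
{code}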



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-07-03 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530931#comment-16530931
 ] 

Till Rohrmann commented on FLINK-9622:
--

The problem there seems to be that {{LocalFileSystem#mkdirsInternal}} fails if 
a concurrent directory creation is happening [1].

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257
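
A minimal sketch of a race-tolerant directory creation (plain java.io.File; 
illustrative, not the actual Flink fix): treat a failed mkdirs() as success if 
the directory exists afterwards, because a concurrent caller may have won the 
race.

{code:java}
import java.io.File;
import java.io.IOException;

public final class TolerantMkdirs {
    // Create the directory unless it already exists; tolerate concurrent creators.
    public static void mkdirs(File dir) throws IOException {
        if (dir.isDirectory()) {
            return; // someone (possibly another thread) already created it
        }
        // mkdirs() returns false both on real failures and when a concurrent
        // caller won the race, so re-check the file system state to tell them apart.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Could not create directory " + dir);
        }
    }
}
{code}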
 

> DistributedCacheDfsTest failed on travis
> 
>
> Key: FLINK-9622
> URL: https://issues.apache.org/jira/browse/FLINK-9622
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.6.0
>
>
> DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis.
> instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9707:


 Summary: LocalFileSystem does not support concurrent directory 
creations
 Key: FLINK-9707
 URL: https://issues.apache.org/jira/browse/FLINK-9707
 Project: Flink
  Issue Type: Improvement
  Components: FileSystem
Affects Versions: 1.5.0, 1.6.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


The {{LocalFileSystem}} does not support concurrent directory creations. The 
consequence is that file system operations fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...

2018-07-03 Thread suez1224
Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@fhueske @twalthr thanks for the comments. For `from-source`, the only 
systems I know of are Kafka10 and Kafka11, which support writing a record along 
with its timestamp. To support `from-source` in a table sink, I think we can do 
the following:
1) Add a connector property, e.g. connector.support-timestamp. Only if 
connector.support-timestamp is true will we allow the sink table schema to 
contain a field with rowtime type `from-source`; otherwise, an exception will 
be thrown.
2) If the condition in 1) is satisfied, we will create the corresponding 
rowtime field in the sink table schema with type LONG. In 
TableEnvironment.insertInto(), we will validate the sink schema against the 
insertion source. Also, in the TableSink.emitDataStream() implementation, we 
will need to insert a timestamp assigner operator that sets 
StreamRecord.timestamp (should we reuse an existing interface, or create a new 
timestampInserter interface?) and removes the extra rowtime field from 
StreamRecord.value before we emit the datastream to the sink. (For 
kafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true).)

Please correct me if I missed something here. What do you think?


---


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530939#comment-16530939
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@fhueske @twalthr thanks for the comments. For `from-source`, the only 
systems I know of are Kafka10 and Kafka11, which support writing a record along 
with its timestamp. To support `from-source` in a table sink, I think we can do 
the following:
1) Add a connector property, e.g. connector.support-timestamp. Only if 
connector.support-timestamp is true will we allow the sink table schema to 
contain a field with rowtime type `from-source`; otherwise, an exception will 
be thrown.
2) If the condition in 1) is satisfied, we will create the corresponding 
rowtime field in the sink table schema with type LONG. In 
TableEnvironment.insertInto(), we will validate the sink schema against the 
insertion source. Also, in the TableSink.emitDataStream() implementation, we 
will need to insert a timestamp assigner operator that sets 
StreamRecord.timestamp (should we reuse an existing interface, or create a new 
timestampInserter interface?) and removes the extra rowtime field from 
StreamRecord.value before we emit the datastream to the sink. (For 
kafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true).)

Please correct me if I missed something here. What do you think?


> Create unified interfaces to configure and instantiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes we have in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's a source or a sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9707:
-
Priority: Blocker  (was: Major)

> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9708) Network buffer leaks when buffer request fails during buffer redistribution

2018-07-03 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9708:
--

 Summary: Network buffer leaks when buffer request fails during 
buffer redistribution
 Key: FLINK-9708
 URL: https://issues.apache.org/jira/browse/FLINK-9708
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.5.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.5.1


If an exception is thrown in {{NetworkBufferPool#requestMemorySegments()}}'s 
first call to {{redistributeBuffers()}}, the accounting for 
{{numTotalRequiredBuffers}} is wrong for future uses of this buffer pool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530946#comment-16530946
 ] 

ASF GitHub Bot commented on FLINK-9688:
---

Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6223
  
@fhueske @hequn8128 @yanghua thank you very much for your review and 
comments.
I did almost all the corrections except one related to the atan2 description, 
so I have a question. As @hequn8128 wrote:
> I checked the [wiki](https://en.wikipedia.org/wiki/Atan2) about atan2, it 
said: The atan2 function calculates one unique arc tangent value from two 
variables y and x. So, would it be better of two variables? atan2(y,x) can be 
the arc tangent of (x,y) or (nx, ny).

At the same time I checked [Calcite's 
definition](https://calcite.apache.org/docs/reference.html) of it: 
_ATAN2(numeric, numeric) | Returns the arc tangent of the numeric coordinates._ 
Which do you think is more suitable?
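
For reference, this is the standard convention both descriptions paraphrase (a 
textbook sketch, not a quote from either source; note the argument order 
atan2(y, x), consistent with atan2(1,2) = arctan(1/2) ≈ 0.4636 in the example 
below):

{noformat}
\operatorname{atan2}(y, x) =
\begin{cases}
\arctan(y/x)       & x > 0 \\
\arctan(y/x) + \pi & x < 0,\ y \ge 0 \\
\arctan(y/x) - \pi & x < 0,\ y < 0 \\
+\pi/2             & x = 0,\ y > 0 \\
-\pi/2             & x = 0,\ y < 0 \\
\text{undefined}   & x = 0,\ y = 0
\end{cases}
{noformat}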


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
> DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530944#comment-16530944
 ] 

ASF GitHub Bot commented on FLINK-9636:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/6238
  
actually, it was quite easy to reproduce and the fix was also just as you 
proposed - please see the new commits (the old one was only renamed since I 
created a separate issue for that now)


> Network buffer leaks in requesting a batch of segments during canceling
> ---
>
> Key: FLINK-9636
> URL: https://issues.apache.org/jira/browse/FLINK-9636
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} 
> is increased by {{numRequiredBuffers}} first.
> If {{InterruptedException}} is thrown during polling segments from the 
> available queue, the requested segments will be recycled back to 
> {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number 
> of polled segments which is now inconsistent with {{numRequiredBuffers}}. So 
> {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and 
> we can also decrease {{numRequiredBuffers}} to fix this bug.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6238: [FLINK-9636][network] fix inconsistency with failed buffe...

2018-07-03 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/6238
  
actually, it was quite easy to reproduce and the fix was also just as you 
proposed - please see the new commits (the old one was only renamed since I 
created a separate issue for that now)


---


[GitHub] flink issue #6223: [FLINK-9688] ATAN2 sql function support

2018-07-03 Thread snuyanzin
Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6223
  
@fhueske @hequn8128 @yanghua thank you very much for your review and 
comments.
I did almost all the corrections except one related to the atan2 description, 
so I have a question. As @hequn8128 wrote:
> I checked the [wiki](https://en.wikipedia.org/wiki/Atan2) about atan2, it 
said: The atan2 function calculates one unique arc tangent value from two 
variables y and x. So, would it be better of two variables? atan2(y,x) can be 
the arc tangent of (x,y) or (nx, ny).

At the same time I checked [Calcite's 
definition](https://calcite.apache.org/docs/reference.html) of it: 
_ATAN2(numeric, numeric) | Returns the arc tangent of the numeric coordinates._ 
Which do you think is more suitable?


---


[jira] [Updated] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-9554:
--
Fix Version/s: 1.5.1

> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
> Fix For: 1.5.1
>
>
> It still tries to use StandaloneCluster even if I specify yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-07-03 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-9554:
--
Priority: Blocker  (was: Major)

> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Blocker
> Fix For: 1.5.1
>
>
> It still tries to use StandaloneCluster even if I specify yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6239
  
As I understand it we don't want this to be part of the source release, so 
we need an exclusion in 
[create_source_release.sh](https://github.com/apache/flink/blob/master/tools/releasing/create_source_release.sh).


---


[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6239


---


[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199719645
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
 		}
 
 		return gateway.submitJob(jobGraph, timeout)
-			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+			.exceptionally(exception -> {
+				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

Do note that this discussion isn't really blocking the PR from being merged 
as it would effectively be an extension of it.
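
A self-contained sketch of the exceptionally() pattern from the diff above, 
with a hypothetical HandlerException standing in for Flink's 
RestHandlerException: rethrowing a CompletionException from the callback makes 
the returned future complete exceptionally with the wrapped, domain-specific 
error.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public final class ExceptionallyPattern {
    // Hypothetical stand-in for a REST handler exception type.
    static final class HandlerException extends RuntimeException {
        HandlerException(String message, Throwable cause) { super(message, cause); }
    }

    public static void main(String[] args) {
        CompletableFuture<String> submission = new CompletableFuture<>();
        submission.completeExceptionally(new IllegalStateException("faulty job graph"));

        // Map any upstream failure to the domain-specific exception.
        CompletableFuture<String> response = submission.exceptionally(t -> {
            throw new CompletionException(new HandlerException("Job submission failed.", t));
        });

        try {
            response.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // prints: Job submission failed.
        }
    }
}
{code}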


---


[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530990#comment-16530990
 ] 

ASF GitHub Bot commented on FLINK-8785:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199719645
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
 		}
 
 		return gateway.submitJob(jobGraph, timeout)
-			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+			.exceptionally(exception -> {
+				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

Do note that this discussion isn't really blocking the PR from being merged 
as it would effectively be an extension of it.


> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a 
> {{JobSubmissionException}}, the {{JobSubmissionHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as 
> tested by the {{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199733710
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
--- End diff --

Yes, we assume that the user provides a Joda-Time version that matches the 
specific record. We only call 4 methods; I think changes are unlikely there. I 
went for the version Flink uses; the Avro version would be `2.9`, but we would 
always have to keep this in sync.


---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531052#comment-16531052
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199733710
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
--- End diff --

Yes, we assume that the user provides a Joda-Time version that matches the 
specific record. We only call 4 methods; I think changes are unlikely there. I 
went for the version Flink uses; the Avro version would be `2.9`, but we would 
always have to keep this in sync.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199734399
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -44,7 +41,7 @@
 	@Override
 	protected void configureBuilder(KafkaTableSource.Builder builder) {
 		super.configureBuilder(builder);
-		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
--- End diff --

No, but it simplifies the code base and uses only real-world generated 
records for testing.


---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531056#comment-16531056
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199734399
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -44,7 +41,7 @@
 	@Override
 	protected void configureBuilder(KafkaTableSource.Builder builder) {
 		super.configureBuilder(builder);
-		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
--- End diff --

No, but it simplifies the code base and uses only real-world generated 
records for testing.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531061#comment-16531061
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199736070
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
 ---
@@ -26,7 +26,7 @@
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
--- End diff --

The problem is that {{BackwardsCompatibleAvroSerializer}} does not support 
records with logical types. Logical types need a Kryo configuration that the 
serializer does not set correctly. This might be a bug or at least a missing 
feature. Given that this serializer only exists for backwards compatibility 
with 1.3 (which used Avro 1.7 without logical types), I added a simple user for 
this test. I will add a comment about this to the code.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199736070
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
 ---
@@ -26,7 +26,7 @@
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
--- End diff --

The problem is that {{BackwardsCompatibleAvroSerializer}} does not support 
records with logical types. Logical types need a Kryo configuration that the 
serializer does not set correctly. This might be a bug or at least a missing 
feature. Given that this serializer only exists for backwards compatibility 
with 1.3 (which used Avro 1.7 without logical types), I added a simple user for 
this test. I will add a comment about this to the code.


---


[GitHub] flink pull request #6216: [FLINK-9674][tests] Replace hard-coded sleeps in Q...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6216#discussion_r199738568
  
--- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -85,20 +90,23 @@ function run_test() {
         exit 1
     fi
 
-    local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID})
-
     kill_random_taskmanager
 
     latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
     echo "Latest snapshot count was ${latest_snapshot_count}"
 
-    sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+    # wait until the TM loss was detected
+    wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED"
 
     start_and_wait_for_tm
 
+    wait_job_running ${JOB_ID}
+
+    local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})"
--- End diff --

So when I ran this test locally it frequently occurred that we never 
actually waited for checkpoints to complete; the number of checkpoints that we 
waited for had already occurred and we exited early.
The job just switches to running, and right away we got 3 completed 
checkpoints?
This was a bit, well, _odd_. 

In the previous version the checkpoint count could further increase while 
we shut down the TM.
Technically there's no guarantee how up-to-date the result of 
`get_completed_number_of_checkpoints` is, or how long `kill_random_taskmanager` 
actually takes, so we may have fulfilled the checkpoint-count condition before 
the job even restarts. It's admittedly unlikely.

This change simply guarantees that the job actually completes N checkpoints 
after it was restarted, and just serves to eliminate doubts about the 
correctness of the test.
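
The general replacement for such fixed sleeps, as a minimal Java sketch (helper 
name and poll interval are illustrative, not Flink test utilities): poll an 
observable condition with a deadline instead of sleeping for a config-derived 
duration.

{code:java}
import java.util.function.BooleanSupplier;

public final class WaitUntil {
    // Poll the condition until it holds or the deadline passes.
    public static void waitUntil(BooleanSupplier condition, long timeoutMillis)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(100); // short poll instead of one long, config-coupled sleep
        }
    }
}
{code}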


---


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531072#comment-16531072
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6216#discussion_r199738568
  
--- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -85,20 +90,23 @@ function run_test() {
         exit 1
     fi
 
-    local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID})
-
    kill_random_taskmanager
 
     latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
     echo "Latest snapshot count was ${latest_snapshot_count}"
 
-    sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+    # wait until the TM loss was detected
+    wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED"
 
     start_and_wait_for_tm
 
+    wait_job_running ${JOB_ID}
+
+    local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})"
--- End diff --

So when I ran this test locally it frequently occurred that we never 
actually waited for checkpoints to complete; the number of checkpoints that we 
waited for had already occurred and we exited early.
The job just switches to running, and right away we got 3 completed 
checkpoints?
This was a bit, well, _odd_. 

In the previous version the checkpoint count could further increase while 
we shut down the TM.
Technically there's no guarantee how up-to-date the result of 
`get_completed_number_of_checkpoints` is, or how long `kill_random_taskmanager` 
actually takes, so we may have fulfilled the checkpoint-count condition before 
the job even restarts. It's admittedly unlikely.

This change simply guarantees that the job actually completes N checkpoints 
after it was restarted, and just serves to eliminate doubts about the 
correctness of the test.


> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531094#comment-16531094
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199741252
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java ---
@@ -18,64 +18,118 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
 */
 public final class JobSubmitRequestBody implements RequestBody {
 
-	private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+	private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+	private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+	private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";
 
-	/**
-	 * The serialized job graph.
-	 */
-	@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-	public final byte[] serializedJobGraph;
+	@JsonProperty(FIELD_NAME_JOB_GRAPH)
+	public final String jobGraphFileName;
 
-	public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-		this(serializeJobGraph(jobGraph));
-	}
+	@JsonProperty(FIELD_NAME_JOB_JARS)
+	public final Collection<String> jarFileNames;
+
+	@JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+	public final Collection<String> artifactFileNames;
 
-	@JsonCreator
--- End diff --

revert


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199741252
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java ---
@@ -18,64 +18,118 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
 */
 public final class JobSubmitRequestBody implements RequestBody {
 
-	private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+	private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+	private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+	private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";
 
-	/**
-	 * The serialized job graph.
-	 */
-	@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-	public final byte[] serializedJobGraph;
+	@JsonProperty(FIELD_NAME_JOB_GRAPH)
+	public final String jobGraphFileName;
 
-	public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-		this(serializeJobGraph(jobGraph));
-	}
+	@JsonProperty(FIELD_NAME_JOB_JARS)
+	public final Collection<String> jarFileNames;
+
+	@JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+	public final Collection<String> artifactFileNames;
 
-	@JsonCreator
--- End diff --

revert


---


[jira] [Created] (FLINK-9711) Flink CLI does not filter RUNNING only jobs

2018-07-03 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-9711:
-

 Summary: Flink CLI does not filter RUNNING only jobs
 Key: FLINK-9711
 URL: https://issues.apache.org/jira/browse/FLINK-9711
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev


In the Flink CLI there's a "list" command with the option --running that, 
according to its description, should "Show only running programs and their 
JobIDs". However, in practice it also shows jobs that are in the *CANCELED* 
state, i.e. completed jobs.

 
{code:java}
flink list --running -m job-manager:8081 
Waiting for response...
-- Running/Restarting Jobs ---
03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched TrackingClick 
(CANCELED)
03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched TrackingClick 
(CANCELED)
03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched TrackingClick 
(RUNNING)
--

{code}
 

The proposal is to extend the CLI to show only jobs in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread Sayat Satybaldiyev (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sayat Satybaldiyev updated FLINK-9711:
--
Summary: Flink CLI --running option does not show RUNNING only jobs  (was: 
Flink CLI does not filter RUNNING only jobs)

> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In the Flink CLI there's a "list" command with the option --running that, 
> according to its description, should "Show only running programs and their 
> JobIDs". However, in practice it also shows jobs that are in the *CANCELED* 
> state, i.e. completed jobs.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> The proposal is to extend the CLI to show only jobs in the *RUNNING* state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531099#comment-16531099
 ] 

ASF GitHub Bot commented on FLINK-9688:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199743600
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -2184,6 +2184,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.atan2(NUMERIC)
--- End diff --

given that both parameters are equally important, we might want to change 
the syntax to `atan2(Numeric, Numeric)`. IMO, that would be more intuitive. 
What do you think?


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
> DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199743600
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -2184,6 +2184,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.atan2(NUMERIC)
--- End diff --

given that both parameters are equally important, we might want to change 
the syntax to `atan2(Numeric, Numeric)`. IMO, that would be more intuitive. 
What do you think?


---


[jira] [Closed] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9711.
---
Resolution: Duplicate

Already fixed for 1.5.1.

> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In the Flink CLI there's a "list" command with the option --running that, 
> according to its description, should "Show only running programs and their 
> JobIDs". However, in practice it also shows jobs that are in the *CANCELED* 
> state, i.e. completed jobs.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal is to extend the CLI program to show only jobs in the *RUNNING* state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6241: [FLINK-9289][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6241

[FLINK-9289][rest] Rework JobSubmitHandler to accept jar files

Backport of #6203 for 1.5.

The differences to the current version of the linked PR (state @ 
b9a804c350d12469eccf0fd78d82dcac8eaa3c5b) are:
* `ClientUtils` were not introduced
* removed all code related to the upload of artifacts, as this feature was 
introduced in 1.6

@tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_epsilon_bp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6241


commit a8765ae2f006c99a905afd93e93cbe9b0cfef09b
Author: zentol 
Date:   2018-06-11T09:45:12Z

[FLINK-9289][rest] Rework JobSubmitHandler to accept jar files




---


[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9713:
--
Component/s: Table API & SQL

> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...

2018-07-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6201
  
Hi @suez1224, that sounds good overall. :-)

A few comments:

- I would not add a user-facing property `connector.support-timestamp` 
because a user chooses that by choosing the connector type. Whether the 
connector supports writing a system timestamp can be an internal 
field/annotation/interface of the `TableSink` that is generated from the 
properties.
- Copying the timestamp to the StreamRecord timestamp field can be done 
with a process function. Actually, we do that already when converting a Table 
into a DataStream. Setting the flag in the Kafka TableSink should be easy.
- Not sure if `from-source` needs to be supported by the initial version. 
We could just implement `from-field` for now, and handle `from-source` as a 
follow-up issue. Since we are approaching feature freeze, I think this might be 
a good idea at this point.

What do you think?
Fabian
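
As an illustration of the second point, a rough sketch of an operator that copies 
a long field of a `Row` into the `StreamRecord` timestamp (illustrative only; the 
operator name and the field-index parameter are made up, and the actual 
Table-to-DataStream conversion code may look different):

{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

public class SetRowtimeOperator extends AbstractStreamOperator<Row>
        implements OneInputStreamOperator<Row, Row> {

    // index of the row field that holds the timestamp (hypothetical parameter)
    private final int timestampField;

    public SetRowtimeOperator(int timestampField) {
        this.timestampField = timestampField;
    }

    @Override
    public void processElement(StreamRecord<Row> element) throws Exception {
        final Row row = element.getValue();
        final long ts = (Long) row.getField(timestampField);
        // re-emit the same row with the StreamRecord timestamp overwritten
        output.collect(element.replace(row, ts));
    }
}
{code}

Something like this could be wired in via `DataStream#transform(...)` right before 
the sink.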


---


[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9713:
--
Affects Version/s: 1.5.0

> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9289) Parallelism of generated operators should have max parallelism of input

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531112#comment-16531112
 ] 

ASF GitHub Bot commented on FLINK-9289:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6241

[FLINK-9289][rest] Rework JobSubmitHandler to accept jar files

Backport of #6203 for 1.5.

The differences to the current version of the linked PR (state @ 
b9a804c350d12469eccf0fd78d82dcac8eaa3c5b) are:
* `ClientUtils` were not introduced
* removed all code related to the upload of artifacts, as this feature was 
introduced in 1.6

@tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_epsilon_bp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6241


commit a8765ae2f006c99a905afd93e93cbe9b0cfef09b
Author: zentol 
Date:   2018-06-11T09:45:12Z

[FLINK-9289][rest] Rework JobSubmitHandler to accept jar files




> Parallelism of generated operators should have max parallelism of input
> -
>
> Key: FLINK-9289
> URL: https://issues.apache.org/jira/browse/FLINK-9289
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction 
> mappers to their predecessor. This is done by assigning the same parallelism 
> as the input operator.
> If a generated operator has more than two inputs, the operator cannot be 
> chained anymore and the operator is generated with default parallelism. This 
> can lead to a {code}NoResourceAvailableException: Not enough free slots 
> available to run the job.{code} as reported by a user on the mailing list: 
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the max 
> parallelism of all of its inputs to fix this problem (see the sketch below).
> Until the problem is fixed, a workaround is to set the default parallelism at 
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
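
The gist of the suggested fix, as a sketch detached from the actual DataSet 
optimizer types (the helper and its parameters are made up for illustration):

{code:java}
import java.util.List;

final class ParallelismUtil {

    /**
     * Derive the parallelism of a generated operator from the max over all
     * input parallelisms, falling back to the default only when there are
     * no inputs.
     */
    static int deriveParallelism(List<Integer> inputParallelisms, int defaultParallelism) {
        return inputParallelisms.stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(defaultParallelism);
    }
}
{code}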



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-07-03 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9712:
-

 Summary: Support enrichment joins in Flink SQL/Table API
 Key: FLINK-9712
 URL: https://issues.apache.org/jira/browse/FLINK-9712
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Piotr Nowojski






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9713) Support versioned joins in planning phase

2018-07-03 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9713:
-

 Summary: Support versioned joins in planning phase
 Key: FLINK-9713
 URL: https://issues.apache.org/jira/browse/FLINK-9713
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531117#comment-16531117
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199746426
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
 ---
@@ -37,18 +43,42 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ * Serialization schema that serializes {@link Row} into Avro bytes.
+ *
+ * Serializes objects that are represented in (nested) Flink rows. It 
support types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowDeserializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
 public class AvroRowSerializationSchema implements 
SerializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

We are using this pattern at different places, e.g. 
`org.apache.flink.orc.OrcBatchReader`. The problem is that Java's SQL 
time/date/timestamp classes are a complete design fail: they are timezone 
specific. This pattern adds/removes the local timezone from the timestamp, 
such that the string representation of the produced `Timestamp` object is 
always correct.
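
A sketch of the shifting pattern described above (the sign convention is my 
reading of it and not verified against the actual code):

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

final class TimeShiftDemo {

    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Avro delivers millis-since-epoch in UTC, but java.sql.Timestamp renders
    // in the JVM's default zone, so the local offset is removed on read ...
    static Timestamp readTimestamp(long utcMillis) {
        return new Timestamp(utcMillis - LOCAL_TZ.getOffset(utcMillis));
    }

    // ... and added back on write, keeping Timestamp.toString() at the
    // intended wall-clock time.
    static long writeTimestamp(Timestamp ts) {
        final long millis = ts.getTime();
        return millis + LOCAL_TZ.getOffset(millis);
    }
}
{code}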


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9712:
--
Description: 
As described here:

https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing

> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531114#comment-16531114
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6201
  
Hi @suez1224, that sounds good overall. :-)

A few comments:

- I would not add a user-facing property `connector.support-timestamp` 
because a user chooses that by choosing the connector type. Whether the 
connector supports writing a system timestamp can be an internal 
field/annotation/interface of the `TableSink` that is generated from the 
properties.
- Copying the timestamp to the StreamRecord timestamp field can be done 
with a process function. Actually, we do that already when converting a Table 
into a DataStream. Setting the flag in the Kafka TableSink should be easy.
- Not sure if `from-source` needs to be supported by the initial version. 
We could just implement `from-field` for now, and handle `from-source` as a 
follow-up issue. Since we are approaching feature freeze, I think this might be 
a good idea at this point.

What do you think?
Fabian


> Create unified interfaces to configure and instantiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes we have in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199746600
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

See comment above.


---


[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531118#comment-16531118
 ] 

ASF GitHub Bot commented on FLINK-9674:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6216
  
merging.


> Remove 65s sleep in QueryableState E2E test
> ---
>
> Key: FLINK-9674
> URL: https://issues.apache.org/jira/browse/FLINK-9674
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the 
> loss to be noticed, starts a new tm and waits for the job to continue.
> {code}
> kill_random_taskmanager
> [...]
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM 
> is gone
> start_and_wait_for_tm
> {code}
> Instead of waiting for a fixed amount of time that is tied to some config 
> value, we should wait for a specific event, like the job being canceled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9714) Support versioned joins with processing time

2018-07-03 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9714:
-

 Summary: Support versioned joins with processing time
 Key: FLINK-9714
 URL: https://issues.apache.org/jira/browse/FLINK-9714
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Piotr Nowojski






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6216
  
merging.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199746426
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
 ---
@@ -37,18 +43,42 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ * Serialization schema that serializes {@link Row} into Avro bytes.
+ *
+ * Serializes objects that are represented in (nested) Flink rows. It 
support types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowDeserializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
 public class AvroRowSerializationSchema implements 
SerializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

We are using this pattern at different places, e.g. 
`org.apache.flink.orc.OrcBatchReader`. The problem is that Java's SQL 
time/date/timestamp classes are a complete design fail: they are timezone 
specific. This pattern adds/removes the local timezone from the timestamp, 
such that the string representation of the produced `Timestamp` object is 
always correct.


---


[jira] [Updated] (FLINK-9715) Support versioned joins with event time

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9715:
--
Affects Version/s: 1.5.0

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531120#comment-16531120
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199746600
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

See comment above.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Blocker
>  Labels: patch, pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at 

[jira] [Updated] (FLINK-9715) Support versioned joins with event time

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9715:
--
Component/s: Table API & SQL

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9715) Support versioned joins with event time

2018-07-03 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9715:
-

 Summary: Support versioned joins with event time
 Key: FLINK-9715
 URL: https://issues.apache.org/jira/browse/FLINK-9715
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199747223
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class<? extends SpecificRecord> recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
--- End diff --

Sorry about that. I actually rewrote the entire class. It might make sense 
to review it entirely instead of the diff.


---


[GitHub] flink pull request #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --...

2018-07-03 Thread satybald
GitHub user satybald opened a pull request:

https://github.com/apache/flink/pull/6242

[FLINK-9711][CLI] Filter only RUNNING jobs when --running option provided 
in CLI

## What is the purpose of the change

In the Flink CLI there's a `list` command with a --running option that, according 
to its description, should "Show only running programs and their JobIDs". However, 
in practice, it also shows jobs that are in the CANCELED and other states, which 
do not belong to the RUNNING state.

## Brief change log
  - *filter only RUNNING jobs in the CLI if the corresponding option is provided* (see the sketch below)
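
A minimal sketch of the filtering logic, assuming the CLI works on 
`JobStatusMessage` objects (exact types and their locations may differ):

{code:java}
import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobStatus;

final class RunningJobsFilter {

    /** Keep only jobs whose state is RUNNING when the --running flag is set. */
    static Collection<JobStatusMessage> filterRunning(Collection<JobStatusMessage> jobs) {
        return jobs.stream()
                .filter(job -> job.getJobState() == JobStatus.RUNNING)
                .collect(Collectors.toList());
    }
}
{code}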

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/satybald/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6242.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6242


commit 86529e8843dd073f79da8be07cc821d46c10bf7a
Author: Sayat Satybaldiyev 
Date:   2018-07-03T09:40:57Z

Filter only RUNNING jobs when --running option provided in CLI




---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531123#comment-16531123
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199747223
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class<? extends SpecificRecord> recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
--- End diff --

Sorry about that. I actually rewrote the entire class. It might make sense 
to review it entirely instead of the diff.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>

[jira] [Created] (FLINK-9716) Support scans from table version function

2018-07-03 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9716:
-

 Summary: Support scans from table version function
 Key: FLINK-9716
 URL: https://issues.apache.org/jira/browse/FLINK-9716
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski


Given a TVF {{Rates}}, this should work:

 
{code:java}
SELECT * FROM Rates(2016-06-27 10:10:42.123)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531124#comment-16531124
 ] 

ASF GitHub Bot commented on FLINK-9711:
---

GitHub user satybald opened a pull request:

https://github.com/apache/flink/pull/6242

[FLINK-9711][CLI] Filter only RUNNING jobs when --running option provided 
in CLI

## What is the purpose of the change

In the Flink CLI there's a `list` command with a --running option that, according 
to its description, should "Show only running programs and their JobIDs". However, 
in practice, it also shows jobs that are in the CANCELED and other states, which 
do not belong to the RUNNING state.

## Brief change log
  - *filter only RUNNING jobs in the CLI if the corresponding option is provided*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/satybald/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6242.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6242


commit 86529e8843dd073f79da8be07cc821d46c10bf7a
Author: Sayat Satybaldiyev 
Date:   2018-07-03T09:40:57Z

Filter only RUNNING jobs when --running option provided in CLI




> Flink CLI --running option does not show RUNNING only jobs
> --
>
> Key: FLINK-9711
> URL: https://issues.apache.org/jira/browse/FLINK-9711
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>
> In the Flink CLI there's a `list` command with a --running option that, according 
> to its description, should "Show only running programs and their JobIDs". However, 
> in practice, it also shows jobs that are in the *CANCELED* state, i.e. jobs 
> that have already completed.
>  
> {code:java}
> flink list --running -m job-manager:8081 
> Waiting for response...
> -- Running/Restarting Jobs ---
> 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched 
> TrackingClick (CANCELED)
> 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched 
> TrackingClick (RUNNING)
> --
> {code}
>  
> Proposal is to extend the CLI program to show only jobs in the *RUNNING* state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9714) Support versioned joins with processing time

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9714:
--
Description: 
Queries like:
{code:java}
SELECT 
  o.amount * r.rate 
FROM 
  Orders AS o, 
  LATERAL TABLE (Rates(o.rowtime)) AS r 
WHERE o.currency = r.currency{code}
should work for processing time

> Support versioned joins with processing time
> 
>
> Key: FLINK-9714
> URL: https://issues.apache.org/jira/browse/FLINK-9714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work for processing time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9713:
--
Description: 
Queries like:
{code:java}
SELECT 
  o.amount * r.rate 
FROM 
  Orders AS o, 
  LATERAL TABLE (Rates(o.rowtime)) AS r 
WHERE o.currency = r.currency{code}
should evaluate to a valid plan with a versioned joins plan node.

  was:
Queries like:
{code:java}
SELECT 
  o.amount * r.rate 
FROM 
  Orders AS o, 
  LATERAL TABLE (TemporalRates(o.rowtime)) AS r 
WHERE o.currency = r.currency{code}
should evaluate to a valid plan with a versioned joins plan node.


> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned joins plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199748214
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class<? extends SpecificRecord> recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader<SpecificRecord> datumReader;
+   private transient TypeInformation<Row> typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader<IndexedRecord> datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream 

[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531128#comment-16531128
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199748214
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class<? extends SpecificRecord> recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader<SpecificRecord> datumReader;
+   private transient TypeInformation<Row> typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader 

[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9713:
--
Description: 
Queries like:
{code:java}
SELECT 
  o.amount * r.rate 
FROM 
  Orders AS o, 
  LATERAL TABLE (TemporalRates(o.rowtime)) AS r 
WHERE o.currency = r.currency{code}
should evaluate to a valid plan with a versioned joins plan node.

> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (TemporalRates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned joins plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9715) Support versioned joins with event time

2018-07-03 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9715:
--
Description: 
Queries like:
{code:java}
SELECT 
  o.amount * r.rate 
FROM 
  Orders AS o, 
  LATERAL TABLE (Rates(o.rowtime)) AS r 
WHERE o.currency = r.currency{code}
should work with event time

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


  1   2   3   4   5   >