[GitHub] flink pull request #6238: [FLINK-9636][network] fix inconsistency with faile...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6238#discussion_r199702444

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) {

                 this.numTotalRequiredBuffers += numRequiredBuffers;

    -            redistributeBuffers();
    +            try {
    +                redistributeBuffers();
    +            } catch (Throwable t) {
    +                this.numTotalRequiredBuffers -= numRequiredBuffers;
    +                ExceptionUtils.rethrowIOException(t);
    +            }
             }

             final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    --- End diff --

    Ah, true, thanks for pointing this out. I must have been blind in one eye yesterday. I'll integrate this change as well.

---
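The change under review follows a reserve-then-roll-back pattern: the counter is incremented first, and if redistribution throws, the increment is undone before the error propagates. A minimal self-contained sketch of that pattern follows. The class `ReservationSketch`, its `capacity` field, and the failure condition in `redistribute()` are hypothetical stand-ins for illustration, not the actual `NetworkBufferPool` code, and the rethrow logic only approximates what `ExceptionUtils.rethrowIOException` is assumed to do.

```java
import java.io.IOException;

/** Hypothetical stand-in for a pool that tracks a reserved-buffer counter. */
class ReservationSketch {
    private final int capacity;
    private int numTotalRequiredBuffers;

    ReservationSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Reserves buffers; undoes the bookkeeping if redistribution fails. */
    void reserve(int numRequiredBuffers) throws IOException {
        numTotalRequiredBuffers += numRequiredBuffers;
        try {
            redistribute();
        } catch (Throwable t) {
            // Roll back so a failed call leaves the counter unchanged.
            numTotalRequiredBuffers -= numRequiredBuffers;
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IOException(t.getMessage(), t);
        }
    }

    /** Placeholder for the real redistribution; fails when over capacity. */
    private void redistribute() throws IOException {
        if (numTotalRequiredBuffers > capacity) {
            throw new IOException("Insufficient number of network buffers");
        }
    }

    int reserved() {
        return numTotalRequiredBuffers;
    }
}
```

With a capacity of 4, reserving 3 buffers succeeds, a further request for 2 throws, and `reserved()` still reports 3 afterwards, which is the consistency property the PR title refers to.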
[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530928#comment-16530928 ] Till Rohrmann commented on FLINK-9622: -- Well the actual problem is that there are two concurrent directory creation operations happening and {{LocalFileSystem#create}} fails if it cannot create the directory. > DistributedCacheDfsTest failed on travis > > > Key: FLINK-9622 > URL: https://issues.apache.org/jira/browse/FLINK-9622 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Sihua Zhou >Assignee: Till Rohrmann >Priority: Blocker > Fix For: 1.6.0 > > > DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis. > instance: https://api.travis-ci.org/v3/job/394399700/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
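The race described above (two concurrent creators of the same directory, one of which sees a failure) can be tolerated by re-checking whether the directory exists before treating the failed creation as an error. The sketch below shows that idea with plain `java.io.File`. The class and method names are hypothetical, and this is not claimed to be how `LocalFileSystem#create` was actually changed.

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical helper illustrating race-tolerant directory creation. */
class MkdirsSketch {
    /** Creates the directory unless it already exists, tolerating concurrent creators. */
    static void ensureDirectory(File dir) throws IOException {
        // mkdirs() returns false when another thread or process created the
        // directory first, so re-check before treating false as an error.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Could not create directory " + dir);
        }
    }
}
```

Calling `ensureDirectory` twice on the same path succeeds both times; only a path that cannot be created and does not exist as a directory raises an error.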
[jira] [Created] (FLINK-9706) DispatcherTest#testSubmittedJobGraphListener fails on Travis
Chesnay Schepler created FLINK-9706: --- Summary: DispatcherTest#testSubmittedJobGraphListener fails on Travis Key: FLINK-9706 URL: https://issues.apache.org/jira/browse/FLINK-9706 Project: Flink Issue Type: Improvement Components: Distributed Coordination, Tests Affects Versions: 1.5.0, 1.6.0 Reporter: Chesnay Schepler https://travis-ci.org/apache/flink/jobs/399331775 {code:java} testSubmittedJobGraphListener(org.apache.flink.runtime.dispatcher.DispatcherTest) Time elapsed: 0.103 sec <<< FAILURE! java.lang.AssertionError: Expected: a collection with size <1> but: collection size was <0> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.dispatcher.DispatcherTest.testSubmittedJobGraphListener(DispatcherTest.java:294) testSubmittedJobGraphListener(org.apache.flink.runtime.dispatcher.DispatcherTest) Time elapsed: 0.11 sec <<< ERROR! org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: org.apache.flink.runtime.dispatcher.DispatcherException: Could not start the added job b8ab3b7fa8a929bf608a5b65896a2b17 at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51) at org.apache.flink.runtime.dispatcher.DispatcherTest.tearDown(DispatcherTest.java:219) Caused by: org.apache.flink.runtime.dispatcher.DispatcherException: Could not start the added job b8ab3b7fa8a929bf608a5b65896a2b17 at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$onAddedJobGraph$28(Dispatcher.java:845) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.util.FlinkException: Failed to submit job b8ab3b7fa8a929bf608a5b65896a2b17. 
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$onAddedJobGraph$27(Dispatcher.java:836) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at
[jira] [Commented] (FLINK-9178) Add rate control for kafka source
[ https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530933#comment-16530933 ] buptljy commented on FLINK-9178: [~tzulitai] I'd like to add something about this issue, which is very similar to a problem I ran into recently. Our program receives real-time data, counts distinct IPs within a 10-minute event-time window, and sinks the aggregated data into HBase. Something went wrong and we now want to re-consume all data from Kafka's earliest offset, but this does not work well because too many event-time windows are held in memory at once. I think it would be fine with processing time instead, because there would be only a single window even when consuming from the earliest offset. So I wonder whether we could add a parameter to control the rate of receiving data, such as an upper bound on the consumption rate? > Add rate control for kafka source > - > > Key: FLINK-9178 > URL: https://issues.apache.org/jira/browse/FLINK-9178 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: buptljy >Assignee: Tarush Grover >Priority: Major > > When I want to run the Flink program from the earliest offset in Kafka, it'll > be very easy to cause OOM if there is too much data, because of too many > HeapMemorySegment in NetworkBufferPool. > Maybe we should have some settings to control the rate of the receiving data? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
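To make the "upper bound on the consumption rate" idea concrete, here is a small pacing sketch: the limiter hands out one slot per record and tells the caller how long to pause before emitting. Everything in it (`RateLimiterSketch`, `reserve`, the records-per-second parameter) is hypothetical and not part of any Flink or Kafka API. The clock is passed in explicitly so the arithmetic is easy to follow.

```java
/** Hypothetical records-per-second limiter; not part of any Flink or Kafka API. */
class RateLimiterSketch {
    private final long nanosPerRecord;
    private long nextFreeNanos;

    /** recordsPerSecond is assumed to be between 1 and 1,000,000,000. */
    RateLimiterSketch(long recordsPerSecond, long startNanos) {
        this.nanosPerRecord = 1_000_000_000L / recordsPerSecond;
        this.nextFreeNanos = startNanos;
    }

    /** Returns how long (in nanoseconds) the caller should pause before emitting the next record. */
    long reserve(long nowNanos) {
        long wait = Math.max(0L, nextFreeNanos - nowNanos);
        // The next slot opens one interval after this record's slot.
        nextFreeNanos = Math.max(nextFreeNanos, nowNanos) + nanosPerRecord;
        return wait;
    }
}
```

A consuming loop would call `reserve(System.nanoTime())` and sleep for the returned duration before handing the record downstream. With a limit of 10 records per second, two back-to-back calls at time 0 return waits of 0 ns and 100,000,000 ns.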
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530977#comment-16530977 ] ASF GitHub Bot commented on FLINK-9004: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6239 As I understand it, we don't want this to be part of the source release; as such, we need an exclusion in [create_source_release.sh](https://github.com/apache/flink/blob/master/tools/releasing/create_source_release.sh). > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530983#comment-16530983 ] ASF GitHub Bot commented on FLINK-9004: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6239 > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6211: [FLINK-9665] PrometheusReporter does not properly unregis...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6211 I've merged the PR, @jelmerk could you close it? Thanks! ---
[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6239 whoops, sorry I accidentally closed this PR, added the wrong PR ID to a commit :( I'm so sorry. ---
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530985#comment-16530985 ] ASF GitHub Bot commented on FLINK-9004: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6239 whoops, sorry I accidentally closed this PR, added the wrong PR ID to a commit :( I'm so sorry. > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9665) PrometheusReporter does not properly unregister metrics
[ https://issues.apache.org/jira/browse/FLINK-9665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530986#comment-16530986 ] ASF GitHub Bot commented on FLINK-9665: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6211 I've merged the PR, @jelmerk could you close it? Thanks! > PrometheusReporter does not properly unregister metrics > --- > > Key: FLINK-9665 > URL: https://issues.apache.org/jira/browse/FLINK-9665 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Jelmer Kuperus >Priority: Major > Labels: pull-request-available > > The {{PrometheusReporter}} groups metrics with the same logical scope in a > single {{Collector}} which are periodically polled by Prometheus. > New metrics are added to an existing collector, and a reference count is > maintained so we can eventually cleanup the {{Collector}} itself. > For removed metrics we decrease the reference count, do not however remove > the metrics that were added. As a result the collector will continue to > expose metrics, as long as at least 1 metric exists with the same logical > scope. > If the collector is a {{io.prometheus.client.Gauge}} we can use the > {{#remove()}} method. For histograms we will have to modify our > {{HistogramSummaryProxy}} class to allow removing individual histograms. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
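The bookkeeping described in the issue can be pictured with a small model: one shared collector per logical scope, a reference count that decides when the collector itself can be dropped, and a per-metric removal step so that a removed metric stops being exposed straight away. The class below is a hypothetical stand-in written without the Prometheus client library. It is not the `PrometheusReporter` implementation, and the label strings are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a collector shared by metrics with the same logical scope. */
class SharedCollectorSketch {
    // Label values -> current gauge value; one entry per registered metric.
    private final Map<String, Double> children = new HashMap<>();
    private int referenceCount;

    void addMetric(String labels, double value) {
        if (children.put(labels, value) == null) {
            referenceCount++;
        }
    }

    /** Removes the metric's own entry and returns true once the collector is unused. */
    boolean removeMetric(String labels) {
        // Dropping the entry is the step the issue reports as missing;
        // decrementing the count alone would keep exposing the stale value.
        if (children.remove(labels) != null) {
            referenceCount--;
        }
        return referenceCount == 0;
    }

    /** The values a scrape would currently see. */
    Map<String, Double> exposed() {
        return children;
    }
}
```

After registering two metrics and removing one, only the remaining metric is exposed, and `removeMetric` returns `true` only when the last one goes, which is the point at which the collector itself could be unregistered.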
[jira] [Closed] (FLINK-9665) PrometheusReporter does not properly unregister metrics
[ https://issues.apache.org/jira/browse/FLINK-9665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9665. --- Resolution: Fixed Fix Version/s: 1.5.1 1.4.3 1.6.0 master: 75d12f967ccef5df3c6c513765bb8db1106a7c87 1.5: 903a323366b91a6f2f471f067462df9dd20cd3f4 1.4: 7719fddbf570075465a7a81722f9412735e83e6e > PrometheusReporter does not properly unregister metrics > --- > > Key: FLINK-9665 > URL: https://issues.apache.org/jira/browse/FLINK-9665 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Jelmer Kuperus >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.4.3, 1.5.1 > > > The {{PrometheusReporter}} groups metrics with the same logical scope in a > single {{Collector}} which are periodically polled by Prometheus. > New metrics are added to an existing collector, and a reference count is > maintained so we can eventually cleanup the {{Collector}} itself. > For removed metrics we decrease the reference count, do not however remove > the metrics that were added. As a result the collector will continue to > expose metrics, as long as at least 1 metric exists with the same logical > scope. > If the collector is a {{io.prometheus.client.Gauge}} we can use the > {{#remove()}} method. For histograms we will have to modify our > {{HistogramSummaryProxy}} class to allow removing individual histograms. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530995#comment-16530995 ] Fabian Hueske commented on FLINK-7593: -- Thanks for the feedback and the effort to reproduce the problem [~cshi]! I'm curious how that bug was fixed and would like to have a look before closing the issue. Thanks, Fabian > Generated plan does not create correct groups > - > > Key: FLINK-7593 > URL: https://issues.apache.org/jira/browse/FLINK-7593 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.3.2 > Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2 >Reporter: Steffen Dienst >Priority: Critical > Attachments: flink-good-plan.json > > > Under specific circumstances Flink seems to generate an execution plan that > is incorrect. I was using `groupBy(0).sum(1)` but the resulting csv files > contained multiple entries per group; the grouping did not occur. After some > work I managed to reduce the relevant part of our code to the minimal test > case below. Be careful: All parts need to be present, even the irrelevant > secondary output. If I remove anything else Flink generates correct code > (either by introducing a combiner node prior to the reducer or by using "Sum > (combine)" on the edge before the reducer).
> {code:java}
> import java.util.ArrayList;
> import java.util.Collection;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.types.LongValue;
> import org.apache.flink.util.LongValueSequenceIterator;
>
> public class FlinkOptimizerBug {
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     DataSet<Tuple2<Long, Long>> x =
>       env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
>         .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
>         .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
>           .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
>         .where(0).equalTo(0).with((t1,t2) -> t1)
>         .union(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
>           .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L)))
>         .map(l->l)
>         .withForwardedFields("f0;f1");
>
>     Collection<Tuple2<Long, Long>> out = new ArrayList<>();
>     x.output(new LocalCollectionOutputFormat<>(out));
>
>     x.groupBy(0)
>       .sum(1) // BUG: this will not be grouped correctly, so there will be multiple outputs per group!
>       .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
>       .setParallelism(1);
>
>     env.setParallelism(4);
>
>     System.out.println(env.getExecutionPlan());
>     env.execute();
>   }
> }
> {code}
> Invalid execution plan generated:
> {code:javascript}
> {
>   "nodes": [
>     {
>       "id": 5,
>       "type": "source",
>       "pact": "Data Source",
>       "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>       "parallelism": "4",
>       "global_properties": [
>         { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
>         { "name": "Partitioning Order", "value": "(none)" },
>         { "name": "Uniqueness", "value": "not unique" }
>       ],
>       "local_properties": [
>         { "name": "Order", "value": "(none)" },
>         { "name": "Grouping", "value": "not grouped" },
>         { "name": "Uniqueness", "value": "not unique" }
>       ],
>       "estimates": [
>         { "name": "Est. Output Size", "value": "(unknown)" },
>         { "name": "Est. Cardinality", "value": "(unknown)" }
>       ],
>       "costs": [
>         { "name": "Network", "value": "0.0" },
>         { "name": "Disk I/O", "value": "0.0" },
>         { "name": "CPU", "value": "0.0" },
>         { "name": "Cumulative Network", "value": "0.0" },
>         { "name": "Cumulative Disk I/O", "value": "0.0" },
>         { "name": "Cumulative CPU", "value": "0.0" }
>       ],
>       "compiler_hints": [
>         { "name": "Output Size (bytes)", "value": "(none)" },
>         { "name": "Output Cardinality",
[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...
Github user florianschmidt1994 commented on the issue:

    https://github.com/apache/flink/pull/6216

    Thanks @zentol, I have one remark (see above); besides that, it looks good to me!

    Additionally, I had some ideas that I think we could discuss:
    - We have a common pattern of `wait_for_sth` functions that either
      - get stuck in a loop forever if the desired event doesn't happen (I think `wait_for_job_state_transition` also behaves like that, right?)
      - or iterate a fixed number of times and then continue execution, whereas they should fail instead.

      I think we should add an issue to refactor this across all the tests so that the behaviour is consistent and useful.
    - Also, I think we could make backing up and reverting the config part of the test runner and always do that, so we avoid running into a corrupted flink-dist if tests don't behave correctly.

    What do you think?

---
[jira] [Commented] (FLINK-9699) Add api to replace registered table
[ https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531004#comment-16531004 ] Fabian Hueske commented on FLINK-9699: -- It might be possible to change the translation behavior, i.e., to immediately create a logical TableScan that references the correct DataSet / TableSource when the {{tEnv.scan()}} method is called. That would require a bit of a redesign of the Table API translation phase. I'm not opposed to that, especially if we can improve the support for notebook-style environments. However, it might be a larger effort that would need to be carefully planned. For example, the Table API handles its own validation. > Add api to replace registered table > --- > > Key: FLINK-9699 > URL: https://issues.apache.org/jira/browse/FLINK-9699 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test
[ https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531006#comment-16531006 ] ASF GitHub Bot commented on FLINK-9674: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/6216 Thanks @zentol, I have one remark (see above); besides that, it looks good to me! Additionally, I had some ideas that I think we could discuss: - We have a common pattern of `wait_for_sth` functions that either - get stuck in a loop forever if the desired event doesn't happen (I think `wait_for_job_state_transition` also behaves like that, right?) - or iterate a fixed number of times and then continue execution, whereas they should fail instead. I think we should add an issue to refactor this across all the tests so that the behaviour is consistent and useful. - Also, I think we could make backing up and reverting the config part of the test runner and always do that, so we avoid running into a corrupted flink-dist if tests don't behave correctly. What do you think? > Remove 65s sleep in QueryableState E2E test > --- > > Key: FLINK-9674 > URL: https://issues.apache.org/jira/browse/FLINK-9674 > Project: Flink > Issue Type: Improvement > Components: Queryable State, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the > loss to be noticed, starts a new tm and waits for the job to continue. > {code} > kill_random_taskmanager > [...] > sleep 65 # this is a little longer than the heartbeat timeout so that the TM > is gone > start_and_wait_for_tm > {code} > Instead of waiting for a fixed amount of time that is tied to some config > value we should wait for a specific event, like the job being canceled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
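Both points raised here (wait for an event rather than sleeping for a fixed time, and fail rather than loop forever or silently continue) come down to polling with a deadline. The end-to-end tests in question are shell scripts, so the Java sketch below only illustrates the control flow. `WaitSketch.waitFor` and its parameters are hypothetical names, not existing test utilities.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper that fails once a deadline passes. */
class WaitSketch {
    /** Polls the condition until it holds; throws instead of looping forever or continuing silently. */
    static void waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(pollMillis);
        }
    }
}
```

A test would pass a condition such as "the job has reached the expected state" and let the `TimeoutException` fail the run, so a missing event shows up as a clear error rather than a hang or a misleading later failure.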
[jira] [Updated] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9554: -- Labels: pull-request-available (was: ) > flink scala shell doesn't work in yarn mode > --- > > Key: FLINK-9554 > URL: https://issues.apache.org/jira/browse/FLINK-9554 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.1 > > > It still tries to use StandaloneCluster even though I specify yarn mode. > > Command I Use: bin/start-scala-shell.sh yarn -n 1 > > {code:java} > Starting Flink Shell: > 2018-06-06 12:30:02,672 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.address, localhost > 2018-06-06 12:30:02,673 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.port, 6123 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.numberOfTaskSlots, 1 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: parallelism.default, 1 > 2018-06-06 12:30:02,675 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: rest.port, 8081 > Exception in thread "main" java.lang.UnsupportedOperationException: Can't > deploy a standalone cluster. 
> at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57) > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31) > at > org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272) > at > org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164) > at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194) > at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193) > at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135) > at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6140#discussion_r199724031

    --- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
    @@ -255,14 +257,25 @@ object FlinkShell {
         yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
         yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))

    +    val customCommandLines = CliFrontend.loadCustomCommandLines(
    +      configuration,configurationDirectory)
    +    val commandOptions = CliFrontendParser.getRunCommandOptions
    +    val customCommandLineOptions = new Options()
    +    customCommandLines.asScala.foreach(cmd => {
    --- End diff --

    This is already done in the `CliFrontend` constructor. It may be sufficient to switch the initialization of `frontend` and `commandLine`.

---
[jira] [Created] (FLINK-9709) Docs: Unnecessary transition in flink job lifecycle visualization
Florian Schmidt created FLINK-9709: -- Summary: Docs: Unnecessary transition in flink job lifecycle visualization Key: FLINK-9709 URL: https://issues.apache.org/jira/browse/FLINK-9709 Project: Flink Issue Type: Bug Components: Documentation Reporter: Florian Schmidt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531011#comment-16531011 ] ASF GitHub Bot commented on FLINK-9554: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6140#discussion_r199724031 --- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -255,14 +257,25 @@ object FlinkShell { yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString)) yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString)) +val customCommandLines = CliFrontend.loadCustomCommandLines( + configuration,configurationDirectory) +val commandOptions = CliFrontendParser.getRunCommandOptions +val customCommandLineOptions = new Options() +customCommandLines.asScala.foreach(cmd => { --- End diff -- This is already done in the `CliFrontend` constructor. It may be sufficient to switch the initialization of `frontend` and `commandLine`. > flink scala shell doesn't work in yarn mode > --- > > Key: FLINK-9554 > URL: https://issues.apache.org/jira/browse/FLINK-9554 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.1 > > > It still tries to use StandaloneCluster even though I specify yarn mode. 
> > Command I Use: bin/start-scala-shell.sh yarn -n 1 > > {code:java} > Starting Flink Shell: > 2018-06-06 12:30:02,672 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.address, localhost > 2018-06-06 12:30:02,673 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.port, 6123 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.numberOfTaskSlots, 1 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: parallelism.default, 1 > 2018-06-06 12:30:02,675 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: rest.port, 8081 > Exception in thread "main" java.lang.UnsupportedOperationException: Can't > deploy a standalone cluster. 
> at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57) > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31) > at > org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272) > at > org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164) > at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194) > at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193) > at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135) > at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9709) Docs: Unnecessary transition in flink job lifecycle visualization
[ https://issues.apache.org/jira/browse/FLINK-9709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florian Schmidt updated FLINK-9709: --- Description: The docs at [https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html] show the state transitions of a flink job. There are two actions called "Fail job" and two transitions called "Cancel job" that cause a transition from Running --> Failed or Running --> Cancelled, where there should only be one > Docs: Unnecessary transition in flink job lifecycle visualization > - > > Key: FLINK-9709 > URL: https://issues.apache.org/jira/browse/FLINK-9709 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Florian Schmidt >Priority: Trivial > > The docs at > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html] > show the state transitions of a flink job. There are two actions called > "Fail job" and two transitions called "Cancel job" that cause a transition > from Running --> Failed or Running --> Cancelled, where there should only be > one -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9709) Docs: Unnecessary transition in flink job state visualization
[ https://issues.apache.org/jira/browse/FLINK-9709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florian Schmidt updated FLINK-9709: --- Summary: Docs: Unnecessary transition in flink job state visualization (was: Docs: Unnecessary transition in flink job lifecycle visualization) > Docs: Unnecessary transition in flink job state visualization > - > > Key: FLINK-9709 > URL: https://issues.apache.org/jira/browse/FLINK-9709 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Florian Schmidt >Priority: Trivial > > The docs at > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html] > show the state transitions of a flink job. There are two actions called > "Fail job" and two transitions called "Cancel job" that cause a transition > from Running --> Failed or Running --> Cancelled, where there should only be > one -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531018#comment-16531018 ] ASF GitHub Bot commented on FLINK-9004: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6240 [FLINK-9004][tests] Implement Jepsen tests to test job availability. ## What is the purpose of the change *Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos.* Previous PR got closed accidentally: https://github.com/apache/flink/pull/6239 ## Brief change log - *Implement Jepsen tests.* ## Verifying this change This change added tests and can be verified as follows: - *The changes themselves are tests.* - *Run Jepsen tests in docker containers.* - *Run unit tests with `lein test`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no** (at least not to Flink)) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will as soon as test failures appear) / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann @cewood @zentol @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6240.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6240 commit 063e4621a5982b55ee7f7b0935290bbc717a5a45 Author: gyao Date: 2018-03-05T21:23:33Z [FLINK-9004][tests] Implement Jepsen tests to test job availability. Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos. Provide Dockerfiles for local test development. commit 46f0ea7b14c9c59d6cc40903486978f4fd8354d3 Author: gyao Date: 2018-07-02T12:21:18Z fixup! [FLINK-9004][tests] Implement Jepsen tests to test job availability. > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. 
Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6240: [FLINK-9004][tests] Implement Jepsen tests to test...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6240 [FLINK-9004][tests] Implement Jepsen tests to test job availability. ## What is the purpose of the change *Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos.* Previous PR got closed accidentally: https://github.com/apache/flink/pull/6239 ## Brief change log - *Implement Jepsen tests.* ## Verifying this change This change added tests and can be verified as follows: - *The changes themselves are tests.* - *Run Jepsen tests in docker containers.* - *Run unit tests with `lein test`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no** (at least not to Flink)) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** (but it will as soon as test failures appear) / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann @cewood @zentol @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6240.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6240 commit 063e4621a5982b55ee7f7b0935290bbc717a5a45 Author: gyao Date: 2018-03-05T21:23:33Z [FLINK-9004][tests] Implement Jepsen tests to test job availability. Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement tests that verify Flink's HA capabilities under real-world faults, such as sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network partitions, etc. The Flink cluster under test is automatically deployed on YARN (session & job mode) and Mesos. Provide Dockerfiles for local test development. commit 46f0ea7b14c9c59d6cc40903486978f4fd8354d3 Author: gyao Date: 2018-07-02T12:21:18Z fixup! [FLINK-9004][tests] Implement Jepsen tests to test job availability. ---
[jira] [Created] (FLINK-9710) Make ClusterClient be used as multiple instances in a single jvm process
Chuanlei Ni created FLINK-9710: -- Summary: Make ClusterClient be used as multiple instances in a single jvm process Key: FLINK-9710 URL: https://issues.apache.org/jira/browse/FLINK-9710 Project: Flink Issue Type: Improvement Components: Client Affects Versions: 1.4.2 Reporter: Chuanlei Ni We can use `ClusterClient` to submit jobs, but it is designed for command-line use, so we cannot use this class in a long-running JVM process that creates multiple cluster clients concurrently. This Jira aims to make `ClusterClient` usable as multiple instances in a single JVM process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531019#comment-16531019 ] ASF GitHub Bot commented on FLINK-9004: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/6239 @zentol No problem, I opened a new one. > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6239 @zentol No problem, I opened a new one. ---
[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 As @zentol mentioned, we might want to exclude this from create_source_release.sh. ---
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531021#comment-16531021 ] ASF GitHub Bot commented on FLINK-9004: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 As @zentol mentioned, we might want to exclude this from create_source_release.sh. > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9699) Add api to replace registered table
[ https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531027#comment-16531027 ] Jeff Zhang commented on FLINK-9699: --- Thanks [~fhueske], let's hold this ticket for now. I will try to look at the flink sql component and see how we do such redesign in a simple way. > Add api to replace registered table > --- > > Key: FLINK-9699 > URL: https://issues.apache.org/jira/browse/FLINK-9699 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test
[ https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531030#comment-16531030 ] ASF GitHub Bot commented on FLINK-9674: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6216 @florianschmidt1994 I don't see the comment you're referring to :( (Did you start a review, but wrote your comment separately? Then your review would still be in progress!) The issues you identified are certainly valid, but I'm wondering if it really makes sense to invest time into this now. There've been discussions about writing a python/java based framework for the tests (and I've already started tinkering on a java version); so any _significant_ changes we make to the bash-scripts might be subsumed soon. > Remove 65s sleep in QueryableState E2E test > --- > > Key: FLINK-9674 > URL: https://issues.apache.org/jira/browse/FLINK-9674 > Project: Flink > Issue Type: Improvement > Components: Queryable State, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the > loss to be noticed, starts a new tm and waits for the job to continue. > {code} > kill_random_taskmanager > [...] > sleep 65 # this is a little longer than the heartbeat timeout so that the TM > is gone > start_and_wait_for_tm > {code} > Instead of waiting for a fixed amount of time that is tied to some config > value we should wait for a specific event, like the job being canceled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6216 @florianschmidt1994 I don't see the comment you're referring to :( (Did you start a review, but wrote your comment separately? Then your review would still be in progress!) The issues you identified are certainly valid, but I'm wondering if it really makes sense to invest time into this now. There've been discussions about writing a python/java based framework for the tests (and I've already started tinkering on a java version); so any _significant_ changes we make to the bash-scripts might be subsumed soon. ---
[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test
[ https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531034#comment-16531034 ] ASF GitHub Bot commented on FLINK-9674: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/6216#discussion_r199719951 --- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh --- @@ -85,20 +90,23 @@ function run_test() { exit 1 fi -local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID}) - kill_random_taskmanager latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}') echo "Latest snapshot count was ${latest_snapshot_count}" -sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone +# wait until the TM loss was detected +wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED" start_and_wait_for_tm +wait_job_running ${JOB_ID} + +local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})" --- End diff -- Why did you move this from `before killing the TM` to `after having the TM restarted again` ? > Remove 65s sleep in QueryableState E2E test > --- > > Key: FLINK-9674 > URL: https://issues.apache.org/jira/browse/FLINK-9674 > Project: Flink > Issue Type: Improvement > Components: Queryable State, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the > loss to be noticed, starts a new tm and waits for the job to continue. > {code} > kill_random_taskmanager > [...] 
> sleep 65 # this is a little longer than the heartbeat timeout so that the TM > is gone > start_and_wait_for_tm > {code} > Instead of waiting for a fixed amount of time that is tied to some config > value we should wait for a specific event, like the job being canceled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
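The idea in this ticket, waiting for a specific event rather than sleeping for a fixed time tied to a config value, can be illustrated with a small polling helper. This is a minimal sketch in Java, not the bash helper used by the test scripts: the class `WaitForConditionSketch`, the method `waitUntil`, and its parameters are hypothetical names chosen for illustration.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition instead of sleeping for a fixed time. */
class WaitForConditionSketch {

    /**
     * Blocks until the condition holds, checking it every pollInterval.
     * Fails with a TimeoutException once the timeout has elapsed.
     */
    static void waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime wraps around.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

A caller would pass a condition such as "the job has reached the expected state". The wait then ends as soon as the event is observed, and the timeout only bounds the worst case.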
[GitHub] flink pull request #6216: [FLINK-9674][tests] Replace hard-coded sleeps in Q...
Github user florianschmidt1994 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6216#discussion_r199719951

    --- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
    @@ -85,20 +90,23 @@ function run_test() {
     exit 1
     fi
    -local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID})
    -
     kill_random_taskmanager
     latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
     echo "Latest snapshot count was ${latest_snapshot_count}"
    -sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
    +# wait until the TM loss was detected
    +wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED"
     start_and_wait_for_tm
    +wait_job_running ${JOB_ID}
    +
    +local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})"
    --- End diff --

    Why did you move this from `before killing the TM` to `after having the TM restarted again` ?
---
[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...
Github user florianschmidt1994 commented on the issue:

    https://github.com/apache/flink/pull/6216

    > (Did you start a review, but wrote your comment separately? Then your review would still be in progress!)

    Yes :D I submitted it now

    > The issues you identified are certainly valid, but I'm wondering if it really makes sense to invest time into this now.
    > There've been discussions about writing a python/java based framework for the tests (and I've already started tinkering on a java version); so any significant changes we make to the bash-scripts might be subsumed soon™.

    Alright, then let's leave it this way and we can still take care of it should we run into problems with this in the future
---
[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test
[ https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531040#comment-16531040 ] ASF GitHub Bot commented on FLINK-9674: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/6216 > (Did you start a review, but wrote your comment separately? Then your review would still be in progress!) Yes :D I submitted it now >The issues you identified are certainly valid, but I'm wondering if it really makes sense to invest time into this now. >There've been discussions about writing a python/java based framework for the tests (and I've already started tinkering on a java version); so any significant changes we make to the bash-scripts might be subsumed soon™. Alright, then let's leave it this way and we can still take care of it should we run into problems with this in the future > Remove 65s sleep in QueryableState E2E test > --- > > Key: FLINK-9674 > URL: https://issues.apache.org/jira/browse/FLINK-9674 > Project: Flink > Issue Type: Improvement > Components: Queryable State, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the > loss to be noticed, starts a new tm and waits for the job to continue. > {code} > kill_random_taskmanager > [...] > sleep 65 # this is a little longer than the heartbeat timeout so that the TM > is gone > start_and_wait_for_tm > {code} > Instead of waiting for a fixed amount of time that is tied to some config > value we should wait for a specific event, like the job being canceled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531044#comment-16531044 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199731327 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java --- @@ -123,4 +125,24 @@ public void testNestedRowTypeInfo() { assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString()); } + @Test + public void testSchemaEquals() { + final RowTypeInfo row1 = new RowTypeInfo( + new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}, + new String[] {"field1", "field2"}); + final RowTypeInfo row2 = new RowTypeInfo( + new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}, + new String[] {"field1", "field2"}); + assertTrue(row1.schemaEquals(row2)); --- End diff -- This is covered by the test base. But I added another test data entry with different field names. > KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Blocker > Labels: patch, pull-request-available > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. 
> at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6218#discussion_r199731327

    --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---
    @@ -123,4 +125,24 @@ public void testNestedRowTypeInfo() {
             assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
         }
    +    @Test
    +    public void testSchemaEquals() {
    +        final RowTypeInfo row1 = new RowTypeInfo(
    +            new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
    +            new String[] {"field1", "field2"});
    +        final RowTypeInfo row2 = new RowTypeInfo(
    +            new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
    +            new String[] {"field1", "field2"});
    +        assertTrue(row1.schemaEquals(row2));
    --- End diff --

    This is covered by the test base. But I added another test data entry with different field names.
---
[jira] [Commented] (FLINK-9699) Add api to replace registered table
[ https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530922#comment-16530922 ] Jeff Zhang commented on FLINK-9699: --- [~fhueske] I don't know much about the internal mechanism of flink sql. But is it possible to resolve which DataSet MyT points to beforehand, instead of delaying this until the table is converted into a DataStream/DataSet or emitted through a TableSink? > Add api to replace registered table > --- > > Key: FLINK-9699 > URL: https://issues.apache.org/jira/browse/FLINK-9699 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling
[ https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530927#comment-16530927 ] ASF GitHub Bot commented on FLINK-9636: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/6238#discussion_r199702444 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) { this.numTotalRequiredBuffers += numRequiredBuffers; - redistributeBuffers(); + try { + redistributeBuffers(); + } catch (Throwable t) { + this.numTotalRequiredBuffers -= numRequiredBuffers; + ExceptionUtils.rethrowIOException(t); + } } final List segments = new ArrayList<>(numRequiredBuffers); --- End diff -- ah, true, thanks for pointing this out, I must have been blind on one eye yesterday I'll integrate this change as well > Network buffer leaks in requesting a batch of segments during canceling > --- > > Key: FLINK-9636 > URL: https://issues.apache.org/jira/browse/FLINK-9636 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > > In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} > is increased by {{numRequiredBuffers}} first. > If {{InterruptedException}} is thrown during polling segments from the > available queue, the requested segments will be recycled back to > {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number > of polled segments which is now inconsistent with {{numRequiredBuffers}}. So > {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and > we can also decrease {{numRequiredBuffers}} to fix this bug. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
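The fix discussed in the diff above follows a common pattern: increase a counter up front, and undo the full increase if a later step throws. The sketch below shows that pattern in isolation, under stated assumptions. `BufferAccountingSketch`, `reserve`, and the `redistribute` callback are hypothetical stand-ins and not Flink's `NetworkBufferPool` API. Only the field name `numTotalRequiredBuffers` and the roll-back-by-`numRequiredBuffers` idea come from the messages above.

```java
import java.io.IOException;

/** Hypothetical stand-in for the pool's bookkeeping; not Flink's NetworkBufferPool. */
class BufferAccountingSketch {

    private final int totalBuffers;
    private int numTotalRequiredBuffers;

    BufferAccountingSketch(int totalBuffers) {
        this.totalBuffers = totalBuffers;
    }

    /** Reserves buffers and undoes the whole reservation if the follow-up step fails. */
    synchronized void reserve(int numRequiredBuffers, Runnable redistribute) throws IOException {
        if (numTotalRequiredBuffers + numRequiredBuffers > totalBuffers) {
            throw new IOException("Insufficient number of network buffers");
        }
        numTotalRequiredBuffers += numRequiredBuffers;
        try {
            redistribute.run();
        } catch (Throwable t) {
            // Roll back by the amount that was added above, so the counter cannot drift.
            numTotalRequiredBuffers -= numRequiredBuffers;
            throw new IOException("Redistribution failed", t);
        }
    }

    synchronized int getNumTotalRequiredBuffers() {
        return numTotalRequiredBuffers;
    }
}
```

The rollback subtracts exactly the value that was added, which is what keeps the counter consistent however far the failed step got.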
[jira] [Commented] (FLINK-9622) DistributedCacheDfsTest failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530931#comment-16530931 ] Till Rohrmann commented on FLINK-9622: -- The problem there seems to be that {{LocalFileSystem#mkdirsInternal}} fails if there is a concurrent directory make operation happening [1]. [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257 > DistributedCacheDfsTest failed on travis > > > Key: FLINK-9622 > URL: https://issues.apache.org/jira/browse/FLINK-9622 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Sihua Zhou >Assignee: Till Rohrmann >Priority: Blocker > Fix For: 1.6.0 > > > DistributedCacheDfsTest#testDistributeFileViaDFS() failed flakey on travis. > instance: https://api.travis-ci.org/v3/job/394399700/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9707) LocalFileSystem does not support concurrent directory creations
Till Rohrmann created FLINK-9707: Summary: LocalFileSystem does not support concurrent directory creations Key: FLINK-9707 URL: https://issues.apache.org/jira/browse/FLINK-9707 Project: Flink Issue Type: Improvement Components: FileSystem Affects Versions: 1.5.0, 1.6.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.6.0, 1.5.1 The {{LocalFileSystem}} does not support concurrent directory creations. The consequence is that file system operations fail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
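One way to make directory creation tolerant of concurrent callers is to treat "the directory already exists" as success. The following is a minimal sketch of that general technique using only `java.io.File`. It is not the change made to `LocalFileSystem`, and the class and method names are hypothetical.

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical helper showing race-tolerant directory creation. */
class ConcurrentMkdirsSketch {

    /** Creates dir and any missing parents; succeeds if another caller created it first. */
    static void mkdirsTolerant(File dir) throws IOException {
        // File#mkdirs returns false both on real failure and when the directory
        // already exists, so re-check isDirectory() before reporting an error.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Could not create directory " + dir);
        }
    }
}
```

With this check, two callers racing to create the same path both succeed, and an error is raised only when the path ends up missing or is not a directory.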
[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/6201

    @fhueske @twalthr thanks for the comments. For `from-source`, the only systems I know of that support writing a record along with its timestamp are Kafka10 and Kafka11. To support `from-source` in table sink, I think we can do the following:
    1) Add a connector property, e.g. connector.support-timestamp. Only if connector.support-timestamp is true will we allow the sink table schema to contain a field with rowtime type `from-source`. Otherwise, an exception will be thrown.
    2) If the condition in 1) is satisfied, we will create the corresponding rowtime field in the sink table schema with type LONG. In TableEnvironment.insertInto(), we will validate the sink schema against the insertion source. Also, in the TableSink.emitDataStream() implementation, we will need to insert a timestamp assigner operator to set StreamRecord.timestamp (should we reuse an existing interface, or create a new timestampInserter interface?) and remove the extra rowtime field from StreamRecord.value before we emit the datastream to the sink. (For kafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true).)

    Please correct me if I missed something here. What do you think?
---
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530939#comment-16530939 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/6201

@fhueske @twalthr thanks for the comments. For `from-source`, the only systems I know of that support writing a record along with its timestamp are Kafka10 and Kafka11. To support `from-source` in a table sink, I think we can do the following:

1) Add a connector property, e.g. connector.support-timestamp. Only if connector.support-timestamp is true will we allow the sink table schema to contain a field with rowtime type `from-source`; otherwise, an exception will be thrown.

2) If the condition in 1) is satisfied, we will create the corresponding rowtime field in the sink table schema with type LONG. In TableEnvironment.insertInto(), we will validate the sink schema against the insertion source. Also, in the TableSink.emitDataStream() implementation, we will need to insert a timestamp assigner operator to set StreamRecord.timestamp (should we reuse an existing interface, or create a new timestampInserter interface?) and remove the extra rowtime field from StreamRecord.value before we emit the datastream to the sink. (For kafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true).)

Please correct me if I missed something here. What do you think?

> Create unified interfaces to configure and instatiate TableSinks
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Timo Walther
> Assignee: Shuyi Chen
> Priority: Major
> Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure and instantiate TableSinks. Among other applications, this is necessary in order to declare table sinks in an environment file of the SQL client, so that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind.
> 1) Add TableSinkFactory/TableSinkFactoryService similar to TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with values (source, sink and both) for both TableSource and TableSink.
> 3) In the yaml file, replace "sources" with "tables", and use tableType to identify whether it's a source or a sink.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9707) LocalFileSystem does not support concurrent directory creations
[ https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9707: - Priority: Blocker (was: Major) > LocalFileSystem does not support concurrent directory creations > --- > > Key: FLINK-9707 > URL: https://issues.apache.org/jira/browse/FLINK-9707 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > The {{LocalFileSystem}} does not support concurrent directory creations. The > consequence is that file system operations fail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9708) Network buffer leaks when buffer request fails during buffer redistribution
Nico Kruber created FLINK-9708:
--

Summary: Network buffer leaks when buffer request fails during buffer redistribution
Key: FLINK-9708
URL: https://issues.apache.org/jira/browse/FLINK-9708
Project: Flink
Issue Type: Bug
Components: Network
Affects Versions: 1.5.0
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.5.1

If an exception is thrown in {{NetworkBufferPool#requestMemorySegments()}}'s first call to {{redistributeBuffers()}}, the accounting for {{numTotalRequiredBuffers}} is wrong for future uses of this buffer pool.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
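The accounting problem in FLINK-9708 can be shown with a toy model. This is a sketch under stated assumptions, not the `NetworkBufferPool` code: the class `BufferAccountingSketch` is hypothetical, the field name is borrowed from the issue text, and the `failRedistribution` flag exists only to simulate the error path.

```java
import java.io.IOException;

/** Toy model of the counter; the class itself is hypothetical. */
class BufferAccountingSketch {
    private final int totalBuffers;
    private int numTotalRequiredBuffers;

    BufferAccountingSketch(int totalBuffers) {
        this.totalBuffers = totalBuffers;
    }

    int getNumTotalRequiredBuffers() {
        return numTotalRequiredBuffers;
    }

    /** Stand-in for redistributeBuffers(); fails on request to simulate the error path. */
    private void redistributeBuffers(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated failure during redistribution");
        }
    }

    void requestMemorySegments(int numRequiredBuffers, boolean failRedistribution) throws IOException {
        if (numTotalRequiredBuffers + numRequiredBuffers > totalBuffers) {
            throw new IOException("Insufficient number of network buffers");
        }
        // Reserve first, as described in the issue.
        numTotalRequiredBuffers += numRequiredBuffers;
        try {
            redistributeBuffers(failRedistribution);
        } catch (IOException e) {
            // Undo the reservation so later requests see consistent accounting.
            numTotalRequiredBuffers -= numRequiredBuffers;
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        BufferAccountingSketch pool = new BufferAccountingSketch(10);
        try {
            pool.requestMemorySegments(4, true);
        } catch (IOException expected) {
            System.out.println("after failed request: " + pool.getNumTotalRequiredBuffers());
        }
        pool.requestMemorySegments(10, false);
        System.out.println("after successful request: " + pool.getNumTotalRequiredBuffers());
    }
}
```

Without the rollback in the catch block, the failed request would leave 4 buffers reserved and the later request for all 10 would be rejected, even though nothing is actually in use.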
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530946#comment-16530946 ]

ASF GitHub Bot commented on FLINK-9688:
---

Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6223

@fhueske @hequn8128 @yanghua thank you very much for your review and comments. I made almost all of the corrections except the one related to the atan2 description, so I have a question. As @hequn8128 wrote:

> I checked the [wiki](https://en.wikipedia.org/wiki/Atan2) about atan2, it said: The atan2 function calculates one unique arc tangent value from two variables y and x. So, would it be better of two variables? atan2(y,x) can be the arc tangent of (x,y) or (nx, ny).

At the same time, I checked [Calcite's definition](https://calcite.apache.org/docs/reference.html) of it: _ATAN2(numeric, numeric) | Returns the arc tangent of the numeric coordinates._ Which do you think is more suitable?

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Sergey Nuyanzin
> Assignee: Sergey Nuyanzin
> Priority: Minor
> Labels: pull-request-available
>
> A simple query fails:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
> DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> At the same time, Calcite supports it, and in Calcite's sqlline it works like this:
> {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> | EXPR$0             |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
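For readers weighing the two descriptions, a small sketch with the JDK's `Math.atan2(y, x)` shows what "arc tangent of the coordinates" means in practice. The class `Atan2Sketch` is a hypothetical name, and the assumption that the SQL function follows the same (y, x) argument order as `Math.atan2` is not confirmed by the thread.

```java
/** Illustrative only; uses the JDK's Math.atan2, not the Flink SQL function. */
class Atan2Sketch {

    /** Angle in radians between the positive x-axis and the point (x, y), in the range -pi to pi. */
    static double angle(double y, double x) {
        return Math.atan2(y, x);
    }

    public static void main(String[] args) {
        // Same arguments as the query SELECT atan2(1,2) quoted in the issue.
        System.out.println(angle(1, 2));
        // A point on the diagonal of the first quadrant.
        System.out.println(angle(1, 1));
        // Third quadrant: the ratio y/x equals that of (1, 1), but the angle differs.
        System.out.println(angle(-1, -1));
    }
}
```

The last two calls have the same ratio y/x, which is why the function needs both coordinates rather than a single quotient.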
[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling
[ https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530944#comment-16530944 ] ASF GitHub Bot commented on FLINK-9636: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/6238 actually, it was quite easy to reproduce and the fix was also just as you proposed - please see the new commits (the old one was only renamed since I created a separate issue for that now) > Network buffer leaks in requesting a batch of segments during canceling > --- > > Key: FLINK-9636 > URL: https://issues.apache.org/jira/browse/FLINK-9636 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > > In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} > is increased by {{numRequiredBuffers}} first. > If {{InterruptedException}} is thrown during polling segments from the > available queue, the requested segments will be recycled back to > {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number > of polled segments which is now inconsistent with {{numRequiredBuffers}}. So > {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and > we can also decrease {{numRequiredBuffers}} to fix this bug. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Zhang updated FLINK-9554:
--
Fix Version/s: 1.5.1

> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
> Issue Type: Bug
> Components: Scala Shell
> Affects Versions: 1.5.0
> Reporter: Jeff Zhang
> Priority: Blocker
> Fix For: 1.5.1
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>
> Command I use: bin/start-scala-shell.sh yarn -n 1
>
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Zhang updated FLINK-9554:
--
Priority: Blocker (was: Major)

> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
> Issue Type: Bug
> Components: Scala Shell
> Affects Versions: 1.5.0
> Reporter: Jeff Zhang
> Priority: Blocker
> Fix For: 1.5.1
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>
> Command I use: bin/start-scala-shell.sh yarn -n 1
>
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6239: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6239 As I understand it, we don't want this to be part of the source release; as such, we need an exclusion in [create_source_release.sh](https://github.com/apache/flink/blob/master/tools/releasing/create_source_release.sh). ---
[GitHub] flink pull request #6239: [FLINK-9004][tests] Implement Jepsen tests to test...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6239 ---
[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6222#discussion_r199719645 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -66,6 +67,9 @@ public JobSubmitHandler( } return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())) + .exceptionally(exception -> { + throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception)); --- End diff -- Do note that this discussion isn't really blocking the PR from being merged as it would effectively be an extension of it. ---
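The pattern in the diff above, mapping any failure of an asynchronous call to a single REST-level error, can be sketched in isolation. This is an illustration only: `SubmitHandlerSketch` and `RestError` are hypothetical stand-ins (for the handler and for `RestHandlerException`), and the plain status code 500 stands in for `HttpResponseStatus.INTERNAL_SERVER_ERROR`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustrative only; not the Flink JobSubmitHandler. */
class SubmitHandlerSketch {

    /** Hypothetical stand-in for a REST-level exception carrying an HTTP status code. */
    static class RestError extends Exception {
        final int httpStatus;

        RestError(String message, int httpStatus, Throwable cause) {
            super(message, cause);
            this.httpStatus = httpStatus;
        }
    }

    /** Maps a successful acknowledgement to a response path and any failure to a RestError. */
    static CompletableFuture<String> handle(CompletableFuture<String> submission, String jobId) {
        return submission
            .thenApply(ack -> "/jobs/" + jobId)
            .exceptionally(failure -> {
                // 'failure' may itself be a CompletionException wrapping the original cause.
                throw new CompletionException(
                    new RestError("Job submission failed.", 500, failure));
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated submission failure"));
        try {
            handle(failed, "abc").join();
        } catch (CompletionException e) {
            RestError error = (RestError) e.getCause();
            System.out.println(error.getMessage() + " (status " + error.httpStatus + ")");
        }
    }
}
```

Because the lambda passed to `exceptionally` only throws, the returned future still completes exceptionally; the difference is that callers now see one well-defined exception type with a status code attached.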
[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions
[ https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530990#comment-16530990 ] ASF GitHub Bot commented on FLINK-8785: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6222#discussion_r199719645 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -66,6 +67,9 @@ public JobSubmitHandler( } return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())) + .exceptionally(exception -> { + throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception)); --- End diff -- Do note that this discussion isn't really blocking the PR from being merged as it would effectively be an extension of it. > JobSubmitHandler does not handle JobSubmissionExceptions > > > Key: FLINK-8785 > URL: https://issues.apache.org/jira/browse/FLINK-8785 > Project: Flink > Issue Type: Bug > Components: Job-Submission, JobManager, REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6, pull-request-available > > If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a > {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal > server error" instead of signaling the failed job submission. > This can for example occur if the transmitted execution graph is faulty, as > tested by the \{{JobSubmissionFailsITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199733710 --- Diff: flink-formats/flink-avro/pom.xml --- @@ -51,6 +51,17 @@ under the License. + + joda-time + joda-time + --- End diff -- Yes, we assume that the user provides a Joda-Time version that matches the specific record. We only call 4 methods, so I think changes are unlikely there. I went for the Flink version; the Avro version would be `2.9`, but we would always have to keep this in sync. ---
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531052#comment-16531052 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199733710 --- Diff: flink-formats/flink-avro/pom.xml --- @@ -51,6 +51,17 @@ under the License. + + joda-time + joda-time + --- End diff -- Yes, we assume that the user provides a Joda-Time that matches the specific record. We only call 4 methods. I think changes are unlikely there. I went for the Flink-version the Avro version would be `2.9` but we would always have to keep this in sync. > KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Blocker > Labels: patch, pull-request-available > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. 
> at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199734399 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java --- @@ -44,7 +41,7 @@ @Override protected void configureBuilder(KafkaTableSource.Builder builder) { super.configureBuilder(builder); - ((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class); + ((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class); --- End diff -- No, but it simplifies the code base and uses only real-world generated records for testing. ---
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531056#comment-16531056 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199734399 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java --- @@ -44,7 +41,7 @@ @Override protected void configureBuilder(KafkaTableSource.Builder builder) { super.configureBuilder(builder); - ((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class); + ((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class); --- End diff -- No, but it simplifies the code base and uses only real-world generated records for testing. > KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Blocker > Labels: patch, pull-request-available > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. 
> at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531061#comment-16531061 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199736070 --- Diff: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java --- @@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.generated.SimpleUser; --- End diff -- The problem is that {{BackwardsCompatibleAvroSerializer}} does not support records with logical types. Logical types need a Kryo configuration that the serializer does not set correctly. This might be a bug or at least a missing feature. Given that this serializer only exists for backwards compatibility for 1.3 (which used Avro 1.7 without logical type), I added a simple user for this test. I will add a comment about this to the code. 
> KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Blocker > Labels: patch, pull-request-available > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. > at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6216: [FLINK-9674][tests] Replace hard-coded sleeps in Q...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6216#discussion_r199738568

--- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -85,20 +90,23 @@ function run_test() {
 exit 1
 fi

-local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID})
-
 kill_random_taskmanager

 latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
 echo "Latest snapshot count was ${latest_snapshot_count}"

-sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+# wait until the TM loss was detected
+wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED"

 start_and_wait_for_tm

+wait_job_running ${JOB_ID}
+
+local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})"
--- End diff --

So when I ran this test locally, it frequently happened that we never actually waited for checkpoints to complete; the number of checkpoints that we waited for had already occurred and we exited early. The job just switches to running, and right away we got 3 completed checkpoints? This was a bit, well, _odd_. In the previous version the checkpoint count could further increase while we shut down the TM. Technically there's no guarantee how up-to-date the result of `get_completed_number_of_checkpoints` is or how long `kill_random_taskmanager` actually takes, so we may have fulfilled the checkpoint count condition before the job even restarts. It's admittedly unlikely. This change simply guarantees that the job actually completes N checkpoints after it was restarted, and just serves to eliminate doubts about the correctness of the test. ---
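The general idea behind this change, waiting for an observable condition instead of sleeping for a fixed time, can be sketched as a small polling helper. The test itself is a shell script; this Java version is only an illustration, and `WaitUntilSketch.waitUntil` is a hypothetical helper, not one of the functions used by the Flink end-to-end tests.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Illustrative polling helper; not part of the Flink test scripts. */
class WaitUntilSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed, false on timeout.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated checkpoint counter; a real test would query the job instead.
        AtomicInteger completedCheckpoints = new AtomicInteger();
        // Take the baseline only after the (simulated) restart, then wait for 3 more.
        int baseline = completedCheckpoints.get();
        boolean reached = waitUntil(
            () -> completedCheckpoints.incrementAndGet() >= baseline + 3,
            Duration.ofSeconds(5),
            Duration.ofMillis(10));
        System.out.println("condition reached: " + reached);
    }
}
```

As in the comment above, the baseline is read after the restart, so the wait cannot be satisfied by checkpoints that completed before the task manager was killed.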
[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test
[ https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531072#comment-16531072 ] ASF GitHub Bot commented on FLINK-9674: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6216#discussion_r199738568 --- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh --- @@ -85,20 +90,23 @@ function run_test() { exit 1 fi -local current_num_checkpoints=current_num_checkpoints$(get_completed_number_of_checkpoints ${JOB_ID}) - kill_random_taskmanager latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}') echo "Latest snapshot count was ${latest_snapshot_count}" -sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone +# wait until the TM loss was detected +wait_for_job_state_transition ${JOB_ID} "RESTARTING" "CREATED" start_and_wait_for_tm +wait_job_running ${JOB_ID} + +local current_num_checkpoints="$(get_completed_number_of_checkpoints ${JOB_ID})" --- End diff -- So when I ran this test locally it frequently occurred that we never actually waited for checkpoints to complete; the number of checkpoint that we waited for had already occurred and we exited early. The job just switches to running, and right away we got 3 completed checkpoints? This was a bit, well, _odd_. In the previous version the checkpoint count could further increase while we shut down the TM. Technically there's no guarantee how up-to-date the result if `get_completed_number_of_checkpoints` or how long `kill_random_taskmanager` actually takes, so we may have fulfilled the checkpoint count condition before the job even restarts. It's admittedly unlikely. This change simply guarantees that the job actually completes N checkpoints after it was restarted, and just serves to eliminate doubts about the correctness of the test. 
> Remove 65s sleep in QueryableState E2E test > --- > > Key: FLINK-9674 > URL: https://issues.apache.org/jira/browse/FLINK-9674 > Project: Flink > Issue Type: Improvement > Components: Queryable State, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the > loss to be noticed, starts a new tm and waits for the job to continue. > {code} > kill_random_taskmanager > [...] > sleep 65 # this is a little longer than the heartbeat timeout so that the TM > is gone > start_and_wait_for_tm > {code} > Instead of waiting for a fixed amount of time that is tied to some config > value we should wait for a specific event, like the job being canceled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
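The idea discussed in this thread (wait for a specific event instead of sleeping for a fixed time, and take the checkpoint baseline only after the restart) can be sketched roughly as follows. This is a minimal Java sketch, not the shell code from the PR; `WaitForEvent`, `waitUntil` and the simulated checkpoint counter are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class WaitForEvent {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up instead of waiting forever
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the number of completed checkpoints of a job.
        AtomicInteger completedCheckpoints = new AtomicInteger(7);

        // Record the baseline only once the job is running again, so that
        // checkpoints completed before the restart cannot satisfy the condition.
        int baseline = completedCheckpoints.get();

        // Simulated job that completes three more checkpoints after the restart.
        Thread job = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                completedCheckpoints.incrementAndGet();
            }
        });
        job.start();

        boolean ok = waitUntil(() -> completedCheckpoints.get() >= baseline + 3, 5_000, 10);
        System.out.println("3 checkpoints completed after restart: " + ok);
    }
}
```

The timeout keeps a test from hanging if the event never happens, which a bare polling loop would not guarantee.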
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531094#comment-16531094 ] ASF GitHub Bot commented on FLINK-9280: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199741252 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -18,64 +18,118 @@ package org.apache.flink.runtime.rest.messages.job; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; /** * Request for submitting a job. * - * We currently require the job-jars to be uploaded through the blob-server. + * This request only contains the names of files that must be present on the server, and defines how these files are + * interpreted. */ public final class JobSubmitRequestBody implements RequestBody { - private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName"; + private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames"; + private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames"; - /** -* The serialized job graph. 
-*/ - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) - public final byte[] serializedJobGraph; + @JsonProperty(FIELD_NAME_JOB_GRAPH) + public final String jobGraphFileName; - public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { - this(serializeJobGraph(jobGraph)); - } + @JsonProperty(FIELD_NAME_JOB_JARS) + public final Collection jarFileNames; + + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) + public final Collection artifactFileNames; - @JsonCreator --- End diff -- revert > Extend JobSubmitHandler to accept jar files > --- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files, that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199741252 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -18,64 +18,118 @@ package org.apache.flink.runtime.rest.messages.job; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; /** * Request for submitting a job. * - * We currently require the job-jars to be uploaded through the blob-server. + * This request only contains the names of files that must be present on the server, and defines how these files are + * interpreted. */ public final class JobSubmitRequestBody implements RequestBody { - private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName"; + private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames"; + private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames"; - /** -* The serialized job graph. -*/ - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) - public final byte[] serializedJobGraph; + @JsonProperty(FIELD_NAME_JOB_GRAPH) + public final String jobGraphFileName; - public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { - this(serializeJobGraph(jobGraph)); - } + @JsonProperty(FIELD_NAME_JOB_JARS) + public final Collection jarFileNames; + + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) + public final Collection artifactFileNames; - @JsonCreator --- End diff -- revert ---
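To illustrate the shape of the reworked request body in the diff above (file names only, no serialized job graph), here is a rough plain-Java sketch. It is not the class from the PR: `JobSubmitRequestSketch` is a hypothetical name, the `String` element type of the collections is an assumption (the generic parameters are not visible in the quoted diff), the null checks are an assumption of this sketch, and the JSON is assembled by hand instead of through the shaded Jackson annotations. Only the three JSON field names are taken from the diff.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;

final class JobSubmitRequestSketch {

    // JSON field names as they appear in the quoted diff.
    private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
    private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
    private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";

    final String jobGraphFileName;
    final Collection<String> jarFileNames;      // element type is an assumption
    final Collection<String> artifactFileNames; // element type is an assumption

    JobSubmitRequestSketch(
            String jobGraphFileName,
            Collection<String> jarFileNames,
            Collection<String> artifactFileNames) {
        // Rejecting nulls is a choice made for this sketch only.
        this.jobGraphFileName = Objects.requireNonNull(jobGraphFileName);
        this.jarFileNames = Objects.requireNonNull(jarFileNames);
        this.artifactFileNames = Objects.requireNonNull(artifactFileNames);
    }

    /** Renders the request as JSON; the files themselves are expected to be uploaded separately. */
    String toJson() {
        return "{" + quote(FIELD_NAME_JOB_GRAPH) + ":" + quote(jobGraphFileName)
                + "," + quote(FIELD_NAME_JOB_JARS) + ":" + quoteAll(jarFileNames)
                + "," + quote(FIELD_NAME_JOB_ARTIFACTS) + ":" + quoteAll(artifactFileNames)
                + "}";
    }

    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    private static String quoteAll(Collection<String> values) {
        return values.stream()
                .map(JobSubmitRequestSketch::quote)
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        JobSubmitRequestSketch request = new JobSubmitRequestSketch(
                "job.graph", Arrays.asList("a.jar", "b.jar"), Collections.emptyList());
        System.out.println(request.toJson());
    }
}
```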
[jira] [Created] (FLINK-9711) Flink CLI does not filter RUNNING only jobs
Sayat Satybaldiyev created FLINK-9711: - Summary: Flink CLI does not filter RUNNING only jobs Key: FLINK-9711 URL: https://issues.apache.org/jira/browse/FLINK-9711 Project: Flink Issue Type: Bug Components: Client Affects Versions: 1.5.0 Reporter: Sayat Satybaldiyev In the Flink CLI there's a list command with the option --running which, according to its description, will "Show only running programs and their JobIDs". However, in practice, it also shows jobs that are in the *CANCELED* state, i.e. jobs that have already completed. {code:java} flink list --running -m job-manager:8081 Waiting for response... -- Running/Restarting Jobs --- 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched TrackingClick (CANCELED) 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched TrackingClick (CANCELED) 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched TrackingClick (RUNNING) -- {code} The proposal is to extend the CLI program to show only jobs in the *RUNNING* state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
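The requested behaviour boils down to filtering the job list by status before printing it. A minimal sketch, assuming a simplified status enum and job record; `JobListFilter`, `Status`, `Job` and `withStatus` are hypothetical and not the actual Flink client classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class JobListFilter {

    /** Simplified, hypothetical status set; the real job status enum has more values. */
    enum Status { RUNNING, RESTARTING, CANCELED }

    static final class Job {
        final String id;
        final String name;
        final Status status;

        Job(String id, String name, Status status) {
            this.id = id;
            this.name = name;
            this.status = status;
        }
    }

    /** Keeps only the jobs whose status is in the wanted set, preserving order. */
    static List<Job> withStatus(List<Job> jobs, Set<Status> wanted) {
        List<Job> result = new ArrayList<>();
        for (Job job : jobs) {
            if (wanted.contains(job.status)) {
                result.add(job);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data taken from the CLI output quoted in the issue.
        List<Job> jobs = Arrays.asList(
                new Job("6e49027e843ced2ad798da549004243e", "Enriched TrackingClick", Status.CANCELED),
                new Job("c901ae58787ba6aea4a46d6bb9dc2b3c", "Enriched TrackingClick", Status.CANCELED),
                new Job("83ab149ad528cfd956da7090543cbc72", "Enriched TrackingClick", Status.RUNNING));

        for (Job job : withStatus(jobs, EnumSet.of(Status.RUNNING))) {
            System.out.println(job.id + " : " + job.name + " (" + job.status + ")");
        }
    }
}
```

Passing the wanted statuses as a set leaves room for a "Running/Restarting" view (`EnumSet.of(Status.RUNNING, Status.RESTARTING)`) without changing the filter itself.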
[jira] [Updated] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs
[ https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sayat Satybaldiyev updated FLINK-9711: -- Summary: Flink CLI --running option does not show RUNNING only jobs (was: Flink CLI does not filter RUNNING only jobs) > Flink CLI --running option does not show RUNNING only jobs > -- > > Key: FLINK-9711 > URL: https://issues.apache.org/jira/browse/FLINK-9711 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 >Reporter: Sayat Satybaldiyev >Priority: Major > > In the Flink CLI there's a list command with the option --running which, according to > its description, will "Show only running programs and their JobIDs". However, in > practice, it also shows jobs that are in the *CANCELED* state, i.e. jobs that have > already completed. > > {code:java} > flink list --running -m job-manager:8081 > Waiting for response... > -- Running/Restarting Jobs --- > 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched > TrackingClick (CANCELED) > 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched > TrackingClick (CANCELED) > 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched > TrackingClick (RUNNING) > -- > {code} > > The proposal is to extend the CLI program to show only jobs in the *RUNNING* state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531099#comment-16531099 ] ASF GitHub Bot commented on FLINK-9688: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199743600 --- Diff: docs/dev/table/tableApi.md --- @@ -2184,6 +2184,17 @@ NUMERIC.atan() + + +{% highlight java %} +NUMERIC.atan2(NUMERIC) --- End diff -- given that both parameters are equally important, we might want to change the syntax to `atan2(Numeric, Numeric)`. IMO, that would be more intuitive. What do you think? > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
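For reference, the two-argument arctangent in the query above can be tried out in plain Java. This sketch assumes that SQL `ATAN2(y, x)` follows the same argument convention as `java.lang.Math.atan2(y, x)`, which is consistent with the sqlline result quoted in the issue; `Atan2Demo` and `angle` are hypothetical names.

```java
final class Atan2Demo {

    /** Angle (in radians) of the point (x, y) as seen from the origin; note that y comes first. */
    static double angle(double y, double x) {
        return Math.atan2(y, x);
    }

    public static void main(String[] args) {
        // Corresponds to SELECT atan2(1,2) from the issue description.
        System.out.println(angle(1, 2));

        // Swapping the arguments yields a different angle, so the order matters
        // regardless of whether the call is written as x.atan2(y) or atan2(x, y).
        System.out.println(angle(2, 1));
    }
}
```

Since neither argument is optional or secondary, a function-style `atan2(a, b)` makes the argument order explicit at the call site, which is the point raised in the review comment.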
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199743600 --- Diff: docs/dev/table/tableApi.md --- @@ -2184,6 +2184,17 @@ NUMERIC.atan() + + +{% highlight java %} +NUMERIC.atan2(NUMERIC) --- End diff -- given that both parameters are equally important, we might want to change the syntax to `atan2(Numeric, Numeric)`. IMO, that would be more intuitive. What do you think? ---
[jira] [Closed] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs
[ https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9711. --- Resolution: Duplicate Already fixed for 1.5.1. > Flink CLI --running option does not show RUNNING only jobs > -- > > Key: FLINK-9711 > URL: https://issues.apache.org/jira/browse/FLINK-9711 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 >Reporter: Sayat Satybaldiyev >Priority: Major > > In the Flink CLI there's a list command with the option --running which, according to > its description, will "Show only running programs and their JobIDs". However, in > practice, it also shows jobs that are in the *CANCELED* state, i.e. jobs that have > already completed. > > {code:java} > flink list --running -m job-manager:8081 > Waiting for response... > -- Running/Restarting Jobs --- > 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched > TrackingClick (CANCELED) > 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched > TrackingClick (CANCELED) > 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched > TrackingClick (RUNNING) > -- > {code} > > The proposal is to extend the CLI program to show only jobs in the *RUNNING* state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6241: [FLINK-9289][rest] Rework JobSubmitHandler to acce...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6241 [FLINK-9289][rest] Rework JobSubmitHandler to accept jar files Backport of #6203 for 1.5. The differences to the current version of the linked PR (state @ b9a804c350d12469eccf0fd78d82dcac8eaa3c5b) are: * `ClientUtils` were not introduced * removed all code related to the upload of artifacts, as this feature was introduced in 1.6 @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9280_epsilon_bp Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6241.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6241 commit a8765ae2f006c99a905afd93e93cbe9b0cfef09b Author: zentol Date: 2018-06-11T09:45:12Z [FLINK-9289][rest] Rework JobSubmitHandler to accept jar files ---
[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9713: -- Component/s: Table API & SQL > Support versioned joins in planning phase > - > > Key: FLINK-9713 > URL: https://issues.apache.org/jira/browse/FLINK-9713 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6201 Hi @suez1224, that sounds good overall. :-) A few comments: - I would not add a user-facing property `connector.support-timestamp` because a user chooses that by choosing the connector type. Whether the connector supports writing a system timestamp can be an internal field/annotation/interface of the `TableSink` that is generated from the properties. - Copying the timestamp to the StreamRecord timestamp field can be done with a process function. Actually, we do that already when converting a Table into a DataStream. Setting the flag in the Kafka TableSink should be easy. - Not sure if `from-source` needs to be supported by the initial version. We could just implement `from-field` for now, and handle `from-source` as a follow up issue. Since we are approaching feature freeze, I think this might be a good idea at this point. What do you think? Fabian ---
[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9713: -- Affects Version/s: 1.5.0 > Support versioned joins in planning phase > - > > Key: FLINK-9713 > URL: https://issues.apache.org/jira/browse/FLINK-9713 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9289) Parallelism of generated operators should have max parallism of input
[ https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531112#comment-16531112 ] ASF GitHub Bot commented on FLINK-9289: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6241 [FLINK-9289][rest] Rework JobSubmitHandler to accept jar files Backport of #6203 for 1.5. The differences to the current version of the linked PR (state @ b9a804c350d12469eccf0fd78d82dcac8eaa3c5b) are: * `ClientUtils` were not introduced * removed all code related to the upload of artifacts, as this feature was introduced in 1.6 @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9280_epsilon_bp Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6241.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6241 commit a8765ae2f006c99a905afd93e93cbe9b0cfef09b Author: zentol Date: 2018-06-11T09:45:12Z [FLINK-9289][rest] Rework JobSubmitHandler to accept jar files > Parallelism of generated operators should have max parallism of input > - > > Key: FLINK-9289 > URL: https://issues.apache.org/jira/browse/FLINK-9289 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > The DataSet API aims to chain generated operators such as key extraction > mappers to their predecessor. This is done by assigning the same parallelism > as the input operator. > If a generated operator has more than two inputs, the operator cannot be > chained anymore and the operator is generated with default parallelism. 
This > can lead to a {code}NoResourceAvailableException: Not enough free slots > available to run the job.{code} as reported by a user on the mailing list: > https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E > I suggest to set the parallelism of a generated operator to the max > parallelism of all of its inputs to fix this problem. > Until the problem is fixed, a workaround is to set the default parallelism at > the {{ExecutionEnvironment}}: > {code} > ExecutionEnvironment env = ... > env.setParallelism(2); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
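The fix suggested in the FLINK-9289 description above (give a generated operator the maximum parallelism of all of its inputs rather than the default parallelism) amounts to a simple maximum over the inputs. A minimal sketch with hypothetical names, not the actual DataSet API code:

```java
final class GeneratedOperatorParallelism {

    /**
     * Returns the parallelism to assign to a generated operator:
     * the largest parallelism among its inputs.
     */
    static int maxInputParallelism(int... inputParallelisms) {
        if (inputParallelisms.length == 0) {
            throw new IllegalArgumentException("a generated operator needs at least one input");
        }
        int max = inputParallelisms[0];
        for (int parallelism : inputParallelisms) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        // An operator with inputs of parallelism 2, 4 and 1 would run with parallelism 4,
        // independent of the environment's default parallelism.
        System.out.println(maxInputParallelism(2, 4, 1));
    }
}
```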
[jira] [Created] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
Piotr Nowojski created FLINK-9712: - Summary: Support enrichment joins in Flink SQL/Table API Key: FLINK-9712 URL: https://issues.apache.org/jira/browse/FLINK-9712 Project: Flink Issue Type: New Feature Components: Table API & SQL Affects Versions: 1.5.0 Reporter: Piotr Nowojski -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9713) Support versioned joins in planning phase
Piotr Nowojski created FLINK-9713: - Summary: Support versioned joins in planning phase Key: FLINK-9713 URL: https://issues.apache.org/jira/browse/FLINK-9713 Project: Flink Issue Type: Sub-task Reporter: Piotr Nowojski -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531117#comment-16531117 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199746426 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java --- @@ -37,18 +43,42 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + * Serialization schema that serializes {@link Row} into Avro bytes. + * + * Serializes objects that are represented in (nested) Flink rows. It support types that + * are compatible with Flink's Table & SQL API. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}. */ public class AvroRowSerializationSchema implements SerializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); --- End diff -- We are using this pattern in different places, e.g. `org.apache.flink.orc.OrcBatchReader`. The problem is that Java's SQL time/date/timestamp classes are a complete design failure: they are timezone specific. This field is used to add/remove the local timezone offset from the timestamp, such that the string representation of the produced `Timestamp` object is always correct. 
> KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Blocker > Labels: patch, pull-request-available > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. > at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9712: -- Description: As described here: https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing > Support enrichment joins in Flink SQL/Table API > --- > > Key: FLINK-9712 > URL: https://issues.apache.org/jira/browse/FLINK-9712 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > As described here: > https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531114#comment-16531114 ] ASF GitHub Bot commented on FLINK-8866: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6201 Hi @suez1224, that sounds good overall. :-) A few comments: - I would not add a user-facing property `connector.support-timestamp` because a user chooses that by choosing the connector type. Whether the connector supports writing a system timestamp can be an internal field/annotation/interface of the `TableSink` that is generated from the properties. - Copying the timestamp to the StreamRecord timestamp field can be done with a process function. Actually, we do that already when converting a Table into a DataStream. Setting the flag in the Kafka TableSink should be easy. - Not sure if `from-source` needs to be supported by the initial version. We could just implement `from-field` for now, and handle `from-source` as a follow up issue. Since we are approaching feature freeze, I think this might be a good idea at this point. What do you think? Fabian > Create unified interfaces to configure and instatiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199746600 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java --- @@ -17,154 +17,338 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import org.joda.time.DateTimeFieldType; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import 
java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * Deserialization schema from Avro bytes to {@link Row}. * - * Deserializes the byte[] messages into (nested) Flink Rows. + * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types + * into types that are compatible with Flink's Table & SQL API. * - * {@link Utf8} is converted to regular Java Strings. + * Projects with Avro records containing logical date/time types need to add a JodaTime + * dependency. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}. */ +@PublicEvolving public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); --- End diff -- See comment above. ---
[jira] [Commented] (FLINK-9674) Remove 65s sleep in QueryableState E2E test
[ https://issues.apache.org/jira/browse/FLINK-9674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531118#comment-16531118 ] ASF GitHub Bot commented on FLINK-9674: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6216 merging. > Remove 65s sleep in QueryableState E2E test > --- > > Key: FLINK-9674 > URL: https://issues.apache.org/jira/browse/FLINK-9674 > Project: Flink > Issue Type: Improvement > Components: Queryable State, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > The {{test_queryable_state_restart_tm.sh}} kills a taskmanager, waits for the > loss to be noticed, starts a new tm and waits for the job to continue. > {code} > kill_random_taskmanager > [...] > sleep 65 # this is a little longer than the heartbeat timeout so that the TM > is gone > start_and_wait_for_tm > {code} > Instead of waiting for a fixed amount of time that is tied to some config > value we should wait for a specific event, like the job being canceled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9714) Support versioned joins with processing time
Piotr Nowojski created FLINK-9714: - Summary: Support versioned joins with processing time Key: FLINK-9714 URL: https://issues.apache.org/jira/browse/FLINK-9714 Project: Flink Issue Type: Sub-task Components: Table API & SQL Affects Versions: 1.5.0 Reporter: Piotr Nowojski -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6216: [FLINK-9674][tests] Replace hard-coded sleeps in QS E2E t...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6216 merging. ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199746426 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java --- @@ -37,18 +43,42 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + * Serialization schema that serializes {@link Row} into Avro bytes. + * + * Serializes objects that are represented in (nested) Flink rows. It support types that + * are compatible with Flink's Table & SQL API. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}. */ public class AvroRowSerializationSchema implements SerializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); --- End diff -- We are using this pattern in different places, e.g. `org.apache.flink.orc.OrcBatchReader`. The problem is that Java's SQL time/date/timestamp classes are a complete design failure: they are timezone specific. This field is used to add/remove the local timezone offset from the timestamp, such that the string representation of the produced `Timestamp` object is always correct. ---
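To make the "add/remove the local timezone" remark above more concrete, here is a minimal sketch of such a conversion. It is not copied from the PR; `LocalTzShift`, `toSqlTimestamp` and `toUtcMillis` are hypothetical names, and only the `LOCAL_TZ` field mirrors the diff. The sketch assumes the input is milliseconds since the epoch in UTC and ignores corner cases around daylight-saving transitions.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

final class LocalTzShift {

    /** Captured once, as in the diff; used for time conversions into SQL types. */
    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    /**
     * Converts UTC epoch millis into a Timestamp whose toString() shows the UTC
     * wall-clock time. Timestamp renders itself in the JVM's default timezone,
     * so the local offset is subtracted up front to cancel that out.
     */
    static Timestamp toSqlTimestamp(long utcMillis) {
        return new Timestamp(utcMillis - LOCAL_TZ.getOffset(utcMillis));
    }

    /** Reverse direction: adds the local offset back to recover UTC epoch millis. */
    static long toUtcMillis(Timestamp timestamp) {
        long time = timestamp.getTime();
        return time + LOCAL_TZ.getOffset(time);
    }

    public static void main(String[] args) {
        Timestamp epoch = toSqlTimestamp(0L);
        // Prints the UTC wall-clock time of the epoch, whatever the local timezone is.
        System.out.println(epoch);
        System.out.println(toUtcMillis(epoch));
    }
}
```

The price of this approach is that `getTime()` on the shifted object no longer returns the true instant, which is why both directions of the conversion have to be kept in sync.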
[jira] [Updated] (FLINK-9715) Support versioned joins with event time
[ https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9715: -- Affects Version/s: 1.5.0 > Support versioned joins with event time > --- > > Key: FLINK-9715 > URL: https://issues.apache.org/jira/browse/FLINK-9715 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531120#comment-16531120 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199746600 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java --- @@ -17,154 +17,338 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import 
org.joda.time.DateTimeFieldType; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * Deserialization schema from Avro bytes to {@link Row}. * - * Deserializes the byte[] messages into (nested) Flink Rows. + * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types + * into types that are compatible with Flink's Table & SQL API. * - * {@link Utf8} is converted to regular Java Strings. + * Projects with Avro records containing logical date/time types need to add a JodaTime + * dependency. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}. */ +@PublicEvolving public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); --- End diff -- See comment above. 
> KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Blocker > Labels: patch, pull-request-available > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. > at
[jira] [Updated] (FLINK-9715) Support versioned joins with event time
[ https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9715: -- Component/s: Table API & SQL > Support versioned joins with event time > --- > > Key: FLINK-9715 > URL: https://issues.apache.org/jira/browse/FLINK-9715 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major
[jira] [Created] (FLINK-9715) Support versioned joins with event time
Piotr Nowojski created FLINK-9715: - Summary: Support versioned joins with event time Key: FLINK-9715 URL: https://issues.apache.org/jira/browse/FLINK-9715 Project: Flink Issue Type: Sub-task Reporter: Piotr Nowojski
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199747223 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java --- @@ -17,154 +17,338 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import org.joda.time.DateTimeFieldType; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import 
java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * Deserialization schema from Avro bytes to {@link Row}. * - * Deserializes the byte[] messages into (nested) Flink Rows. + * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types + * into types that are compatible with Flink's Table & SQL API. * - * {@link Utf8} is converted to regular Java Strings. + * Projects with Avro records containing logical date/time types need to add a JodaTime + * dependency. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}. */ +@PublicEvolving public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** +* Avro record class for deserialization. Might be null if record class is not available. */ private Class recordClazz; /** -* Schema for deterministic field order. +* Schema string for deserialization. +*/ + private String schemaString; + + /** +* Avro serialization schema. */ private transient Schema schema; /** -* Reader that deserializes byte array into a record. +* Type information describing the result type. --- End diff -- Sorry about that. I actually rewrote the entire class. It might make sense to review it entirely instead of the diff. ---
[GitHub] flink pull request #6242: [FLINK-9711][CLI] Filter only RUNNING jobs when --...
GitHub user satybald opened a pull request: https://github.com/apache/flink/pull/6242 [FLINK-9711][CLI] Filter only RUNNING jobs when --running option provided in CLI ## What is the purpose of the change The Flink CLI has a `list` command with a `--running` option that, according to its description, should "Show only running programs and their JobIDs". In practice, however, it also shows jobs in CANCELED and other states that are not RUNNING. ## Brief change log - *filter only RUNNING jobs in CLI if corresponding option is provided* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/satybald/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6242.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6242 commit 86529e8843dd073f79da8be07cc821d46c10bf7a Author: Sayat Satybaldiyev Date: 2018-07-03T09:40:57Z Filter only RUNNING jobs when --running option provided in CLI ---
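The filtering described in this pull request could look roughly like the sketch below. `Job`, `State`, and `onlyRunning` are hypothetical stand-ins and not the classes the Flink CLI actually uses; the job IDs are the ones from the `flink list --running` output quoted in FLINK-9711.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the CLI job listing, for illustration only.
class RunningJobFilterSketch {

    enum State { RUNNING, RESTARTING, CANCELED, FINISHED, FAILED }

    static final class Job {
        final String id;
        final State state;

        Job(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    // Keeps only the jobs whose state is exactly RUNNING.
    static List<Job> onlyRunning(List<Job> jobs) {
        return jobs.stream()
                .filter(job -> job.state == State.RUNNING)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Job> jobs = Arrays.asList(
                new Job("6e49027e843ced2ad798da549004243e", State.CANCELED),
                new Job("c901ae58787ba6aea4a46d6bb9dc2b3c", State.CANCELED),
                new Job("83ab149ad528cfd956da7090543cbc72", State.RUNNING));
        for (Job job : onlyRunning(jobs)) {
            System.out.println(job.id + " (" + job.state + ")");
        }
    }
}
```

Whether RESTARTING jobs should also be listed under `--running` is not settled by the PR description; the sketch excludes them.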
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531123#comment-16531123 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199747223 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java --- @@ -17,154 +17,338 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import 
org.joda.time.DateTimeFieldType; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * Deserialization schema from Avro bytes to {@link Row}. * - * Deserializes the byte[] messages into (nested) Flink Rows. + * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types + * into types that are compatible with Flink's Table & SQL API. * - * {@link Utf8} is converted to regular Java Strings. + * Projects with Avro records containing logical date/time types need to add a JodaTime + * dependency. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}. */ +@PublicEvolving public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** +* Avro record class for deserialization. Might be null if record class is not available. */ private Class recordClazz; /** -* Schema for deterministic field order. +* Schema string for deserialization. +*/ + private String schemaString; + + /** +* Avro serialization schema. */ private transient Schema schema; /** -* Reader that deserializes byte array into a record. +* Type information describing the result type. --- End diff -- Sorry about that. I actually rewrote the entire class. 
It might make sense to review it entirely instead of the diff. > KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >
[jira] [Created] (FLINK-9716) Support scans from table version function
Piotr Nowojski created FLINK-9716: - Summary: Support scans from table version function Key: FLINK-9716 URL: https://issues.apache.org/jira/browse/FLINK-9716 Project: Flink Issue Type: Sub-task Reporter: Piotr Nowojski Given TVF of {{Rates}} this should work: {code:java} SELECT * FROM Rates(2016-06-27 10:10:42.123) {code}
[jira] [Commented] (FLINK-9711) Flink CLI --running option does not show RUNNING only jobs
[ https://issues.apache.org/jira/browse/FLINK-9711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531124#comment-16531124 ] ASF GitHub Bot commented on FLINK-9711: --- GitHub user satybald opened a pull request: https://github.com/apache/flink/pull/6242 [FLINK-9711][CLI] Filter only RUNNING jobs when --running option provided in CLI ## What is the purpose of the change The Flink CLI has a `list` command with a `--running` option that, according to its description, should "Show only running programs and their JobIDs". In practice, however, it also shows jobs in CANCELED and other states that are not RUNNING. ## Brief change log - *filter only RUNNING jobs in CLI if corresponding option is provided* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? 
(no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/satybald/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6242.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6242 commit 86529e8843dd073f79da8be07cc821d46c10bf7a Author: Sayat Satybaldiyev Date: 2018-07-03T09:40:57Z Filter only RUNNING jobs when --running option provided in CLI > Flink CLI --running option does not show RUNNING only jobs > -- > > Key: FLINK-9711 > URL: https://issues.apache.org/jira/browse/FLINK-9711 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 >Reporter: Sayat Satybaldiyev >Priority: Major > > In Flink CLI there's a command list with option --running that according to > descriptions "Show only running programs and their JobIDs". However, in > practice, it also shows jobs that are in the *CANCELED* state, which is a > completed job. > > {code:java} > flink list --running -m job-manager:8081 > Waiting for response... > -- Running/Restarting Jobs --- > 03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched > TrackingClick (CANCELED) > 03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched > TrackingClick (CANCELED) > 03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched > TrackingClick (RUNNING) > -- > {code} > > The proposal is to extend the CLI program to show only jobs in the *RUNNING* state.
[jira] [Updated] (FLINK-9714) Support versioned joins with processing time
[ https://issues.apache.org/jira/browse/FLINK-9714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9714: -- Description: Queries like: {code:java} SELECT o.amount * r.rate FROM Orders AS o, LATERAL TABLE (Rates(o.rowtime)) AS r WHERE o.currency = r.currency{code} should work for processing time > Support versioned joins with processing time > > > Key: FLINK-9714 > URL: https://issues.apache.org/jira/browse/FLINK-9714 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should work for processing time
[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9713: -- Description: Queries like: {code:java} SELECT o.amount * r.rate FROM Orders AS o, LATERAL TABLE (Rates(o.rowtime)) AS r WHERE o.currency = r.currency{code} should evaluate to a valid plan with a versioned join plan node. was: Queries like: {code:java} SELECT o.amount * r.rate FROM Orders AS o, LATERAL TABLE (TemporalRates(o.rowtime)) AS r WHERE o.currency = r.currency{code} should evaluate to a valid plan with a versioned join plan node. > Support versioned joins in planning phase > - > > Key: FLINK-9713 > URL: https://issues.apache.org/jira/browse/FLINK-9713 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should evaluate to a valid plan with a versioned join plan node.
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199748214 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java --- @@ -17,154 +17,338 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import org.joda.time.DateTimeFieldType; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import 
java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * Deserialization schema from Avro bytes to {@link Row}. * - * Deserializes the byte[] messages into (nested) Flink Rows. + * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types + * into types that are compatible with Flink's Table & SQL API. * - * {@link Utf8} is converted to regular Java Strings. + * Projects with Avro records containing logical date/time types need to add a JodaTime + * dependency. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}. */ +@PublicEvolving public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** +* Avro record class for deserialization. Might be null if record class is not available. */ private Class recordClazz; /** -* Schema for deterministic field order. +* Schema string for deserialization. +*/ + private String schemaString; + + /** +* Avro serialization schema. */ private transient Schema schema; /** -* Reader that deserializes byte array into a record. +* Type information describing the result type. */ - private transient DatumReader datumReader; + private transient TypeInformation typeInfo; /** -* Input stream to read message from. +* Record to deserialize byte array. */ - private transient MutableByteArrayInputStream inputStream; + private transient IndexedRecord record; /** -* Avro decoder that decodes binary data. 
+* Reader that deserializes byte array into a record. */ - private transient Decoder decoder; + private transient DatumReader datumReader; /** -* Record to deserialize byte array to. +* Input stream to read message from. */ - private SpecificRecord record; + private transient MutableByteArrayInputStream
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531128#comment-16531128 ] ASF GitHub Bot commented on FLINK-9444: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199748214 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java --- @@ -17,154 +17,338 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import 
org.joda.time.DateTimeFieldType; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TimeZone; /** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * Deserialization schema from Avro bytes to {@link Row}. * - * Deserializes the byte[] messages into (nested) Flink Rows. + * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types + * into types that are compatible with Flink's Table & SQL API. * - * {@link Utf8} is converted to regular Java Strings. + * Projects with Avro records containing logical date/time types need to add a JodaTime + * dependency. + * + * Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}. */ +@PublicEvolving public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { /** -* Avro record class. +* Used for time conversions into SQL types. +*/ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** +* Avro record class for deserialization. Might be null if record class is not available. */ private Class recordClazz; /** -* Schema for deterministic field order. +* Schema string for deserialization. +*/ + private String schemaString; + + /** +* Avro serialization schema. */ private transient Schema schema; /** -* Reader that deserializes byte array into a record. +* Type information describing the result type. 
*/ - private transient DatumReader datumReader; + private transient TypeInformation typeInfo; /** -* Input stream to read message from. +* Record to deserialize byte array. */ - private transient MutableByteArrayInputStream inputStream; + private transient IndexedRecord record; /** -* Avro decoder that decodes binary data. +* Reader that deserializes byte array into a record. */ - private transient Decoder decoder; + private transient DatumReader
[jira] [Updated] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9713: -- Description: Queries like: {code:java} SELECT o.amount * r.rate FROM Orders AS o, LATERAL TABLE (TemporalRates(o.rowtime)) AS r WHERE o.currency = r.currency{code} should evaluate to a valid plan with a versioned join plan node. > Support versioned joins in planning phase > - > > Key: FLINK-9713 > URL: https://issues.apache.org/jira/browse/FLINK-9713 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (TemporalRates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should evaluate to a valid plan with a versioned join plan node.
[jira] [Updated] (FLINK-9715) Support versioned joins with event time
[ https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9715: -- Description: Queries like: {code:java} SELECT o.amount * r.rate FROM Orders AS o, LATERAL TABLE (Rates(o.rowtime)) AS r WHERE o.currency = r.currency{code} should work with event time > Support versioned joins with event time > --- > > Key: FLINK-9715 > URL: https://issues.apache.org/jira/browse/FLINK-9715 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should work with event time
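As an illustration of what a query like the one in this ticket might compute, the sketch below models `Rates(o.rowtime)` as a per-currency history and joins each order with the rate version that was current at the order's time. This reading of "versioned join" (latest version at or before the given timestamp) is an assumption, and `VersionedJoinSketch`, `addRate`, `rateAsOf`, and `convert` are hypothetical names that do not come from Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned Rates table, for illustration only.
class VersionedJoinSketch {

    // currency -> (validFrom timestamp -> rate), sorted by timestamp
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    // Registers a new rate version that becomes valid at validFrom.
    void addRate(String currency, long validFrom, double rate) {
        versions.computeIfAbsent(currency, c -> new TreeMap<>()).put(validFrom, rate);
    }

    // Returns the rate version valid at the given time, or null if none exists yet.
    Double rateAsOf(String currency, long time) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = history.floorEntry(time);
        return entry == null ? null : entry.getValue();
    }

    // Models o.amount * r.rate for one order; null means the order finds no match.
    Double convert(String currency, double amount, long orderTime) {
        Double rate = rateAsOf(currency, orderTime);
        return rate == null ? null : amount * rate;
    }

    public static void main(String[] args) {
        VersionedJoinSketch rates = new VersionedJoinSketch();
        rates.addRate("EUR", 10L, 2.0);
        rates.addRate("EUR", 20L, 3.0);
        System.out.println(rates.convert("EUR", 10.0, 15L));
        System.out.println(rates.convert("EUR", 10.0, 25L));
    }
}
```

With event time, `orderTime` would be the order's own timestamp; with processing time (FLINK-9714) it would presumably be the time at which the order is processed, so only the latest version would ever be looked up.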