Re: how does flink assign windows to task
Thanks Sameer and Till,

On Mon, Aug 1, 2016 at 9:31 AM, Till Rohrmann wrote:

Yes, you're right Sameer. That's how things work in Flink.

Cheers,
Till

On Sun, Jul 31, 2016 at 12:33 PM, Sameer Wadkar wrote:

Vishnu,

I would imagine, based on Max's explanation and how other systems like MapReduce and Spark partition keys, that if you have 100 keys and 50 slots, 2 keys would be assigned to each slot. Each slot would maintain one or more windows (more for time-based windows) and each window would have up to 2 panes (depending on whether there are elements for a key for a given window). The trigger would evaluate which of these panes will fire for a global window (count windows), or which window as a whole fires for a time window.

It seems like this is the only way to get the most efficient utilization of the entire cluster and allow all keys to be evaluated simultaneously, without being starved by keys getting more elements in case of a skew.

So I think you will need to have enough memory to hold all the elements that can arrive for all the active (not yet triggered) windows for two keys in a task. For count windows this is easy to estimate, but for time windows it is less clear if you receive elements out of order.

Let's see what Max replies. I am just reasoning about how Flink should work based on how other similar systems do it.

Sameer

On Jul 29, 2016, at 9:51 PM, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Max,

Thanks for the explanation.

"This happens one after another in a single task slot but in parallel across all the task slots."
Could you explain more how this happens in parallel? Which part occurs in parallel: the trigger going through each pane, or the window function being executed?

As in the first example, if there are 100 panes (since I have 1 window and 100 keys), will the trigger go through these 100 panes using 50 task slots and then execute whichever fires? Does that mean that Flink determines which set of panes has to be evaluated in each task slot, and then the trigger goes through them?

The reason I am trying to understand exactly how it works is that I need to decide how much memory each node in my cluster should have. I know that a single pane would not cause an OOM in my case (since the number of elements per pane is not huge), but nodes might not have enough memory to hold the entire window in memory (since I can have a large number of panes).

Thanks,
Vishnu

On Fri, Jul 29, 2016 at 4:29 AM, Maximilian Michels wrote:

Hi Vishnu Viswanath,

The keyed elements are spread across the 50 task slots (assuming you have a parallelism of 50) using hash partitioning on the keys. Each task slot runs one or multiple operators (depending on the slot sharing options). One of them is a WindowOperator, which will decide when to trigger and process your keyed elements.

The WindowOperator holds the WindowAssigner and the Trigger. The WindowAssigner determines which window an incoming element gets assigned to. Windows are kept for each key; the combination of window and key is usually called a pane. The Trigger will go through all the panes and check whether they should fire or not (whether the window function should be executed). This happens one after another in a single task slot, but in parallel across all the task slots.

Just a brief explanation. Hope it helps :)

Cheers,
Max

On Thu, Jul 28, 2016 at 5:49 PM, Vishnu Viswanath wrote:

Hi,

Let's say I have a window on a keyed stream, and I have about 100 unique keys. And assume I have about 50 task slots in my cluster. And suppose my trigger fired 70 of the 100 windows/panes at the same time.

How will Flink handle this? Will it assign 50 of the 70 triggered windows to the 50 available task slots and wait for 20 of them to finish before assigning the remaining 20 to the slots?

Thanks,
Vishnu Viswanath
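As a rough illustration of the hash partitioning Max describes, the sketch below (plain Java, not the Flink API; the simple modulo hash is a stand-in for Flink's actual murmur-hash-based key-group assignment) distributes 100 keys over 50 parallel slots, so each slot ends up owning roughly 2 keys and their panes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyPartitioning {
    // Simplified stand-in for Flink's hash partitioning: each key is
    // deterministically mapped to one of the parallel subtasks (slots).
    static int slotFor(String key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 50;
        Map<Integer, List<String>> slots = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            slots.computeIfAbsent(slotFor(key, parallelism), s -> new ArrayList<>()).add(key);
        }
        // Every key lands on exactly one slot; all panes for a key live in that slot.
        int total = slots.values().stream().mapToInt(List::size).sum();
        System.out.println("keys assigned: " + total);
        System.out.println("slots used: " + slots.size());
    }
}
```

Because the assignment is per key (not per fired window), the scenario in the question does not arise: a triggered pane is always evaluated by the slot that owns its key, sequentially with the other panes in that slot.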
Re: Flink - Once and once processing
Thanks Till. I will take a look at your pointers.

Mans

On Monday, August 1, 2016 6:27 AM, Till Rohrmann wrote:

Hi Mans,

Milind is right that in general external systems have to play along if you want to achieve exactly-once processing guarantees while writing to these systems, either by supporting idempotent operations or by allowing their state to be rolled back. In the batch world, this usually means overwriting data from a previously failed execution run completely, or having a unique key which does not change across runs.

In the case of streaming, we can achieve exactly-once guarantees by committing the data to the external system only after we have taken a checkpoint, and buffering the data in between. This guarantees that the changes are only materialized after we are sure that we can go back to a checkpoint where we've already seen all the elements which might have caused the sink output. You can take a look at the CassandraSink, where we're doing exactly this.

Cheers,
Till

On Sun, Jul 31, 2016 at 2:59 AM, milind parikh wrote:

Flink operates in conjunction with sources and sinks. So, yes, there are things that an underlying sink (or a source) must support in conjunction with Flink to enable a particular semantic.

On Jul 30, 2016 11:46 AM, "M Singh" wrote:

Thanks Konstantin. Just to clarify: unless the target database is resilient to duplicates, Flink's once-only configuration will not avoid duplicate updates.

Mans

On Saturday, July 30, 2016 7:40 AM, Konstantin Knauf wrote:

Hi Mans,

Depending on the number of operations and the particular database, you might be able to use transactions. Maybe you can also find a data model which is more resilient to these kinds of failures.

Cheers,
Konstantin

On 29.07.2016 19:26, M Singh wrote:

Hi:

I have a use case where we need to update a counter in a db, and for this need to guarantee once-only processing. If we have some entries in a batch and it partially updates the counters and then fails, and Flink retries the processing for that batch, some of the counters will be updated twice (the ones which succeeded in the first batch).

I think in order to guarantee once-only processing, I will have to set the buffer size to zero (i.e., send one item at a time).

Is there any alternative configuration or suggestion on how I can achieve once-only updates using a batch mode with partial failures?

Thanks,
Mans

--
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
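A minimal sketch of the idempotent-write idea discussed above, assuming the external store lets us record a unique id per update (all names here are hypothetical, not a Flink or database API): if every counter update carries a unique id and the store remembers which ids it has applied, a retried batch cannot double-count.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IdempotentCounter {
    private final Map<String, Long> counters = new HashMap<>();
    private final Set<String> applied = new HashSet<>();  // ids of already-applied updates

    // Apply an update only if its unique id has not been seen before.
    void apply(String updateId, String counterKey, long delta) {
        if (applied.add(updateId)) {
            counters.merge(counterKey, delta, Long::sum);
        }
    }

    public static void main(String[] args) {
        IdempotentCounter store = new IdempotentCounter();
        // First (partially successful) attempt applies two updates.
        store.apply("batch1-op1", "clicks", 1);
        store.apply("batch1-op2", "clicks", 1);
        // Full retry of the batch after a failure: duplicates are ignored.
        store.apply("batch1-op1", "clicks", 1);
        store.apply("batch1-op2", "clicks", 1);
        store.apply("batch1-op3", "clicks", 1);
        System.out.println("clicks = " + store.counters.get("clicks"));  // prints 3, not 5
    }
}
```

In a real database this would typically be a transaction that inserts the update id into a uniqueness-constrained table and increments the counter in the same transaction.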
Re: Flink Streaming API
Hi Till,

Thanks for your response. I'm able to use flink-connector-kafka-0.9_2.11 with Kafka 0.10 to produce and consume messages.

Thanks,
Sivakumar Bhavanari.

On Mon, Aug 1, 2016 at 6:41 AM, Till Rohrmann wrote:

Hi Siva,

In version 1.0.0 we've added the Scala binary version suffix to all Flink dependencies which depend on Scala. Thus, you should look for flink-streaming-scala_2.10 and flink-streaming-java_2.10. For these artifacts you should be able to find a version 1.0.3 on Maven Central, for example.

The easiest way to set up a project is to use the quickstarts [1]. It will add the correct dependencies. But usually it should be enough to include flink-clients_2.10 and the respective API dependency in your project (e.g. flink-streaming-scala_2.10).

I think that you should be able to use the Flink Kafka connector 0.9 with Kafka 0.10.0, as long as Kafka 0.10.0 still supports the consumer API introduced with Kafka 0.9. But I might be wrong here, so best if you try it out.

[1] https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html

Cheers,
Till

On Mon, Aug 1, 2016 at 3:48 PM, Siva wrote:

Hello Everyone,

I'm new to Flink and wanted to try the streaming API using the flink-kafka connector in Scala. But there are several versions of it. Could someone please help with the questions below?

What are the differences between flink-streaming-core and flink-streaming-scala[java]?

The latest version of flink-streaming-scala is 0.10.2; does that mean streaming development in Scala can only use Flink 0.10.2 and not the latest one, 1.0.3?

What is the difference between flink-connector-kafka-base and flink-connector-kafka-0.x.x? Is flink-connector-kafka-0.9_2.11 compatible with Kafka 0.10.0?

Thanks in advance.

Thanks,
Siva
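For reference, a Maven dependency block along the lines Till describes might look like the following (versions and artifact ids are taken from the thread; verify the exact coordinates on Maven Central for your Flink and Scala versions):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.10</artifactId>
  <version>1.0.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  <version>1.0.3</version>
</dependency>
```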
Re: CEP and Within Clause
+Till, looping him in directly; he probably missed this because he was away for a while.

On Tue, 26 Jul 2016 at 18:21 Sameer W wrote:

Hi,

It looks like the within clause of CEP uses tumbling windows. I could get it to use sliding windows by using an upstream pipeline which uses sliding windows and produces repeating elements (in each sliding window), and applying a watermark assigner on the resulting stream with elements duplicated. I wanted to use the "followedBy" pattern, where there is a strong need for sliding windows.

Is there a plan to add sliding windows to the within clause at some point?

The PatternStream class's "select" and "flatSelect" have overloaded versions which take a pattern timeout function. Is there a way to insert some of those timed-out elements back at the front of the stream? Say I am trying to find a pattern where two temperature readings >150 within a 6-second window should raise an alert. If only one was found, can I insert that one back at the front of the stream on that task node (for that window pane) so that I can find a pattern match in the events occurring in the next 6 seconds? If I can do that, I don't need sliding windows; otherwise I cannot avoid using them for such scenarios.

Thanks,
Sameer
OutOfMemoryError
Hi folks,

I'm trying to run a DataSet program, but after around 200k records are processed a "java.lang.OutOfMemoryError: unable to create new native thread" stops me.

I'm deploying Flink (via bin/yarn-session.sh) on a YARN cluster with 10 nodes (each with 8 cores) and starting 10 task managers, each with 8 slots and 6GB of RAM. Except for the data sink that writes to HDFS and runs with a parallelism of 1, my job runs with a parallelism of 80 and has two input datasets, each an HDFS file of around 6GB and roughly 20 million lines. Most of my map functions use external services via RPC or REST APIs to enrich the raw data with info from other sources.

Might I be doing something wrong, or should I really have more memory available?

Thanks,
Paulo Cezar
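One common cause of "unable to create new native thread" is per-record enrichment code (or a client library) that spawns a thread per outstanding RPC/REST call, eventually hitting the OS thread limit rather than the heap limit. A sketch of the bounding idea in plain Java (the enrichment call itself is a placeholder, not part of this thread's diagnosis): route all calls through one small fixed pool so the number of live threads stays constant regardless of how many records are in flight.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedEnrichment {
    public static void main(String[] args) throws Exception {
        // Instead of one thread per outstanding call (which can exhaust the
        // OS thread limit), share a small fixed pool across all calls.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();

        for (int i = 0; i < 1000; i++) {
            pool.submit(() -> {
                int cur = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(cur, Math::max);
                // ... the call to the external enrichment service would go here ...
                inFlight.decrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        // With a pool of 4, at most 4 calls ever run concurrently.
        System.out.println("bounded: " + (maxInFlight.get() <= 4));
    }
}
```

Checking `ulimit -u` and the thread count of the task manager process when the error occurs would confirm whether thread exhaustion (rather than heap) is the culprit.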
Re: Reprocessing data in Flink / rebuilding Flink state
+Ufuk, looping him in directly.

Hmm, I think this has changed for the 1.1 release. Ufuk, could you please comment?

On Mon, 1 Aug 2016 at 08:07 Josh wrote:

Cool, thanks - I've tried out the approach where we replay data from the Kafka compacted log, then take a savepoint and switch to the live stream.

It works, but I did have to add a dummy operator for every operator that was removed. Without doing this, I got an exception:

java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so to make it work I had to add two dummy operators and assign the same UID I used when replaying, like this:

stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain().map(x => x).name("dummy-2")

I guess it would be nice if Flink could recover from removed tasks/operators without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek wrote:

Hi,
I have to try this to verify, but I think the approach works if you give the two sources different UIDs. The reason is that Flink will ignore state for which it doesn't have an operator to assign it to. Therefore, the state of the "historical Kafka source" should be silently discarded.

Cheers,
Aljoscha

On Fri, 29 Jul 2016 at 18:12 Josh wrote:

@Aljoscha - The N-input operator way sounds very nice. For now I think I'll try to get something quick running the hacky way; then, if we decide to make this a permanent solution, maybe I can work on the proper solution.

I was wondering about your suggestion for "warming up" the state and then taking a savepoint and switching sources - since the Kafka sources are stateful and are part of Flink's internal state, wouldn't this break when trying to restore the job with a different source? Would I need to assign the replay source a UID, and when switching from replay to live, remove the replay source and replace it with a dummy operator with the same UID?

@Jason - I see what you mean now, with the historical and live Flink jobs. That's an interesting approach - I guess it's solving a slightly different problem to my 'rebuilding Flink state upon starting job', as you're rebuilding state as part of the main job when it comes across events that require historical data. Actually, I think we'll need to do something very similar in the future, but right now I can probably get away with something simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch wrote:

Aljoscha's approach is probably better, but to answer your questions...

> How do you send a request from one Flink job to another?

All of our different Flink jobs communicate over Kafka. So the main Flink job listens to both a "live" Kafka source and a "historical" Kafka source. The historical Flink job listens to a "request" Kafka source. When the main job gets an event that it does not have state for, it writes to the "request" topic. The historical job reads the request, grabs the relevant old events from GCS, and writes them to the "historical" Kafka topic. The "historical" source and the "live" source are merged and proceed through the main Flink job as one stream.

> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

When the main job sends a request to the historical job, the main job starts storing any events that come in for that key. As the historical events come in, they are processed immediately. The historical Flink job flags the last event it sends. When the main Flink job sees the flagged event, it knows it is caught up to where it was when it sent the request. You can then process the events that the main job stored, and when that is done you are caught up to the live stream and can stop storing events for that key and just process them as normal.

Keep in mind that this is the dangerous part that I was talking about, where memory in the main job would continue to build until the "historical" events are all processed.

> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log).
Re: TimeWindowAll doesn't assign properly
Hi,
Yes, if you set the delay too high you will have to wait a long time until your windows are emitted.

Cheers,
Aljoscha

On Mon, 1 Aug 2016 at 04:52 Sendoh wrote:

Probably `processAt` is not used adequately, because after increasing maxDelay in the watermark to 10 minutes it works as expected.

Is there any upper limit for this maxDelay? Because there might be too many windows waiting for the last instance?

Best,
Sendoh

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-with-EventTime-tp8201p8234.html
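The effect of maxDelay can be illustrated with the usual bounded-out-of-orderness rule, watermark = max event timestamp seen − maxDelay: an event-time window only fires once the watermark passes the window end, so a larger maxDelay holds windows open longer. The timestamps below are made up for illustration; this is plain Java, not the Flink watermark API.

```java
public class WatermarkDemo {
    public static void main(String[] args) {
        long maxDelaySmall = 1_000L;    // 1 second
        long maxDelayLarge = 600_000L;  // 10 minutes, as in the thread

        long maxTimestampSeen = 1_470_000_600_000L;  // latest event time observed
        long windowEnd = 1_470_000_060_000L;         // end of some event-time window

        // A bounded-out-of-orderness watermark trails the max timestamp by
        // the configured delay; the window fires once watermark >= windowEnd.
        long wmSmall = maxTimestampSeen - maxDelaySmall;
        long wmLarge = maxTimestampSeen - maxDelayLarge;

        System.out.println("fires with 1s delay: " + (wmSmall >= windowEnd));
        System.out.println("fires with 10m delay: " + (wmLarge >= windowEnd));
    }
}
```

So the trade-off is exactly what Aljoscha notes: a 10-minute delay tolerates 10 minutes of out-of-orderness, but every window's output is also delayed by up to 10 minutes, and all windows still open during that span must be buffered.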
Re: Reprocessing data in Flink / rebuilding Flink state
Cool, thanks - I've tried out the approach where we replay data from the Kafka compacted log, then take a savepoint and switch to the live stream.

It works, but I did have to add a dummy operator for every operator that was removed. Without doing this, I got an exception:

java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so to make it work I had to add two dummy operators and assign the same UID I used when replaying, like this:

stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain().map(x => x).name("dummy-2")

I guess it would be nice if Flink could recover from removed tasks/operators without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek wrote:

Hi,
I have to try this to verify, but I think the approach works if you give the two sources different UIDs. The reason is that Flink will ignore state for which it doesn't have an operator to assign it to. Therefore, the state of the "historical Kafka source" should be silently discarded.

Cheers,
Aljoscha

On Fri, 29 Jul 2016 at 18:12 Josh wrote:

@Aljoscha - The N-input operator way sounds very nice. For now I think I'll try to get something quick running the hacky way; then, if we decide to make this a permanent solution, maybe I can work on the proper solution.

I was wondering about your suggestion for "warming up" the state and then taking a savepoint and switching sources - since the Kafka sources are stateful and are part of Flink's internal state, wouldn't this break when trying to restore the job with a different source? Would I need to assign the replay source a UID, and when switching from replay to live, remove the replay source and replace it with a dummy operator with the same UID?

@Jason - I see what you mean now, with the historical and live Flink jobs. That's an interesting approach - I guess it's solving a slightly different problem to my 'rebuilding Flink state upon starting job', as you're rebuilding state as part of the main job when it comes across events that require historical data. Actually, I think we'll need to do something very similar in the future, but right now I can probably get away with something simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch wrote:

Aljoscha's approach is probably better, but to answer your questions...

> How do you send a request from one Flink job to another?

All of our different Flink jobs communicate over Kafka. So the main Flink job listens to both a "live" Kafka source and a "historical" Kafka source. The historical Flink job listens to a "request" Kafka source. When the main job gets an event that it does not have state for, it writes to the "request" topic. The historical job reads the request, grabs the relevant old events from GCS, and writes them to the "historical" Kafka topic. The "historical" source and the "live" source are merged and proceed through the main Flink job as one stream.

> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

When the main job sends a request to the historical job, the main job starts storing any events that come in for that key. As the historical events come in, they are processed immediately. The historical Flink job flags the last event it sends. When the main Flink job sees the flagged event, it knows it is caught up to where it was when it sent the request. You can then process the events that the main job stored, and when that is done you are caught up to the live stream and can stop storing events for that key and just process them as normal.

Keep in mind that this is the dangerous part that I was talking about, where memory in the main job would continue to build until the "historical" events are all processed.

> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log).

You could absolutely do it by reading from the beginning of a Kafka topic. The reason we do it with GCS is that it is really cheap storage, and we are not planning on storing forever on the Kafka topic.

> Since I would have everything needed to
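The switchover protocol Jason describes (process historical events immediately, buffer live events, flush the buffer when the flagged last historical event arrives) can be sketched roughly as below; the event encoding, the `#LAST` flag, and all names are invented for illustration, not part of any Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SwitchoverDemo {
    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        Deque<String> liveBuffer = new ArrayDeque<>();
        boolean caughtUp = false;

        // Interleaved merged stream: historical events (h*), live events (l*),
        // with a flag marking the last historical event.
        String[] merged = {"h1", "l1", "h2", "l2", "h3#LAST", "l3"};

        for (String event : merged) {
            if (event.startsWith("h")) {
                processed.add(event.replace("#LAST", ""));  // historical: process immediately
                if (event.endsWith("#LAST")) {
                    // Caught up: flush buffered live events, then go live.
                    while (!liveBuffer.isEmpty()) processed.add(liveBuffer.poll());
                    caughtUp = true;
                }
            } else if (caughtUp) {
                processed.add(event);   // normal live processing
            } else {
                liveBuffer.add(event);  // hold until historical replay is done
            }
        }
        System.out.println(processed);  // [h1, h2, h3, l1, l2, l3]
    }
}
```

The `liveBuffer` is exactly the "dangerous part" Jason warns about: it grows without bound for as long as the historical replay takes.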
Re: partial savepoints/combining savepoints
Hi Till,

Thanks for the quick reply. Too bad, I thought I was on the right track with savepoints here. Some follow-up questions:

1.) Can I transfer the state and the position in the Kafka topic manually for one stream? In other words: is this information easily accessible?

2.) In any case I would need to stop the running job, change the topology (e.g. the number of streams in the program) and resume processing. Can you quantify the time overhead of stopping and starting a Flink job?

3.) I'm aware of the upcoming feature for scaling in and out. But I don't quite see how this will help me with different services. I thought of each service having its own Flink instance/cluster. I would submit this service as one job to the dedicated Flink cluster, containing all the necessary streams and computations. Is this a bad architecture? Would it be better to have one big Flink cluster and submit one big job which contains all the streams? (As I understand, submitting multiple jobs to one Flink instance is not recommended.)

To be honest, I'm not quite there yet in understanding the different deployment options of Flink and how to bring them together with a microservice architecture, where I have a service packed as a JAR file and want to be able to just deploy this JAR file. I thought of this service containing Flink, then starting the JobManager and some TaskManagers from this service, and deploying itself as the Flink job with a dedicated entry point. Is this a good idea? Is it even possible?

Thanks in advance,
Claudia

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, August 1, 2016 4:21 PM
To: user@flink.apache.org
Subject: Re: partial savepoints/combining savepoints

Hi Claudia,

Unfortunately, neither taking partial savepoints nor combining multiple savepoints into one savepoint is currently supported by Flink. However, we're currently working on dynamic scaling, which will allow adjusting the parallelism of your Flink job. This helps you to scale in/out depending on the workload of your job. However, you would only be able to scale within a single Flink job and not across Flink jobs.

Cheers,
Till

On Mon, Aug 1, 2016 at 9:49 PM, Claudia Wegmann wrote:

Hey everyone,

I've got some questions regarding savepoints in Flink. I have the following situation:

There is a microservice that reads data from Kafka topics, creates Flink streams from this data, and does different computations/pattern matching workloads. If the overall workload for this service becomes too big, I want to start a new instance of this service and share the work between the running services. To accomplish that, I thought about using Flink's savepoint mechanism. But there are some open questions:

1.) Can I combine two or more savepoints in one program? Think of two services already running. Now I'm starting up a third service. The new one would get savepoints from the already running services. It would then continue computation of some streams, while the other services would discard calculation on the streams now calculated by the new service. So, is it possible to combine two or more savepoints in one program?

2.) Another approach I could think of for introducing a new service would be to just take a savepoint of the streams that change service. Can I take a savepoint of only a part of the running job?

Thanks for your comments and best wishes,
Claudia
Re: Loss of TaskManager Error
Hi Till,

Thanks for the input. The error was in a training set, which I found in the .out file of the taskmanager. I corrected that and I am getting some results.

Thanks and Regards,
Debaditya

On Mon, Aug 1, 2016 at 3:54 PM, Till Rohrmann wrote:

Hi Debaditya,

Could you check what the log of the presumably failed task manager says? It might contain hints to what actually went wrong.

Cheers,
Till

On Mon, Aug 1, 2016 at 9:49 PM, Debaditya Roy wrote:

Hello users,

I was running an experiment on a very simple cluster with two nodes (one jobmanager and another taskmanager). However, after starting the execution, the program is aborted within a few seconds with the error below.

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.myorg.quickstart.Job.main(Job.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 6d2c9d29eddb2a1497827217f4d9a6d1 @ parapluie-28 - 1 slots - URL: akka.tcp://flink@172.16.99.28:60365/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:850)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
Re: partial savepoints/combining savepoints
Hi Claudia,

Unfortunately, neither taking partial savepoints nor combining multiple savepoints into one savepoint is currently supported by Flink. However, we're currently working on dynamic scaling, which will allow adjusting the parallelism of your Flink job. This helps you to scale in/out depending on the workload of your job. However, you would only be able to scale within a single Flink job and not across Flink jobs.

Cheers,
Till

On Mon, Aug 1, 2016 at 9:49 PM, Claudia Wegmann wrote:

Hey everyone,

I've got some questions regarding savepoints in Flink. I have the following situation:

There is a microservice that reads data from Kafka topics, creates Flink streams from this data, and does different computations/pattern matching workloads. If the overall workload for this service becomes too big, I want to start a new instance of this service and share the work between the running services. To accomplish that, I thought about using Flink's savepoint mechanism. But there are some open questions:

1.) Can I combine two or more savepoints in one program? Think of two services already running. Now I'm starting up a third service. The new one would get savepoints from the already running services. It would then continue computation of some streams, while the other services would discard calculation on the streams now calculated by the new service. So, is it possible to combine two or more savepoints in one program?

2.) Another approach I could think of for introducing a new service would be to just take a savepoint of the streams that change service. Can I take a savepoint of only a part of the running job?

Thanks for your comments and best wishes,
Claudia
Re: Loss of TaskManager Error
Hi Debaditya,

Could you check what the log of the presumably failed task manager says? It might contain hints to what actually went wrong.

Cheers,
Till

On Mon, Aug 1, 2016 at 9:49 PM, Debaditya Roy wrote:

Hello users,

I was running an experiment on a very simple cluster with two nodes (one jobmanager and another taskmanager). However, after starting the execution, the program is aborted within a few seconds with the error below.

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.myorg.quickstart.Job.main(Job.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 6d2c9d29eddb2a1497827217f4d9a6d1 @ parapluie-28 - 1 slots - URL: akka.tcp://flink@172.16.99.28:60365/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:850)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
partial savepoints/combining savepoints
Hey everyone, I've got some questions regarding savepoints in Flink. I have the following situation: There is a microservice that reads data from Kafka topics, creates Flink streams from this data and does different computations/pattern matching workloads. If the overall workload for this service becomes too big, I want to start a new instance of this service and share the work between the running services. To accomplish that, I thought about using Flink's savepoint mechanism. But there are some open questions: 1.) Can I combine two or more savepoints in one program? Think of two services already running. Now I'm starting up a third service. The new one would get savepoints from the already running services. It then would continue computation of some streams while the other services would discard calculation on these streams now calculated by the new service. So, is it possible to combine two or more savepoints in one program? 2.) Another approach I could think of for accomplishing the introduction of a new service would be to just take a savepoint of the streams that change service. Can I only take a savepoint of a part of the running job? Thanks for your comments and best wishes, Claudia
Loss of TaskManager Error
Hello users, I was running an experiment on a very simple cluster with two nodes (one jobmanager and another taskmanager). However after starting the execution, in a few seconds the program is aborted with the error. The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. at org.apache.flink.client.program.Client.runBlocking(Client.java:381) at org.apache.flink.client.program.Client.runBlocking(Client.java:355) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65) at org.myorg.quickstart.Job.main(Job.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: The slot in which the task was executed has been released. 
Probably loss of TaskManager 6d2c9d29eddb2a1497827217f4d9a6d1 @ parapluie-28 - 1 slots - URL: akka.tcp:// flink@172.16.99.28:60365/user/taskmanager at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153) at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547) at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119) at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156) at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:850) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46) at 
akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369) at
Re: Flink Streaming API
Hi Siva, in version 1.0.0 we’ve added the Scala binary version suffix to all Flink dependencies which depend on Scala. Thus, you should look for flink-streaming-scala_2.10 and flink-streaming-java_2.10. For these artifacts you should be able to find a version 1.0.3 on Maven Central, for example. The easiest way to set up a project is to use the quickstarts [1]. It will add the correct dependencies. But usually it should be enough to include the flink-client_2.10 and the respective API dependency to your project (e.g. flink-streaming-scala_2.10). I think that you should be able to use the Flink Kafka connector 0.9 with Kafka 0.10.0 as long as Kafka 0.10.0 still supports the consumer API introduced with Kafka 0.9. But I might be wrong here. So best if you tried it out. [1] https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html Cheers, Till On Mon, Aug 1, 2016 at 3:48 PM, Siva wrote: > Hello Everyone, > > I'm new to Flink and wanted to try the streaming API using the flink-kafka connector > in Scala. > > But there are several versions of it. Could someone help with the questions > below? > > What are the differences between flink-streaming-core and > flink-streaming-scala[java]? > > The latest version of flink-streaming-scala is 0.10.2; does it mean > that streaming development in Scala can only use Flink 0.10.2 and not the > latest one, 1.0.3? > > What is the difference between flink-connector-kafka-base and > flink-connector-kafka-0.x.x? Is > flink-connector-kafka-0.9-2_11 compatible with Kafka 0.10.0? > > Thanks in advance. > > Thanks > Siva >
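For reference, the dependency setup Till describes would look roughly like the pom.xml fragment below. This is a hedged sketch: the artifact IDs and the 1.0.3 version follow the names mentioned in this thread and should be double-checked against Maven Central (note the thread writes "flink-client_2.10"; the published artifact is "flink-clients_2.10").

```xml
<!-- Sketch of the dependencies discussed above; verify versions on Maven Central. -->
<dependencies>
  <!-- Scala streaming API; Scala 2.10 binary suffix as introduced in 1.0.0 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.0.3</version>
  </dependency>
  <!-- Client, needed to submit programs -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.0.3</version>
  </dependency>
  <!-- Kafka 0.9 connector; may work against Kafka 0.10 brokers (untested, see above) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.0.3</version>
  </dependency>
</dependencies>
```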
Re: how does flink assign windows to task
Yes you're right Sameer. That's how things work in Flink. Cheers, Till On Sun, Jul 31, 2016 at 12:33 PM, Sameer Wadkar wrote: > Vishnu, > > I would imagine based on Max's explanation and how other systems like > MapReduce and Spark partition keys, if you have 100 keys and 50 slots, 2 > keys would be assigned to each slot. Each slot would maintain one or more > windows (more for time based windows) and each window would have up to 2 > panes (depending on whether there are elements for a key for a given > window). The trigger would evaluate which of these panes will fire for a > global window (count windows) or which window as a whole fires for a time > window. > > It seems like this is the only way to get the most efficient utilization > for the entire cluster and allow all keys to be evaluated simultaneously > without being starved by keys getting more elements in case of a skew. > > So I think you will need to have enough memory to hold all the elements > that can arrive for all the active windows (not triggered) for two keys in > a task. For count windows this is easy to estimate. But for time windows > it is less clear if you receive elements out of order. > > Let's see what Max replies. I am just reasoning based on how Flink should > work based on how other similar systems do it. > > Sameer > > > On Jul 29, 2016, at 9:51 PM, Vishnu Viswanath < > vishnu.viswanat...@gmail.com> wrote: > > Hi Max, > > Thanks for the explanation. > > "This happens one after another in a single task slot but in parallel > across all the task slots". > Could you explain more on how this happens in parallel? Which part does > occur in parallel? Is it the Trigger going through each pane and the window > function being executed? > As in the first example, if there are 100 Panes (since I have 1 window and > 100 keys) will the trigger go through these 100 Panes using 50 task slots and > then execute whichever fires? 
Does that mean that Flink determines which > are the set of Panes that has to be evaluated in each task slot and then > the trigger goes through it? > > The reason I am trying to understand exactly how it works is because: I > need to decide how much memory each node in my cluster should have. I know > that a single pane would not cause OOM in my case (since the number of > elements per pane is not huge), but nodes might not have enough memory to > hold the entire window in memory (since I can have a large number of Panes). > > Thanks, > Vishnu > > > On Fri, Jul 29, 2016 at 4:29 AM, Maximilian Michels > wrote: > >> Hi Vishnu Viswanath, >> >> The keyed elements are spread across the 50 task slots (assuming you >> have a parallelism of 50) using hash partitioning on the keys. Each >> task slot runs one or multiple operators (depending on the slot >> sharing options). One of them is a WindowOperator which will decide >> when to trigger and process your keyed elements. >> >> The WindowOperator holds the WindowAssigner and the Trigger. The >> WindowAssigner will determine which window an incoming element gets >> assigned. Windows are kept for each key; the combination of window and >> key is usually called Pane. The Trigger will go through all the Panes >> and check if they should fire or not (whether the window function >> should be executed). This happens one after another in a single task >> slot but in parallel across all the task slots. >> >> Just a brief explanation. Hope it helps :) >> >> Cheers, >> Max >> >> On Thu, Jul 28, 2016 at 5:49 PM, Vishnu Viswanath >> wrote: >> > Hi, >> > >> > Let's say I have a window on a keyed stream, and I have about 100 unique >> > keys. >> > And assume I have about 50 task slots in my cluster. And suppose my >> trigger >> > fired 70/100 windows/pane at the same time. >> > >> > How will flink handle this? 
Will it assign 50/70 triggered windows to >> the 50 >> > available task slots and wait for 20 of them to finish before assigning >> the >> > remaining 20 to the slots? >> > >> > Thanks, >> > Vishnu Viswanath >> > >
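Max's "hash partitioning on the keys" can be illustrated with a small sketch. This is not Flink's actual partitioner (Flink uses its own hash-based key-group assignment internally); it is only a minimal model of 100 keys spreading over 50 parallel slots, where each slot then evaluates its own panes serially while the 50 slots run in parallel:

```python
# Minimal model of spreading keyed elements across task slots.
# NOT Flink's real partitioning function -- an illustrative sketch only.
import zlib
from collections import defaultdict

def assign_slot(key: str, parallelism: int) -> int:
    """Deterministically map a key to one of `parallelism` slots."""
    return zlib.crc32(key.encode("utf-8")) % parallelism

keys = [f"key-{i}" for i in range(100)]   # 100 unique keys, as in the thread
slots = defaultdict(list)
for k in keys:
    slots[assign_slot(k, 50)].append(k)   # 50 task slots

# Every key lives on exactly one slot, so each pane (window x key) is
# evaluated by a single slot; the 50 slots work in parallel.
```

Because the assignment is a pure function of the key, all panes for one key always land on the same slot, which is why a slot only ever needs memory for the active windows of its own keys.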
How to read AVRO data from Kafka using Flink
Hi All, I am trying to read AVRO data from Kafka using Flink 1.0.3 but I am getting an error. I have posted this issue on Stack Overflow: http://stackoverflow.com/questions/38698721/how-to-read-avro-data-from-kafka-using-flink . Is there any mistake someone can help me spot? Thanks & Regards Zeeshan Alam
Re: Running yarn-session a kerberos secured Yarn/HBase cluster.
https://github.com/apache/flink/pull/2317 On Mon, Aug 1, 2016 at 11:54 AM, Niels Basjes wrote: > Thanks for the pointers towards the work you are doing here. > I'll put up a patch for the jars and such in the next few days. > https://issues.apache.org/jira/browse/FLINK-4287 > > Niels Basjes > > On Mon, Aug 1, 2016 at 11:46 AM, Stephan Ewen wrote: > >> Thank you for the breakdown of the problem. >> >> Option (1) or (2) would be the way to go, currently. >> >> The problem that (3) does not support HBase is simply solvable by adding >> the HBase jars to the lib directory. In the future, this should be solved >> by the YARN re-architecturing: >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 >> >> For the renewal of Kerberos tokens for streaming jobs: There is WIP and a >> pull request to attach keytabs to a Flink job: >> https://github.com/apache/flink/pull/2275 >> >> The problem that the YARN session is accessible by everyone is a bit more >> tricky. In the future, this should be solved by these two parts: >> - With the YARN re-architecturing, sessions are bound to individual >> users. It should be possible to launch the session out of a single >> YarnExecutionEnvironment and then submit multiple jobs against it. >> - The over-the-wire encryption and authentication should make sure that >> no other user can send jobs to that session. >> >> Greetings, >> Stephan >> >> >> >> >> On Mon, Aug 1, 2016 at 9:47 AM, Niels Basjes wrote: >> >>> Hi, >>> >>> I have the situation that I have a Kerberos secured Yarn/HBase >>> installation and I want to export data from a lot (~200) HBase tables to >>> files on HDFS. >>> I wrote a flink job that does this exactly the way I want it for a >>> single table. >>> >>> Now in general I have a few possible approaches to do this for the 200 >>> tables I am facing: >>> >>> 1) Create a single job that reads the data from all of those tables and >>> writes them to the correct files. 
>>> I expect that to be a monster that will hog the entire cluster >>> because of the large number of HBase regions. >>> >>> 2) Run a job that does this for a single table and simply run that in a >>> loop. >>> Essentially I would have a shellscript or 'main' that loops over all >>> tablenames and runs a flink job for each of those. >>> The downside of this is that it will start a new flink topology on >>> Yarn for each table. >>> This has a startup overhead of something like 30 seconds for each >>> table that I would like to avoid. >>> >>> 3) I start a single yarn-session and submit my job in there 200 >>> times. >>> That would solve most of the startup overhead yet this doesn't work. >>> >>> If I start yarn-session then I see these two relevant lines in the >>> output. >>> >>> 2016-07-29 14:58:30,575 INFO org.apache.flink.yarn.Utils >>> - Attempting to obtain Kerberos security token for HBase >>> 2016-07-29 14:58:30,576 INFO org.apache.flink.yarn.Utils >>> - HBase is not available (not packaged with this >>> application): ClassNotFoundException : >>> "org.apache.hadoop.hbase.HBaseConfiguration". >>> >>> As a consequence any flink job I submit cannot access HBase at all. >>> >>> As an experiment I changed my yarn-session.sh script to include HBase on >>> the classpath. (If you want I can submit a Jira issue and a pull request) >>> Now the yarn-session does have HBase available and the job runs as >>> expected. >>> >>> There are however two problems that remain: >>> 1) This yarn-session is accessible by everyone on the cluster and as a >>> consequence they can run jobs in there that can access all data I have >>> access to. >>> 2) The kerberos token will expire after a while and (just like with all >>> long running jobs) I would really like to have this to be a 'long lived' >>> thing. >>> >>> As far as I know this is just the tip of the security iceberg and I >>> would like to know what the correct approach is to solve this. >>> >>> Thanks. 
>>> >>> -- >>> Best regards / Met vriendelijke groeten, >>> >>> Niels Basjes >>> >> >> > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes > -- Best regards / Met vriendelijke groeten, Niels Basjes
Re: tumbling time window, date boundary and timezone
Aljoscha, Thank you for your response. It would be great if the offset setting were available out of the box. In the meantime, I will use my custom version. Regards, Hironori 2016-07-29 19:29 GMT+09:00 Aljoscha Krettek: > Hi, > yes, I'm afraid you would have to use a custom version of the > TumblingProcessingTimeWindows right now. > > I've opened a Jira issue for adding an offset setting to the built-in window > assigners: https://issues.apache.org/jira/browse/FLINK-4282 > > Cheers, > Aljoscha > > On Tue, 26 Jul 2016 at 12:51 Hironori Ogibayashi > wrote: >> >> Hello, >> >> I want to calculate daily access count using Flink streaming. >> Flink's TumblingProcessingTimeWindow assigns events to windows of >> 00:00 GMT to 23:59 GMT each day, but I live in Japan (GMT+09:00) and >> want date boundaries to be at 00:00 JST (15:00 GMT of the previous day). >> Do I have to implement my own WindowAssigner for this use case? >> >> Thanks, >> Hironori Ogibayashi
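The offset mechanism requested in FLINK-4282 boils down to shifting the window-start arithmetic. Below is a minimal sketch of that computation; the function is hypothetical, not Flink API (later Flink versions expose the equivalent via an offset parameter on the tumbling window assigner):

```python
def window_start(timestamp_ms: int, size_ms: int, offset_ms: int = 0) -> int:
    """Start of the tumbling window containing timestamp_ms.

    With offset 0, a one-day window aligns to 00:00 GMT. An offset of
    -9 hours aligns it to 00:00 JST (= 15:00 GMT of the previous day).
    Adding size_ms before taking the modulo keeps the remainder
    non-negative for negative offsets.
    """
    return timestamp_ms - (timestamp_ms - offset_ms + size_ms) % size_ms

HOUR = 60 * 60 * 1000
DAY = 24 * HOUR

ts = 100 * DAY + 16 * HOUR                      # some instant at 16:00 GMT
gmt_aligned = window_start(ts, DAY)             # day window starting 00:00 GMT
jst_aligned = window_start(ts, DAY, -9 * HOUR)  # day window starting 00:00 JST
```

A custom WindowAssigner for this use case would apply exactly this shift when computing the window for each element's timestamp.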
Re: Running yarn-session a kerberos secured Yarn/HBase cluster.
Thank you for the breakdown of the problem. Option (1) or (2) would be the way to go, currently. The problem that (3) does not support HBase is simply solvable by adding the HBase jars to the lib directory. In the future, this should be solved by the YARN re-architecturing: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 For the renewal of Kerberos tokens for streaming jobs: There is WIP and a pull request to attach keytabs to a Flink job: https://github.com/apache/flink/pull/2275 The problem that the YARN session is accessible by everyone is a bit more tricky. In the future, this should be solved by these two parts: - With the YARN re-architecturing, sessions are bound to individual users. It should be possible to launch the session out of a single YarnExecutionEnvironment and then submit multiple jobs against it. - The over-the-wire encryption and authentication should make sure that no other user can send jobs to that session. Greetings, Stephan On Mon, Aug 1, 2016 at 9:47 AM, Niels Basjes wrote: > Hi, > > I have the situation that I have a Kerberos secured Yarn/HBase > installation and I want to export data from a lot (~200) HBase tables to > files on HDFS. > I wrote a flink job that does this exactly the way I want it for a single > table. > > Now in general I have a few possible approaches to do this for the 200 > tables I am facing: > > 1) Create a single job that reads the data from all of those tables and > writes them to the correct files. > I expect that to be a monster that will hog the entire cluster because > of the large number of HBase regions. > > 2) Run a job that does this for a single table and simply run that in a > loop. > Essentially I would have a shellscript or 'main' that loops over all > tablenames and runs a flink job for each of those. > The downside of this is that it will start a new flink topology on > Yarn for each table. 
> This has a startup overhead of something like 30 seconds for each > table that I would like to avoid. > > 3) I start a single yarn-session and submit my job in there 200 > times. > That would solve most of the startup overhead yet this doesn't work. > > If I start yarn-session then I see these two relevant lines in the output. > > 2016-07-29 14:58:30,575 INFO org.apache.flink.yarn.Utils > - Attempting to obtain Kerberos security token for HBase > 2016-07-29 14:58:30,576 INFO org.apache.flink.yarn.Utils > - HBase is not available (not packaged with this > application): ClassNotFoundException : > "org.apache.hadoop.hbase.HBaseConfiguration". > > As a consequence any flink job I submit cannot access HBase at all. > > As an experiment I changed my yarn-session.sh script to include HBase on > the classpath. (If you want I can submit a Jira issue and a pull request) > Now the yarn-session does have HBase available and the job runs as > expected. > > There are however two problems that remain: > 1) This yarn-session is accessible by everyone on the cluster and as a > consequence they can run jobs in there that can access all data I have > access to. > 2) The kerberos token will expire after a while and (just like with all > long running jobs) I would really like to have this to be a 'long lived' > thing. > > As far as I know this is just the tip of the security iceberg and I would > like to know what the correct approach is to solve this. > > Thanks. > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes >
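Option (2) above is essentially a driver loop around job submissions. A minimal sketch of building one `flink run` invocation per table — the jar name, entry class, and table names are hypothetical placeholders, not artifacts from this thread:

```python
# Sketch of option (2): submit one Flink job per HBase table.
# Jar path, main class and table names are illustrative placeholders.
tables = ["table_001", "table_002", "table_003"]  # ~200 tables in practice

def export_command(table):
    """Build the CLI invocation that would export a single table."""
    return ["flink", "run", "-c", "com.example.ExportTable",
            "hbase-export.jar", "--table", table]

commands = [export_command(t) for t in tables]
# Each submission pays the ~30s YARN startup cost mentioned above; a shared
# yarn-session (option 3) avoids that, but needs HBase on its classpath.
```

In practice each command would be executed with something like `subprocess.run`, serially or with a bounded pool to avoid hogging the cluster.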
Re: TimeWindowAll doesn't assign properly
Thank you for helping with the issue. Those single-element windows arrive within seconds, and the allowed delay is configured via the watermark as 6 seconds. Following are some samples from the investigation:
...
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.846","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":42,"processAt":"2016-08-01T11:08:05.873","startDate":"2016-07-19T21:36:00.000"}
{"hashCode":-1796184288,"count":9,"processAt":"2016-08-01T11:08:05.874","startDate":"2016-07-19T21:35:00.000"}
{"hashCode":-1800043744,"count":1,"processAt":"2016-08-01T11:08:05.889","startDate":"2016-07-19T21:33:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":1,"processAt":"2016-08-01T11:08:05.891","startDate":"2016-07-19T21:36:00.000"}
...
"processAt" was generated as follows:

@Override
public void apply(TimeWindow timeWindow, Iterable<Correlation> values, Collector<JSONObject> collector) throws Exception {
    DateTime startTs = new DateTime(timeWindow.getStart());
    JSONObject jsonObject = new JSONObject();
    int sum = 0;
    for (Correlation value : values) {
        sum += 1;
    }
    DateTime current = new DateTime(); // joda time
    jsonObject.put("startDate", startTs.toString());
    jsonObject.put("count", sum);
    jsonObject.put("hashCode", timeWindow.hashCode());
    jsonObject.put("processAt", current.toString());
    collector.collect(jsonObject);
}

Is there any other mistake we can try to look into? Best, Hung Chang -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-with-EventTime-tp8201p8229.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Scala Table API with Java POJO
In org.apache.flink.api.table.plan.PlanTranslator:

val inputType = set.getType().asInstanceOf[CompositeType[A]]
if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
  throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
    s"Field order of input type $inputType is not deterministic.")
}

When A is a PojoType, hasDeterministicFieldOrder always returns false. What shall I do when using a POJO? Thanks. > On Aug 1, 2016, at 6:11 PM, Dong-iL, Kim wrote: > > I’ve tried like this, but it does not work. > > dataSet.as(‘id as ‘id, ‘amount as ‘amount) > > dataSet.as(‘id, ‘amount) > > dataSet.as(“id, amount”) > > thanks. > >> On Aug 1, 2016, at 6:03 PM, Timo Walther wrote: >> >> I think you need to use ".as()" instead of "toTable()" to supply the field >> order. >> >> On 01/08/16 at 10:56, Dong-iL, Kim wrote: >>> Hi Timo. >>> I’m using the scala API. >>> There is no error with the java API. >>> my code snippet is this. >>> >>> dataSet.toTable >>> .groupBy(“id") >>> .select(‘id, ‘amount.sum as ‘amount) >>> .where(‘amount > 0) >>> .toDataSet[TestPojo] >>> .print() >>> >>> Thanks. >>> On Aug 1, 2016, at 5:50 PM, Timo Walther wrote: Hi Kim, as the exception says: POJOs have no deterministic field order. You have to specify the order during the DataSet to Table conversion: Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 as b"); I hope that helps. Otherwise it would help if you could supply a code snippet of your program. Timo On 01/08/16 at 10:19, Dong-iL, Kim wrote: > my flink ver is 1.0.3. > thanks. > >> On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: >> >> I’ve created a program using the table API and get an exception like this. >> org.apache.flink.api.table.ExpressionException: You cannot rename fields >> upon Table creation: Field order of input type PojoType<….> is not >> deterministic. >> There is an error not in the java program, but in the scala program. >> how can I use a java POJO with the scala Table API. 
>> -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr >> >> >> -- >> Freundliche Grüße / Kind Regards >> >> Timo Walther >> >> Follow me: @twalthr >> https://www.linkedin.com/in/twalthr >> >
Re: Scala Table API with Java POJO
Ok, then I think I have no better solution than to use the Table API of the upcoming 1.1 release. The Table API has been completely rewritten and the POJO support is now much better. Maybe you could try the recent 1.1 RC1 release. On 01/08/16 at 11:11, Dong-iL, Kim wrote: I’ve tried like this, but it does not work. dataSet.as(‘id as ‘id, ‘amount as ‘amount) dataSet.as(‘id, ‘amount) dataSet.as(“id, amount”) thanks. On Aug 1, 2016, at 6:03 PM, Timo Walther wrote: I think you need to use ".as()" instead of "toTable()" to supply the field order. On 01/08/16 at 10:56, Dong-iL, Kim wrote: Hi Timo. I’m using the scala API. There is no error with the java API. my code snippet is this. dataSet.toTable .groupBy(“id") .select(‘id, ‘amount.sum as ‘amount) .where(‘amount > 0) .toDataSet[TestPojo] .print() Thanks. On Aug 1, 2016, at 5:50 PM, Timo Walther wrote: Hi Kim, as the exception says: POJOs have no deterministic field order. You have to specify the order during the DataSet to Table conversion: Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 as b"); I hope that helps. Otherwise it would help if you could supply a code snippet of your program. Timo On 01/08/16 at 10:19, Dong-iL, Kim wrote: my flink ver is 1.0.3. thanks. On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: I’ve created a program using the table API and get an exception like this. org.apache.flink.api.table.ExpressionException: You cannot rename fields upon Table creation: Field order of input type PojoType<….> is not deterministic. There is an error not in the java program, but in the scala program. how can I use a java POJO with the scala Table API. -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr
Re: Scala Table API with Java POJO
I’ve tried like this, but it does not work. dataSet.as(‘id as ‘id, ‘amount as ‘amount) dataSet.as(‘id, ‘amount) dataSet.as(“id, amount”) thanks. > On Aug 1, 2016, at 6:03 PM, Timo Walther wrote: > > I think you need to use ".as()" instead of "toTable()" to supply the field > order. > > Am 01/08/16 um 10:56 schrieb Dong-iL, Kim: >> Hi Timo. >> I’m using the scala API. >> There is no error with the java API. >> my code snippet is this. >> >> dataSet.toTable >> .groupBy(“id") >> .select(‘id, ‘amount.sum as ‘amount) >> .where(‘amount > 0) >> .toDataSet[TestPojo] >> .print() >> >> Thanks. >> >>> On Aug 1, 2016, at 5:50 PM, Timo Walther wrote: >>> >>> Hi Kim, >>> >>> as the exception says: POJOs have no deterministic field order. You have to >>> specify the order during the DataSet to Table conversion: >>> >>> Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 >>> as b"); >>> >>> I hope that helps. Otherwise it would help if you could supply a code >>> snippet of your program. >>> >>> Timo >>> >>> Am 01/08/16 um 10:19 schrieb Dong-iL, Kim: my flink ver is 1.0.3. thanks. > On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: > > I’ve created a program using the table API and get an exception like this. > org.apache.flink.api.table.ExpressionException: You cannot rename fields > upon Table creation: Field order of input type PojoType<….> is not > deterministic. > There is an error not in the java program, but in the scala program. > how can I use a java POJO with the scala Table API. > >>> >>> -- >>> Freundliche Grüße / Kind Regards >>> >>> Timo Walther >>> >>> Follow me: @twalthr >>> https://www.linkedin.com/in/twalthr >>> > > > -- > Freundliche Grüße / Kind Regards > > Timo Walther > > Follow me: @twalthr > https://www.linkedin.com/in/twalthr >
Re: Scala Table API with Java POJO
I think you need to use ".as()" instead of "toTable()" to supply the field order. On 01/08/16 at 10:56, Dong-iL, Kim wrote: Hi Timo. I’m using the scala API. There is no error with the java API. my code snippet is this. dataSet.toTable .groupBy(“id") .select(‘id, ‘amount.sum as ‘amount) .where(‘amount > 0) .toDataSet[TestPojo] .print() Thanks. On Aug 1, 2016, at 5:50 PM, Timo Walther wrote: Hi Kim, as the exception says: POJOs have no deterministic field order. You have to specify the order during the DataSet to Table conversion: Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 as b"); I hope that helps. Otherwise it would help if you could supply a code snippet of your program. Timo On 01/08/16 at 10:19, Dong-iL, Kim wrote: my flink ver is 1.0.3. thanks. On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: I’ve created a program using the table API and get an exception like this. org.apache.flink.api.table.ExpressionException: You cannot rename fields upon Table creation: Field order of input type PojoType<….> is not deterministic. There is an error not in the java program, but in the scala program. how can I use a java POJO with the scala Table API. -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr
Re: Scala Table API with Java POJO
Hi Timo. I'm using the Scala API. There is no error with the Java API. My code snippet is this. dataSet.toTable .groupBy("id") .select('id, 'amount.sum as 'amount) .where('amount > 0) .toDataSet[TestPojo] .print() Thanks. > On Aug 1, 2016, at 5:50 PM, Timo Walther wrote: > > Hi Kim, > > as the exception says: POJOs have no deterministic field order. You have to > specify the order during the DataSet to Table conversion: > > Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 > as b"); > > I hope that helps. Otherwise it would help if you could supply a code snippet > of your program. > > Timo > > On 01/08/16 at 10:19, Dong-iL, Kim wrote: >> my flink ver is 1.0.3. >> thanks. >> >>> On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: >>> >>> I've created a program using the Table API and get an exception like this: >>> org.apache.flink.api.table.ExpressionException: You cannot rename fields >>> upon Table creation: Field order of input type PojoType<….> is not >>> deterministic. >>> The error occurs not in the Java program, but in the Scala program. >>> How can I use a Java POJO with the Scala Table API? >>> > > > -- > Freundliche Grüße / Kind Regards > > Timo Walther > > Follow me: @twalthr > https://www.linkedin.com/in/twalthr >
Re: Scala Table API with Java POJO
Hi Kim, as the exception says: POJOs have no deterministic field order. You have to specify the order during the DataSet to Table conversion: Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 as b"); I hope that helps. Otherwise it would help if you could supply a code snippet of your program. Timo On 01/08/16 at 10:19, Dong-iL, Kim wrote: my flink ver is 1.0.3. thanks. On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: I've created a program using the Table API and get an exception like this: org.apache.flink.api.table.ExpressionException: You cannot rename fields upon Table creation: Field order of input type PojoType<….> is not deterministic. The error occurs not in the Java program, but in the Scala program. How can I use a Java POJO with the Scala Table API? -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr
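The non-determinism Timo refers to comes from Java reflection itself: `Class.getDeclaredFields()` makes no ordering guarantee, so Flink cannot safely map POJO fields to table columns by position. A minimal plain-Java illustration of this (no Flink needed; the `TestPojo` field names are taken from the thread):

```java
import java.lang.reflect.Field;
import java.util.Arrays;

public class FieldOrder {
    // A simple POJO like the one discussed in the thread.
    public static class TestPojo {
        public String id;
        public long amount;
    }

    public static void main(String[] args) {
        // The Javadoc for getDeclaredFields() explicitly states the returned
        // array is "not sorted and not in any particular order" -- code must
        // never rely on this ordering, which is exactly why the Table API
        // requires an explicit field mapping ("pojoField as a, ...").
        Field[] fields = TestPojo.class.getDeclaredFields();
        String[] names = Arrays.stream(fields)
                               .map(Field::getName)
                               .toArray(String[]::new);
        System.out.println(Arrays.toString(names));
    }
}
```

On most JVMs the declaration order happens to be preserved, which is why such code often appears to work until it suddenly doesn't; Flink refuses to guess rather than depend on JVM-specific behavior.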
Re: Flink and SQL
Hi Davran, unregistering tables is not possible at the moment. I have created an issue for this: https://issues.apache.org/jira/browse/FLINK-4288 Timo On 29/07/16 at 20:24, Davran Muzafarov wrote: Hi, I could not find a way to reuse table names. tableEnv = TableEnvironment.getTableEnvironment(env); . tableEnv.registerTable( "table1", table ); Table table2 = tableEnv.sql( "select * from table1" ); . . Is it possible to "unregister" a table or replace it with another one? Thank you. -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr
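Until FLINK-4288 is resolved, a common workaround is to never reuse a name: register each table under a freshly generated name and reference that name in the SQL string. A plain-Java sketch of the idea (the registry here only mimics the register-once behavior of a `TableEnvironment`; it is not Flink code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TableNames {
    // Mimics the register-once behaviour described in the thread: a second
    // registration under the same name is rejected instead of replacing.
    static final Map<String, Object> registry = new ConcurrentHashMap<>();

    static void register(String name, Object table) {
        if (registry.putIfAbsent(name, table) != null) {
            throw new IllegalStateException("Table " + name + " already registered");
        }
    }

    // Workaround: derive a fresh, unique name for every registration.
    static final AtomicInteger counter = new AtomicInteger();

    static String freshName(String base) {
        return base + "_" + counter.getAndIncrement();
    }

    public static void main(String[] args) {
        String n1 = freshName("table1");
        String n2 = freshName("table1");
        register(n1, new Object());
        register(n2, new Object()); // fine: different generated name
        System.out.println(n1 + ", " + n2);
    }
}
```

The generated name would then be interpolated into the query, e.g. `tableEnv.sql("select * from " + n2)`, so each iteration of a job sees its own registration.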
Re: [VOTE] Release Apache Flink 1.1.0 (RC1)
Just tried to reproduce the error reported by Aljoscha, but could not. I used a clean checkout of the RC1 code and cleaned all local maven caches before the testing. @Aljoscha: Can you reproduce this on your machine? Can you try and clean the maven caches? On Sun, Jul 31, 2016 at 7:31 PM, Ufuk Celebi wrote: > Probably related to shading :( What's strange is that Travis builds > for Hadoop 2.6.3 with the release-1.1 branch do succeed (sometimes... > Travis is super flaky at the moment, because of some corrupted cached > dependencies): https://travis-ci.org/apache/flink/jobs/148348699 > > On Fri, Jul 29, 2016 at 4:19 PM, Aljoscha Krettek > wrote: > > When running "mvn clean verify" with Hadoop version 2.6.1 the > > Zookeeper/Leader Election tests fail with this: > > > > java.lang.NoSuchMethodError: > > > org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)Ljava/lang/String; > > at > > > org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:37) > > at > > > org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:113) > > at > > > org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:124) > > at > > > org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:101) > > at > > > org.apache.flink.runtime.util.ZooKeeperUtils.createLeaderRetrievalService(ZooKeeperUtils.java:143) > > at > > > org.apache.flink.runtime.util.LeaderRetrievalUtils.createLeaderRetrievalService(LeaderRetrievalUtils.java:70) > > at > > > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderRetrievalTest.testTimeoutOfFindConnectingAddress(ZooKeeperLeaderRetrievalTest.java:187) > > > > I'll continue testing other parts and other Hadoop versions. > > > > On Wed, 27 Jul 2016 at 11:51 Ufuk Celebi wrote: > > > >> Dear Flink community, > >> > >> Please vote on releasing the following candidate as Apache Flink version > >> 1.1.0. 
> >> > >> I've CC'd user@flink.apache.org as users are encouraged to help > >> testing Flink 1.1.0 for their specific use cases. Please feel free to > >> report issues and successful tests on d...@flink.apache.org. > >> > >> The commit to be voted on: > >> 3a18463 (http://git-wip-us.apache.org/repos/asf/flink/commit/3a18463) > >> > >> Branch: > >> release-1.1.0-rc1 > >> ( > >> > https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.1.0-rc1 > >> ) > >> > >> The release artifacts to be voted on can be found at: > >> http://people.apache.org/~uce/flink-1.1.0-rc1/ > >> > >> The release artifacts are signed with the key with fingerprint 9D403309: > >> http://www.apache.org/dist/flink/KEYS > >> > >> The staging repository for this release can be found at: > >> https://repository.apache.org/content/repositories/orgapacheflink-1098 > >> > >> There is also a Google doc to coordinate the testing efforts. This is > >> a copy of the release document found in our Wiki: > >> > >> > https://docs.google.com/document/d/1cDZGtnGJKLU1fLw8AE_FzkoDLOR8amYT2oc3mD0_lw4/edit?usp=sharing > >> > >> - > >> > >> Thanks to everyone who contributed to this release candidate. > >> > >> The vote is open for the next 3 days (not counting the weekend) and > >> passes if a majority of at least three +1 PMC votes are cast. > >> > >> The vote ends on Monday August 1st, 2016. > >> > >> [ ] +1 Release this package as Apache Flink 1.1.0 > >> [ ] -1 Do not release this package, because ... > >> >
Re: Scala Table API with Java POJO
my flink ver is 1.0.3. thanks. > On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim wrote: > > I've created a program using the Table API and get an exception like this: > org.apache.flink.api.table.ExpressionException: You cannot rename fields upon > Table creation: Field order of input type PojoType<….> is not deterministic. > The error occurs not in the Java program, but in the Scala program. > How can I use a Java POJO with the Scala Table API? >
Scala Table API with Java POJO
I've created a program using the Table API and get an exception like this: org.apache.flink.api.table.ExpressionException: You cannot rename fields upon Table creation: Field order of input type PojoType<….> is not deterministic. The error occurs not in the Java program, but in the Scala program. How can I use a Java POJO with the Scala Table API?
Flink Streaming API
Hello Everyone, I'm new to Flink and wanted to try the streaming API using the flink-kafka connector in Scala, but there are several versions of it. Could someone please help with the questions below? What are the differences between flink-streaming-core and flink-streaming-scala[java]? The latest version of flink-streaming-scala is 0.10.2; does that mean streaming development in Scala can only use Flink 0.10.2, not the latest 1.0.3? What is the difference between flink-connector-kafka-base and flink-connector-kafka-0.x.x? Is flink-connector-kafka-0.9_2.11 compatible with Kafka 0.10.0? Thanks in advance. Thanks Siva
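For later readers: since Flink 1.0 the streaming artifacts carry a Scala-version suffix (e.g. `_2.11`), which is why an unsuffixed `flink-streaming-scala` stops at 0.10.2 on Maven Central; the 1.0.3 artifact is `flink-streaming-scala_2.11`. `flink-connector-kafka-base` holds code shared by the broker-version-specific connectors and is normally pulled in transitively. A dependency sketch for Flink 1.0.3 with a Kafka 0.9 broker (versions assumed from the thread; verify the exact coordinates on Maven Central):

```xml
<!-- Streaming API for Scala, Flink 1.0.3 built against Scala 2.11 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.0.3</version>
</dependency>
<!-- Kafka connector targeting 0.9.x brokers; pulls in flink-connector-kafka-base -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
  <version>1.0.3</version>
</dependency>
```

Whether the 0.9 connector works against a 0.10.0 broker is not guaranteed by Flink; Kafka brokers are generally backward compatible with older clients, but a connector matching the broker version is the safer choice once one is available.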
Running yarn-session on a Kerberos-secured Yarn/HBase cluster.
Hi, I have the situation that I have a Kerberos-secured Yarn/HBase installation and I want to export data from a large number (~200) of HBase tables to files on HDFS. I wrote a flink job that does this exactly the way I want it for a single table. Now in general I have a few possible approaches to do this for the 200 tables I am facing: 1) Create a single job that reads the data from all of those tables and writes them to the correct files. I expect that to be a monster that will hog the entire cluster because of the large number of HBase regions. 2) Run a job that does this for a single table and simply run that in a loop. Essentially I would have a shell script or 'main' that loops over all table names and runs a flink job for each of those. The downside of this is that it will start a new flink topology on Yarn for each table. This has a startup overhead of something like 30 seconds per table that I would like to avoid. 3) I start a single yarn-session and submit my job in there 200 times. That would solve most of the startup overhead, yet this doesn't work. If I start yarn-session then I see these two relevant lines in the output: 2016-07-29 14:58:30,575 INFO org.apache.flink.yarn.Utils - Attempting to obtain Kerberos security token for HBase 2016-07-29 14:58:30,576 INFO org.apache.flink.yarn.Utils - HBase is not available (not packaged with this application): ClassNotFoundException: "org.apache.hadoop.hbase.HBaseConfiguration". As a consequence any flink job I submit cannot access HBase at all. As an experiment I changed my yarn-session.sh script to include HBase on the classpath. (If you want I can submit a Jira issue and a pull request.) Now the yarn-session does have HBase available and the job runs as expected. There are however two problems that remain: 1) This yarn-session is accessible by everyone on the cluster and as a consequence they can run jobs in there that can access all data I have access to. 
2) The Kerberos token will expire after a while and (just like with all long-running jobs) I would really like to have this be a 'long-lived' thing. As far as I know this is just the tip of the security iceberg, and I would like to know what the correct approach is to solve this. Thanks. -- Best regards / Met vriendelijke groeten, Niels Basjes
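For reference, the per-table loop from options 2/3 can be sketched as below. This is only an illustration of the submission pattern, not Niels's actual setup: the job jar name, `tables.txt`, the output path, and all sizing flags are made up, and it assumes a yarn-session patched to have HBase on the classpath as described above. With a session running, `flink run` attaches to it instead of starting a new YARN application per table, avoiding the ~30 second startup cost each time.

```sh
#!/usr/bin/env bash
# Start one long-lived detached Flink session on YARN
# (4 TaskManagers with 4 GB each; hypothetical sizing).
./bin/yarn-session.sh -n 4 -tm 4096 -d

# Submit the single-table export job once per table, reusing the session.
while read -r table; do
  ./bin/flink run -p 8 hbase-export-job.jar \
    --table "$table" \
    --out "hdfs:///exports/$table"
done < tables.txt
```

This does not address the two remaining problems (session visible to other cluster users, Kerberos token expiry); those need a cluster-side answer rather than a scripting one.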