flink build error
Hi, I am trying to build Flink 1.6 but cannot get it to build with the tests enabled. Any idea why surefire fails to run the JUnit tests?

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (default-test) on project flink-test-utils-junit: ExecutionException: java.lang.RuntimeException: The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
[ERROR] Command was /bin/sh -c cd /home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefirebooter9129098759083326906.jar /home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefire1374644174200907236tmp /home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefire_0656703638108827545tmp

Thanks
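The "forked VM terminated without properly saying goodbye" message usually indicates that the forked test JVM crashed (often out of memory) rather than an ordinary test failure, so checking for hs_err_pid*.log files under the module directory is a good first step. One generic thing to experiment with — a hypothetical tweak, not something suggested in this thread — is giving the forked JVM more heap or reducing the fork count in the surefire configuration of the affected module's pom.xml:

```xml
<!-- Hypothetical surefire adjustment (argLine, forkCount, reuseForks are
     standard surefire options): raise the forked JVM's heap and run a
     single, reused fork instead of parallel forks. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <argLine>-Xms256m -Xmx3072m -XX:+UseG1GC</argLine>
    <forkCount>1</forkCount>
    <reuseForks>true</reuseForks>
  </configuration>
</plugin>
```

Alternatively, mvn clean install -DskipTests unblocks the build itself while the test environment is investigated.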
FW: What's the advantage of using BroadcastState?
Thanks for the explanation – I suspected it might work like this and I wanted to double-check before building inconsistent programs ☺ Would it be interesting for the community to also have something that can share/broadcast items from one task to the other tasks? Spark, for example, has this as its broadcast behavior, and it can be useful for various algorithms and applications. For example, if one builds a clustering algorithm running in parallel, a task that updates the cluster centroids could advertise them to the others afterwards (just one example). I am thinking we could achieve this with a combination of the broadcast stream and the iterative operators: if a task makes an update to the state, the value would be pushed back (upstream) so that it can afterwards be broadcast to all the tasks using it. Let me know what you think.

Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert, IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH, German Research Center, Munich Office
Riesstrasse 25, 80992 München
E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330, Telephone: +49 891588344173
HUAWEI TECHNOLOGIES Duesseldorf GmbH, Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited.
If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, August 28, 2018 9:53 AM
To: Xingcan Cui <xingc...@gmail.com>
Cc: Radu Tudoran <radu.tudo...@huawei.com>; user <user@flink.apache.org>
Subject: Re: What's the advantage of using BroadcastState?

Hi, Xingcan is right. There is no hidden state synchronization happening. You have to ensure that the broadcast state is the same on every parallel instance. Hence, it should only be modified by the processBroadcastElement() method, which receives the same broadcasted elements on all task instances. The API tries to help users not to violate this contract, but it is not bulletproof. Side-passing information in a local variable (as suggested by you) cannot be prevented and would lead to inconsistencies. Best, Fabian

On Mon., Aug. 27, 2018 at 16:51, Xingcan Cui <xingc...@gmail.com> wrote:
Hi Radu, I cannot fully understand your question, but I guess the answer is NO. The broadcast state pattern just provides you with automatic data broadcasting and a bunch of map states to cache the "low-throughput" patterns. Also, to keep consistency, it forbids processElement() from modifying the states. But this API does not really broadcast the states, so you should keep the logic in processBroadcastElement() deterministic. Best, Xingcan
RE: What's the advantage of using BroadcastState?
Hi Fabian, Thanks for the blog post about broadcast state. I have a question with respect to the update capabilities of the broadcast state. Assume you do some processing logic in the main processElement() function, and at a given context marker you 1) change a local field, in order to 2) signal that the next time the broadcast function is triggered, a special pattern should be created and broadcasted. My question is: is such a behavior allowed? Would the new special Pattern that originates in one operator instance be shared across the other instances of the KeyedProcessFunction?

public static class PatternEvaluator
    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

  public boolean test = false;

  @Override
  public void processElement(
      Action action,
      ReadOnlyContext ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {
    // ... logic
    if (/* ..whatever context */) {
      test = true;
    }
  }

  @Override
  public void processBroadcastElement(
      Pattern pattern,
      Context ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {
    // store the new pattern by updating the broadcast state
    BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(
        new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
    // storing in MapState with null as VOID default value
    bcState.put(null, pattern);
    if (test) {
      bcState.put(null, new Pattern(test));
    }
  }
}

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, August 20, 2018 9:40 AM
To: Paul Lam
Cc: Rong Rong; Hequn Cheng; user
Subject: Re: What's the advantage of using BroadcastState?

Hi, I've recently published a blog post about Broadcast State [1]. Cheers, Fabian
[1] https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink

2018-08-20 3:58 GMT+02:00 Paul Lam <paullin3...@gmail.com>:
Hi Rong, Hequn, your answers are very helpful! Thank you! Best Regards, Paul Lam

On Aug 19, 2018, at 23:30, Rong Rong <walter...@gmail.com> wrote:
Hi Paul, To add to Hequn's answer.
Broadcast state can typically be used for "a low-throughput stream containing a set of rules which we want to evaluate against all elements coming from another stream" [1]. So an addition to the difference list is: whether the state is "broadcast" across all keys when processing a keyed stream. This is typically useful when it is not possible to derive the same key field using a KeySelector in a CoStream. Another difference is performance: the BroadcastStream is "stored locally and is used to process all incoming elements on the other stream", so you have to carefully manage the size of the BroadcastStream.
[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html

On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng <chenghe...@gmail.com> wrote:
Hi Paul, There are some differences:
1. The BroadcastStream broadcasts the data for you, i.e., data is sent to all downstream tasks automatically.
2. To guarantee that the contents of the broadcast state are the same across all parallel instances of the operator, read-write access is only given to the broadcast side.
3. For broadcast state, Flink guarantees that upon restoring/rescaling there will be no duplicates and no missing data. In case of recovery with the same or smaller parallelism, each task reads its checkpointed state. Upon scaling up, each task reads its own state, and the remaining tasks (p_new - p_old) read checkpoints of previous tasks in a round-robin manner. MapState does not have such abilities.
Best, Hequn

On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam <paullin3...@gmail.com> wrote:
Hi, AFAIK, the difference between a BroadcastStream and a normal DataStream
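The consistency contract discussed in this thread — the broadcast state only stays identical across parallel instances if it is modified deterministically from the broadcast side — can be illustrated with a small plain-Java toy model (not Flink code; the class and method names below are invented for illustration). Two "parallel instances" see the same broadcast elements but different keyed elements:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of two parallel operator instances holding a "broadcast state".
// Flink does NOT synchronize the maps; each instance applies the updates itself.
public class BroadcastToy {

    static Map<String, String> runInstance(List<String> broadcastElements,
                                           List<String> keyedElements,
                                           boolean leakKeyedIntoState) {
        Map<String, String> state = new HashMap<>();
        for (String b : broadcastElements) {
            state.put("pattern", b);          // deterministic: same on every instance
        }
        if (leakKeyedIntoState) {
            for (String k : keyedElements) {
                state.put("pattern", k);      // side-passing: differs per instance
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> bc = List.of("p1", "p2");
        // Updating only from the broadcast side keeps all instances identical.
        Map<String, String> a = runInstance(bc, List.of("x"), false);
        Map<String, String> b = runInstance(bc, List.of("y"), false);
        System.out.println(a.equals(b));   // true
        // Letting keyed elements mutate the state makes the instances diverge.
        Map<String, String> c = runInstance(bc, List.of("x"), true);
        Map<String, String> d = runInstance(bc, List.of("y"), true);
        System.out.println(c.equals(d));   // false
    }
}
```

This is exactly why the API hands processElement() only a read-only view of the broadcast state: the only updates guaranteed to be identical everywhere are the ones driven by the broadcast elements themselves.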
RE: concurrency?
Hi, Yes it does – thanks a lot. Knowing that this is the order:

time = 2, onTimer(2) -> access state with key t=2-1, get A, B
time = 2, processElement(C) -> put C in state keyed to t=2, registerProcTimer(3)

is useful!

Radu

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Friday, March 31, 2017 12:00 PM
To: Radu Tudoran
Cc: user@flink.apache.org
Subject: Re: concurrency?

Hi Radu, timers are fired in order of their timestamps. Multiple timers on the same time are deduplicated.
If you have the following logic:

time = 1, processElement(A) -> put A in state keyed to t=1, registerProcTimer(2)
time = 1, processElement(B) -> put B in state keyed to t=1, registerProcTimer(2) // deduplicated
time = 2, onTimer(2) -> access state with key t=2-1, get A, B
time = 2, processElement(C) -> put C in state keyed to t=2, registerProcTimer(3)
...

you get all calls in the right order. Does that answer your questions?
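Fabian's trace can be reproduced with a tiny single-threaded toy (plain Java with invented names — the real runtime serializes the calls with a lock; this only mimics the timer deduplication and the timestamp ordering):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy single-threaded event loop: processing-time timers are deduplicated per
// timestamp (TreeSet) and fire in timestamp order, never concurrently with
// processElement().
public class TimerToy {

    static List<String> run() {
        TreeSet<Long> timers = new TreeSet<>();   // sorted + deduplicated
        List<String> log = new ArrayList<>();

        long[][] events = {{1, 'A'}, {1, 'B'}, {2, 'C'}};
        for (long[] e : events) {
            long t = e[0];
            // Fire every timer whose timestamp has been reached before
            // handling the next element.
            while (!timers.isEmpty() && timers.first() <= t) {
                log.add("onTimer(" + timers.pollFirst() + ")");
            }
            log.add("processElement(" + (char) e[1] + ")");
            timers.add(t + 1);                    // register timer at t+1 (dedup'd)
        }
        while (!timers.isEmpty()) {
            log.add("onTimer(" + timers.pollFirst() + ")");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [processElement(A), processElement(B), onTimer(2), processElement(C), onTimer(3)]
    }
}
```

Note how the two timers registered at t=1 collapse into a single onTimer(2) call, matching the "deduplicated" line in the trace above.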
RE: concurrency?
Hi, Thanks Fabian. But is there also a fixed order imposed on their execution? I am asking because it is not enough just to have them executed atomically. If on one call you have processElement() executed first and then onTimer(), and on the next call you have them vice versa, you would need an additional mechanism to synchronize your logic, right? For example, if in processElement() you do state.update(newValue) and in onTimer() you do out.collect(state.value()), then with ev1, ev2 and ev3 arriving at consecutive times, and the functions executed once as processElement-then-timer and then in reverse order, your output would be:

time1: (processElement) ev1 arrives, state=ev1
time2: (processElement, executed first) ev2 arrives, state=ev2; onTimer (executed second): out = ev2
time3: (processElement, executed second) ev3 arrives, state=ev3; onTimer (executed first): out = ev2

Best regards, Radu

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Friday, March 31, 2017 11:05 AM
To: Radu Tudoran
Cc: user@flink.apache.org
Subject: Re: concurrency?

Hi Radu, the processElement() and onTimer() calls are synchronized by a lock, i.e., they won't be called at the same time. Best, Fabian
concurrency?
Hi, I would like to use a ProcessFunction to accumulate elements. In the processElement() function I accumulate each element into state. However, I would like to emit the output only 1 ms later, so I register a timer to trigger 1 ms later, then read the state and emit it. But I am curious what happens if another event arrives in that next millisecond: in principle both the processElement() function and the onTimer() function should be triggered at the same time. My question is: is there a fixed order in which they execute? Because if they run just like normal threads, concurrency issues could happen when accessing the state.
RE: Dummy DataStream
Hi Duck, I am not 100% sure I understand your exact scenario, but I will try to give you some pointers; maybe they will help. Typically when you do the split you have some knowledge about the criterion for the split. For example, if you follow the example from the website https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

you would know you have a stream for even and odd, and then you can collect them in your list by doing

myList.add(split.select("even"));
myList.add(split.select("odd"));

(for that matter, the SplitStream object kind of does the same). I would say you have two options from here to get your full stream back. You can use the option from the website:

DataStream<Integer> all = split.select("even", "odd");

which I believe does not work for you, as you might have some operations performed on the splits. The other option is to use union, which aggregates the independent streams without a specific condition like a join. You could do something like:

DataStream<Integer> allStream = myList.get(0);
for (int i = 1; i < myList.size(); i++) {
    allStream = allStream.union(myList.get(i));
}

From: Duck [mailto:k...@protonmail.com]
Sent: Thursday, January 26, 2017 9:08 PM
To: user@flink.apache.org
Subject: Dummy DataStream

I have a project where I am reading in on a single DataStream from Kafka, then sending to a variable number of handlers based on the content of the received data; after that I want to join them all. Since I do not know how many different streams this will create, I cannot have a single "base" to perform a Join operation on. So my question is: can I create a "dummy / empty" DataStream to use as a join basis? Example:
1) DataStream all = ...
2) Create a List<DataStream> myList;
3) Then I split the "all" datastream based on content, and add each stream to "myList"
4) I now parse each of the different streams
5) I now want to join my list of streams, "myList", into a DataStream all_joined_again;
/Duck
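The split -> select -> union flow above can be modeled with plain Java collections (a toy sketch with made-up names, not Flink code), which also shows why union, unlike a join, needs no common basis:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of split -> select -> union: tag each element with a name
// ("even"/"odd"), collect the selected sub-streams in a list, then union them
// back into one stream. Union simply concatenates; it needs no join key.
public class SplitUnionToy {

    static Map<String, List<Integer>> split(List<Integer> stream) {
        Map<String, List<Integer>> selected = new HashMap<>();
        selected.put("even", new ArrayList<>());
        selected.put("odd", new ArrayList<>());
        for (int v : stream) {
            // same criterion as the OutputSelector in the thread
            selected.get(v % 2 == 0 ? "even" : "odd").add(v);
        }
        return selected;
    }

    static List<Integer> union(List<List<Integer>> streams) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> s : streams) {
            all.addAll(s);
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> parts = split(List.of(1, 2, 3, 4));
        List<List<Integer>> myList = List.of(parts.get("even"), parts.get("odd"));
        System.out.println(union(myList));   // [2, 4, 1, 3]
    }
}
```

Because union starts from the first element of the list and folds the rest in, no "dummy / empty" base stream is needed, which is the point of the reply above.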
RE: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?
Hi, I think the idea of having such a monthly thread is very good, and it might even help to attract new people to the community. At the same time, I do not think one extra mail per month necessarily counts as spam ☺ We could also consider a jobs@flink mailing list.

Radu

From: Kanstantsin Kamkou [mailto:kkam...@gmail.com]
Sent: Wednesday, December 07, 2016 9:57 PM
To: user@flink.apache.org
Subject: Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

Is it possible to avoid such spam here? If I need a new job, I can search for it. In the same way, I might want to subscribe to a separate thread, like jobs@flink. * The idea itself is great.

On Tue, 6 Dec 2016 at 14:04, Kostas Tzoumas <ktzou...@apache.org> wrote: yes, of course!
On Tue, Dec 6, 2016 at 12:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
+1. It keeps it both organized and at a reasonable minimum of overhead. Would you volunteer to start the mail thread each month then, Kostas? Best, Marton

On Tue, Dec 6, 2016 at 6:42 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:
Hi folks, I'd like to see how the community feels about a monthly "Who is hiring on Flink" email thread on the dev@ and user@ mailing lists where folks can post job positions related to Flink. I personally think that posting individual job offerings on the mailing list is off-topic (hence I have refrained from doing that wearing my company hat, and I have discouraged others when they asked for my opinion on this), but I thought that a monthly thread like this would be both helpful to the community and not cause overhead. Cheers, Kostas
RE: TIMESTAMP TypeInformation
OK, I will open the JIRA.

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, October 27, 2016 9:56 AM
To: user@flink.apache.org
Subject: Re: TIMESTAMP TypeInformation

Yes, I think you are right. TypeInfoParser needs to be extended to parse the java.sql.* types into the corresponding TypeInfos. Can you open a JIRA for that? Thanks, Fabian
RE: TIMESTAMP TypeInformation
Hi, I dug through this some more and I think I actually found a bug. The scenario I was trying to describe is something like:
1. You have a generic datastream with Tuple (alternatively I could move to Row, I guess) and you get the data from whatever stream (e.g., Kafka, network socket, ...).
2. In the map/flatMap function you parse the input and instantiate the tuple generically.
3. In the returns() call of the map you enforce the types:

DataStream<Tuple> stream = env.socketTextStream(...)
    .map(new MapFunction<String, Tuple>() {
        public Tuple map(String value) {
            Tuple out = Tuple.getTupleClass(#) ...
            out.setField(SqlTimeTypeInfo.TIMESTAMP, 0);
            ...
        }
    })
    .returns("Tuple#<java.sql.Timestamp, ...>");

The problem is that if you rely on the type extraction mechanism invoked after returns() to recognize TIMESTAMP as SqlTimeTypeInfo, it does not happen; instead a GenericType is created. It looks like the type parsers were not extended to consider these types.
If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it! From: Fabian Hueske [mailto:fhue...@gmail.com] Sent: Wednesday, October 26, 2016 10:11 PM To: user@flink.apache.org Subject: Re: TIMESTAMP TypeInformation Hi Radu, I might not have complete understood your problem, but if you do val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val ds = env.fromElements( (1, 1L, new Time(1,2,3)) ) val t = ds.toTable(tEnv, 'a, 'b, 'c) val results = t .select('c + 10.seconds) then field 'c will be of type SqlTimeTypeInfo and handled as such. Hope this helps, Fabian 2016-10-25 17:32 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>>: Re-hi, I actually realized that the problem comes from the fact that the datastream that I am registering does not create properly the types. I am using something like DataStream … .returns(“TupleX<,….,java.sql.Timestamp, java.sql.Time>”)…and I was expecting that these will be converted to SqlTimeTypeInfo…but it is converted to GenericType. Anythoughts how I could force the type to be recognize as a SqlTimeType? From: Radu Tudoran Sent: Tuesday, October 25, 2016 4:46 PM To: 'user@flink.apache.org<mailto:user@flink.apache.org>' Subject: TIMESTAMP TypeInformation Hi, I would like to create a TIMESTAMP type from the data schema. 
I would need this to match against the FlinkTypeFactory (toTypeInfo()):

def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
  case BOOLEAN => BOOLEAN_TYPE_INFO
  case TINYINT => BYTE_TYPE_INFO
  case SMALLINT => SHORT_TYPE_INFO
  case INTEGER => INT_TYPE_INFO
  case BIGINT => LONG_TYPE_INFO
  case FLOAT => FLOAT_TYPE_INFO
  case DOUBLE => DOUBLE_TYPE_INFO
  case VARCHAR | CHAR => STRING_TYPE_INFO
  case DECIMAL => BIG_DEC_TYPE_INFO
  // date/time types
  case DATE => SqlTimeTypeInfo.DATE
  case TIME => SqlTimeTypeInfo.TIME
  case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
}

I tried to create the TypeInformation by calling SqlTimeTypeInfo.TIMESTAMP directly. However, it seems that relDataType.getSqlTypeName is of type ANY instead of TIMESTAMP. Any thoughts on how to create the proper TIMESTAMP type information?
TIMESTAMP TypeInformation
Hi, I would like to create a TIMESTAMP type from the data schema. I would need this to match against the FlinkTypeFactory (toTypeInfo()):

def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
  case BOOLEAN => BOOLEAN_TYPE_INFO
  case TINYINT => BYTE_TYPE_INFO
  case SMALLINT => SHORT_TYPE_INFO
  case INTEGER => INT_TYPE_INFO
  case BIGINT => LONG_TYPE_INFO
  case FLOAT => FLOAT_TYPE_INFO
  case DOUBLE => DOUBLE_TYPE_INFO
  case VARCHAR | CHAR => STRING_TYPE_INFO
  case DECIMAL => BIG_DEC_TYPE_INFO
  // date/time types
  case DATE => SqlTimeTypeInfo.DATE
  case TIME => SqlTimeTypeInfo.TIME
  case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
}

I tried to create the TypeInformation by calling SqlTimeTypeInfo.TIMESTAMP directly. However, it seems that relDataType.getSqlTypeName is of type ANY instead of TIMESTAMP. Any thoughts on how to create the proper TIMESTAMP type information?
question for generic streams
Hi, I am trying to read the types of an input stream from a file and then generate the DataStream resulting from parsing a source accordingly (e.g., DataStream<...>). Finally, this will be registered as a table. What would be the best way to do this? I am currently trying to use the generic Tuple type for the DataStream, which would then be instantiated based on the arity of the number of types. However, this does not seem to be a good way (and I did not really figure out how to actually implement it). Any suggestions are welcome.
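The schema-reading half of this can be sketched independently of Flink. The sketch below is a hypothetical illustration (the schema string format and the SchemaParser name are my own, not Flink API): it turns a comma-separated line of fully qualified class names, as it might be read from the schema file, into the Class objects whose count would then pick the tuple arity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: resolve a textual schema line, e.g.
// "java.lang.String, java.lang.Long, java.sql.Timestamp",
// into Class objects; the list size is the arity of the tuple/row to build.
public class SchemaParser {
    public static List<Class<?>> parse(String schemaLine) throws ClassNotFoundException {
        List<Class<?>> fieldTypes = new ArrayList<>();
        for (String name : schemaLine.split(",")) {
            fieldTypes.add(Class.forName(name.trim()));
        }
        return fieldTypes;
    }

    public static void main(String[] args) throws Exception {
        List<Class<?>> types = parse("java.lang.String, java.lang.Long, java.sql.Timestamp");
        System.out.println(types.size());           // arity drives which TupleX to instantiate
        System.out.println(types.get(2).getName());
    }
}
```

The remaining (harder) part, which the thread is really about, is mapping these classes to the right TypeInformation instances on the Flink side.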
RE: org.apache.flink.core.fs.Path error?
Hi, As you found the source of the error, I am not sure if the outputs you asked for are needed anymore. Nevertheless, see below:

new File("D:\\dir\\myfile.csv").exists(); => true
FileSystem.getLocalFileSystem().getFileStatus(new Path()).getPath(); => "file:/D:/dir/myfile.csv"

Fabian's suggestion of specifying the file as "file:/D:/dir/myfile.csv" => works! Nevertheless, IMHO I would suggest fixing the issue, as it is in general more practical to specify paths in the form D:\\dir\\myfile.csv, mainly because that form is also understood by file readers outside Flink.

From: Chesnay Schepler [mailto:ches...@apache.org] Sent: Thursday, October 20, 2016 4:06 PM To: user@flink.apache.org Subject: Re: org.apache.flink.core.fs.Path error?

I believe I found the issue. The ContinuousFileMonitoringFunction never converts the given string to a Path, but directly generates a URI from it.

On 20.10.2016 15:48, Fabian Hueske wrote: The error message suggests that Flink tries to resolve "D:" as a file system scheme such as "file:" or "hdfs:". Can you try to specify your path as "file:/D:/dir/myfile.csv"? Best, Fabian

2016-10-20 14:41 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>: Hi, I know that Flink in general supports files also on Windows. For example, I just tested successfully with relative file paths (e.g., place the file in the local directory and give just the file name, and then everything works correctly). However, with absolute paths it does not work, as per my previous explanation. Nevertheless, please see also the error log below.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: No file system found with scheme D, referenced in file URI 'D:/dir/myfile.csv'.
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:297)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

In addition, there is some further error output if I dig through the logs:

4:33:32,651 ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:92)
at org.apache.hadoop.security.Groups.<init>(Groups.java:76)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:239)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at o
RE: org.apache.flink.core.fs.Path error?
em.java:75)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.flink.core.fs.FileSystem.instantiateHadoopFileSystemWrapper(FileSystem.java:334)
at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:358)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

From: Chesnay Schepler [mailto:ches...@apache.org] Sent: Thursday, October 20, 2016 2:22 PM To: user@flink.apache.org Subject: Re: org.apache.flink.core.fs.Path error?

Hello Radu, Flink can handle Windows paths, so this alone can't be the problem. If you could post the error you are getting, we may pinpoint the issue, but right now I would suggest the usual: check that the path is indeed correct and that you have sufficient permissions to access the file. And yes, you can report problems here ;) Regards, Chesnay

On 20.10.2016 13:17, Radu Tudoran wrote: Hi, I am running a program that is supposed to read a CSV file from the local disk (I am still using Flink 1.1; I did not check whether the situation is the same for 1.2). I am currently running the test on a Windows OS. I am creating the path to the file, e.g.
"D:\\dir\\myfile.csv" However, I see that the CSV reader converts this to a Path object from flink core "val inputFormat = new TupleCsvInputFormat(new Path(path), rowDelim, fieldDelim, typeInfo)" In CSVTableSource This ends up representing the initial path as an URI and changes \ to / resulting in ""D:/dir/myfile.csv"". The problem is that this is never changed when the file is actually open and accessed which leads to an error. ...not sure if signaling this error here is the best place or if I should have used some other media.. Best regards, Dr. Radu Tudoran Senior Research Engineer - Big Data Expert IT R Division [cid:image007.jpg@01CD52EB.AD060EE0] HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center Riesstrasse 25, 80992 München E-mail: radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com> Mobile: +49 15209084330 Telephone: +49 891588344173 HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com<http://www.huawei.com/> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!
emit watermarks
Hi, Is there some way to emit a watermark in the trigger? I see that in the evictor there is the option to check whether a StreamRecord is a watermark, so I would hope that there is some option also to create them.
RE: SQL / Tuple question
Hi Fabian, Thanks for the advice. It works this way!

From: Fabian Hueske [mailto:fhue...@gmail.com] Sent: Monday, September 19, 2016 6:08 PM To: user@flink.apache.org Subject: Re: SQL / Tuple question

Hi Radu, you can pass the TypeInfo directly without accessing the TypeClass. Have you tried this?

TypeInformation<Tuple2<String, Long>> tpinf = new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo();
.toDataStream( , tpinf )

Best, Fabian

2016-09-19 17:53 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>: Hi, I am trying to create an SQL statement that is supposed to return a string and an integer:

Mytable.sql("select mystringfield, myintfield ...")

I am trying to give the type information as a class:

TypeInformation<Tuple2<String, Long>> tpinf = new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo();
.toDataStream( , tpinf.getTypeClass() )

However, I get the error shown below. Can someone give me an example of some working tuples for this case?

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics. at
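The reason passing tpinf works while tpinf.getTypeClass() fails is the "super type token" idiom behind TypeHint: the anonymous subclass pins the generic arguments into class metadata where reflection can recover them, whereas a raw Class object has already erased them. A Flink-free sketch of the idiom (TypeToken here is a hypothetical stand-in, not the Flink class):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's TypeHint: the anonymous subclass
// records its generic superclass, so the type argument survives erasure.
abstract class TypeToken<T> {
    Type captured() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

public class TypeTokenDemo {
    public static void main(String[] args) {
        // Note the trailing {}: an anonymous subclass, not a plain instance.
        Type t = new TypeToken<java.util.Map<String, Long>>() {}.captured();
        // The full generic signature is recoverable; Map.class alone would not be.
        System.out.println(t);
    }
}
```

This is why the API wants the TypeInformation (or TypeHint) itself rather than the class extracted from it.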
SQL for Flink
Hi, As a follow-up to the multiple discussions that happened during Flink Forward about how SQL should be supported by Flink, I was thinking to make a couple of proposals. Disclaimer: I do not claim I have managed to synthesize all the discussions, and probably a great deal of things is still missing.

Why support SQL for Flink?
- A goal of supporting SQL for Flink should be to enable larger adoption of Flink, particularly for data scientists / data engineers who might not want/know how to program against the existing APIs
- The main implication, as I see it, is that SQL should serve as a translation tool from the data processing flow to a stream topology that will be executed by Flink
- This would require supporting an SQL client for Flink rather soon

How many features should be supported?
- In order to enable a (close to) full benefit of the processing capabilities of Flink, I believe most of the processing types should be supported; this includes all different types of windows, aggregations, transformations, joins
- I would propose that UDFs should also be supported, such that one can easily add more complex computation if needed
- In the spirit of the extensibility that Flink supports for operators, functions, ... such custom operators should be supported to replace the default implementations of the SQL logical operators

How much customization should be enabled?
- Regarding customization, this could be provided by configuration files. Such a configuration can cover the policies for how the triggers, evictors, parallelization, ... will be done for the specific translation of the SQL query into Flink code
- In order to support the integration of custom operators for specific SQL logical operators, users should also be enabled to provide translation RULES that will replace the default ones (e.g.
if a user wants to define their own CUSTOM_TABLE_SCAN, they should be able to provide something like

configuration.replaceRule(DataStreamScanRule.INSTANCE, CUSTOM_TABLE_SCAN_Rule.INSTANCE)

or, if the selection of the new translation rule can be handled via the cost, simply

configuration.addRule(CUSTOM_TABLE_SCAN_Rule.INSTANCE)

What do you think?

Dr. Radu Tudoran
RE: error for building flink-runtime from source
Hi, I am building the 1.1 snapshot (should be the latest release). I will try to build the whole project to check if it works.

Dr. Radu Tudoran

From: Márton Balassi [mailto:balassi.mar...@gmail.com] Sent: Tuesday, July 12, 2016 2:37 PM To: user@flink.apache.org Subject: Re: error for building flink-runtime from source

Hi Radu, Which version of Flink are you building? Looking at the current master builds, they have been coming in green recently [1]. If you are solely building flink-runtime, the issue might be that you are using different versions of flink-core (a dependency of flink-runtime) and flink-runtime. Could you try building Flink as a whole? [1] https://travis-ci.org/apache/flink/builds/144096299 Best, Marton

On Tue, Jul 12, 2016 at 12:02 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote: I am trying to build flink-runtime from source.
I run mvn install and the compilation fails with the error below. Any thoughts on this?

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-runtime_2.10: Compilation failure
[ERROR] /D:/ISAR 2/Dependencies/flink/flink-master/flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java:[60,49] cannot find symbol
[ERROR] symbol: method getDelayBetweenAttempts()
[ERROR] location: variable fixedDelayConfig of type org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.

Best regards,
Dr. Radu Tudoran
error for building flink-runtime from source
I am trying to build flink-runtime from source. I run mvn install and the compilation fails with the error below. Any thoughts on this?

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-runtime_2.10: Compilation failure
[ERROR] /D:/ISAR 2/Dependencies/flink/flink-master/flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java:[60,49] cannot find symbol
[ERROR] symbol: method getDelayBetweenAttempts()
[ERROR] location: variable fixedDelayConfig of type org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.

Best regards,
Dr. Radu Tudoran
Flink and Calcite
Hi, Can someone point me to the repository where the integration of Calcite with Flink is available? Does this come with the master branch (as indicated by the link in the blog post)? https://github.com/apache/flink/tree/master Thanks

Dr. Radu Tudoran
RE: lost connection
- Could not submit job Operator2 execution (170aef70d31f3fee62f8a483930be213), because there is no connection to a JobManager.
15:59:48,456 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.204.62.71:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.204.62.71:6123
16:01:28,409 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.

I do not understand what could be the root cause of this... the IPs look OK and there is no firewall to block things.

Dr. Radu Tudoran
From: Chesnay Schepler [mailto:ches...@apache.org] Sent: Thursday, April 21, 2016 3:58 PM To: user@flink.apache.org Subject: Re: lost connection

Hello, the first step is always to check the logs under /log. The JobManager log in particular may contain clues as to why no connection could be established. Regards, Chesnay

On 21.04.2016 15:44, Radu Tudoran wrote: Hi, I am trying to submit a jar via the console (flink run my.jar). The result is that I get an error saying that the communication with the JobManager failed: Lost connection to the JobManager. Can you give me some hints/recommendations about approaching this issue? Thanks

Dr. Radu Tudoran
RE: Custom time window in Flink
Hi, The easiest way is to just start from the code of an existing one: https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors If you take the example of TimeEvictor, you would just need to use the same code and modify the public int evict(...) method. Same story with the triggers: https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers

Dr. Radu Tudoran

From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in] Sent: Wednesday, April 20, 2016 11:24 AM To: user@flink.apache.org Subject: Re: Custom time window in Flink

Hello, Thanks a lot for your reply. Can you share a sample code or example which I can refer to while creating a custom evictor and trigger?
Thanks and Regards, Piyush Shrivastava [WeboGraffiti] http://webograffiti.com

On Wednesday, 20 April 2016 2:50 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote: Hi, The way to do this is to create your own evictor. In the evictor you can then decide when the events are removed. I would suggest creating a symmetric trigger as well, because I would assume that you also need to fire the computation first after 1 hour and then every 5 minutes. The logic would be that you have a field that marks whether a window was created or not (e.g., a Boolean field in the evictor class); once a window is created, you set it to false, and from there on you operate on 5-minute windows.

Dr. Radu Tudoran
From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in] Sent: Wednesday, April 20, 2016 9:59 AM To: user@flink.apache.org Subject: Custom time window in Flink

Hello, I wanted to enquire how a job I am trying to do with Flink can be done. I have also posted a question on StackOverflow; PFB the link: http://stackoverflow.com/questions/36720192/custom-windows-charging-in-flink

I am using Flink's TimeWindow functionality to perform some computations. I am creating a 5-minute window. However, I want to create a one-hour window for only the first time; the next windows I need are of 5 minutes. That is, for the first hour, data is collected and my operation is performed on it. Once this is done, the same operation is performed every five minutes. Can you kindly help me with this? How can such a functionality be implemented?

Thanks and Regards, Piyush Shrivastava [WeboGraffiti] http://webograffiti.com
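The fire-once-after-an-hour-then-every-five-minutes decision that the suggested custom trigger/evictor pair would implement can be sketched as plain state-machine logic. This uses no Flink APIs; the class and method names are hypothetical, and a real implementation would live inside a Trigger's callbacks:

```java
// Hypothetical sketch of the trigger-side decision described above:
// fire first after a long initial window, then on a shorter repeating interval.
class FirstLongThenShortSchedule {
    private final long initialMs;
    private final long repeatMs;
    private Long windowStart = null; // the "window created" flag from the thread
    private long nextFireAt;

    FirstLongThenShortSchedule(long initialMs, long repeatMs) {
        this.initialMs = initialMs;
        this.repeatMs = repeatMs;
    }

    /** Returns true when the computation should fire for an element at this timestamp. */
    boolean onElement(long timestampMs) {
        if (windowStart == null) {
            windowStart = timestampMs;
            nextFireAt = timestampMs + initialMs; // first firing: after the 1-hour window
            return false;
        }
        if (timestampMs >= nextFireAt) {
            nextFireAt += repeatMs;               // subsequent firings: every 5 minutes
            return true;
        }
        return false;
    }
}

public class ScheduleDemo {
    public static void main(String[] args) {
        FirstLongThenShortSchedule s = new FirstLongThenShortSchedule(3_600_000L, 300_000L);
        System.out.println(s.onElement(0L));          // false: window just created
        System.out.println(s.onElement(3_600_000L));  // true: first fire after one hour
        System.out.println(s.onElement(3_700_000L));  // false: next fire is at 1h05m
        System.out.println(s.onElement(3_900_000L));  // true: five minutes after the first fire
    }
}
```

The evictor counterpart would symmetrically decide how many of the buffered elements to drop at each firing, so that after the first hour only the last five minutes of data remain in the window.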
RE: Custom time window in Flink
Hi, The way to do this is to create your own evictor. In the evictor you can then decide when the events are removed. I would suggest creating a symmetric trigger as well, because I assume you also need to fire the computation first after 1 hour and then every 5 minutes. The logic would be to keep a field that marks whether the first window has been processed (e.g., a boolean field in the evictor class); once it has, set the flag and from there on operate on 5-minute windows. From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in] Sent: Wednesday, April 20, 2016 9:59 AM To: user@flink.apache.org Subject: Custom time window in Flink Hello, I wanted to ask how a job I am trying to do with Flink can be done. I have also posted a question on StackOverflow. 
PFB the link: http://stackoverflow.com/questions/36720192/custom-windows-charging-in-flink I am using Flink's TimeWindow functionality to perform some computations. I am creating a 5-minute window, but I want the very first window to be one hour long; all subsequent windows should be 5 minutes. That is, for the first hour data is collected and my operation is performed on it; once this is done, the same operation is performed every five minutes. Can you kindly help me with this? How can such functionality be implemented? Thanks and Regards, Piyush Shrivastava<mailto:piy...@webograffiti.com> http://webograffiti.com
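The trigger-plus-flag approach described in the reply can be sketched as a custom `Trigger` on `GlobalWindows`: fire once after an initial hour, then every five minutes. This is a hedged illustration written against the Flink 1.x Trigger API (the class name, state key, and interval constants are assumptions, and the exact method set varies slightly between Flink versions):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Illustrative trigger: fires the window once after an initial hour, then
// every five minutes, using processing-time timers and a per-key flag.
public class FirstHourThenFiveMinutesTrigger extends Trigger<Object, GlobalWindow> {

    private static final long ONE_HOUR = 60 * 60 * 1000L;
    private static final long FIVE_MINUTES = 5 * 60 * 1000L;

    // per-key flag: has the initial one-hour timer been scheduled yet?
    private final ValueStateDescriptor<Boolean> timerSetDesc =
            new ValueStateDescriptor<>("timerSet", Boolean.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Boolean> timerSet = ctx.getPartitionedState(timerSetDesc);
        if (timerSet.value() == null) {
            // First element seen: schedule the initial one-hour firing.
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + ONE_HOUR);
            timerSet.update(true);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        // Emit and purge the buffered elements, then schedule the next 5-minute firing.
        ctx.registerProcessingTimeTimer(time + FIVE_MINUTES);
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(timerSetDesc).clear();
    }
}
```

It would be wired in with something like `stream.keyBy(...).window(GlobalWindows.create()).trigger(new FirstHourThenFiveMinutesTrigger()).apply(...)`.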
RE: ClassNotFound when submitting job from command line
Hi, In my case the root cause for this was that I was packaging the jar with Eclipse. Try using mvn instead. Additionally, you can copy the dependency jars into the lib directory of the task managers and restart them. From: Flavio Pompermaier [mailto:pomperma...@okkam.it] Sent: Tuesday, April 19, 2016 5:40 PM To: user Subject: ClassNotFound when submitting job from command line Hi to all, I just tried to submit my application to the Flink cluster (1.0.1) but I get ClassNotFound exceptions for classes inside my shaded jar (like oracle.jdbc.OracleDriver or org.apache.commons.pool2.PooledObjectFactory). Those classes are in the shaded jar but aren't found. If I put the jars in the flink's lib dir (for every node of the cluster) things work. How can I solve that? Best, Flavio
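A common reason classes that appear to be "inside" a fat jar still fail to load is a jar built without proper shading, which is what the advice to build with mvn addresses. A sketch of a `maven-shade-plugin` configuration for the job's pom.xml (the plugin version and filters are illustrative, not taken from the thread):

```xml
<!-- Sketch: bundle runtime dependencies into the submitted jar.
     Version and filters are illustrative. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <filters>
          <!-- Strip signature files that can invalidate a shaded jar -->
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Building with `mvn clean package` then produces a shaded jar under `target/` that can be submitted with `bin/flink run`.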
jar dependency in the cluster
Hi, Could anyone help me with the following problem: I have a Flink cluster of a couple of nodes (I am using the old version, 0.10). I am packaging a jar that needs to use the Kafka connector. When I create the jar in Eclipse I add the Flink connector dependency and set it to be packed with the jar. Nevertheless, when I submit it to be executed on the cluster I get an error that the connector jar is not visible to the class loader. Is there a way to point Flink at a certain library path where it should look for dependencies, or to add extra dependencies when I deploy the Flink cluster or submit the job? Many thanks
RE: operators
Hi, It would not be feasible actually to use Kafka queues or the DFS. Could you point me at which level of the API I could access the CoLocationConstraint? Is it accessible from the DataStreamSource or from the operator directly? I have also dug through the documentation and API and I was curious to understand what "slotSharingGroup" and "startNewResourceGroup()" can do. I did not find a good example though, only this link: https://issues.apache.org/jira/browse/FLINK-3315 Also, "slotSharingGroup" does not seem to be available (I am currently using Flink 0.10) – so if it is something newer, I guess that explains why I cannot find it in any of the DataStream API or the source function. Thanks for the info. From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen Sent: Wednesday, March 09, 2016 6:30 PM To: user@flink.apache.org Subject: Re: operators Hi! You cannot specify that on the higher API levels. The lower API levels have something called "CoLocationConstraint". At this point it is not exposed, because we thought that would lead to not very scalable and robust designs in many cases. The best thing usually is location transparency and local affinity (as a performance optimization). Is the file large, i.e., would it hurt to do it on a DFS? Or actually use a Kafka queue between the operators? Stephan On Wed, Mar 9, 2016 at 5:38 PM, Radu Tudoran <radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>> wrote: Hi, Is there any way in which you can ensure that 2 distinct operators will be executed on the same machine? More precisely, what I am trying to do is to have a window that computes some metrics and dumps them locally (from the operator, not from an output sink), and I would like to create, independent of this (or even within the operator), a stream source to emit this data. 
I cannot … The schema would be something as below: Stream -> operator -> output | Local file | Stream source -> new stream => the local file and the new stream source should go on the same machine
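On newer Flink versions (1.x; not available in 0.10, as noted in the thread), the closest exposed knob is a slot sharing group: operators in the same group may share a task slot, which is a placement hint rather than a strict co-location constraint. A hedged sketch with hypothetical source and window-function classes:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative pipeline: the metrics source and the window result are placed
// in the same slot sharing group. MetricsSource and MetricsToCsv are made-up
// names standing in for a SourceFunction and an AllWindowFunction.
public class ColocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> metrics = env
                .addSource(new MetricsSource())       // hypothetical source
                .slotSharingGroup("colocated");

        metrics
                .timeWindowAll(Time.minutes(5))
                .apply(new MetricsToCsv())            // hypothetical window function
                .slotSharingGroup("colocated")
                .print();

        env.execute("colocation sketch");
    }
}
```

Operators in one slot sharing group can share a slot but are not guaranteed to; for a hard co-location guarantee the lower-level CoLocationConstraint mentioned by Stephan would still be needed.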
operators
Hi, Is there any way in which you can ensure that 2 distinct operators will be executed on the same machine? More precisely, what I am trying to do is to have a window that computes some metrics and dumps them locally (from the operator, not from an output sink), and I would like to create, independent of this (or even within the operator), a stream source to emit this data. I cannot … The schema would be something as below: Stream -> operator -> output | Local file | Stream source -> new stream => the local file and the new stream source should go on the same machine
RE: Type of TypeVariable could not be determined
Hi, The issue is that this problem appears when I want to create a stream source: StreamExecutionEnvironment.addSource(new MySourceFunction()) … where the stream source class is: class MySourceFunction<IN> implements SourceFunction<IN> { … } In such a case I am not sure how I can pass the outer type, nor how I can pass it using the ".returns()" method as Timo suggested. From: Wang Yangjun [mailto:yangjun.w...@aalto.fi] Sent: Tuesday, March 08, 2016 7:15 PM To: user@flink.apache.org Subject: Re: Type of TypeVariable could not be determined Hi Radu, I met this issue also. The reason is that outTypeInfo couldn't be created based on a generic type when a transform is applied: public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) The solution would be to pass the Class to your UDF and create the TypeInformation yourself. 
Best, Jun From: Radu Tudoran <radu.tudo...@huawei.com> Reply-To: "user@flink.apache.org" Date: Tuesday 8 March 2016 at 19:57 To: "user@flink.apache.org" Subject: Type of TypeVariable could not be determined Hi, I am trying to create a custom stream source. I first built this with generics and ran into problems regarding type extraction. I tried to use concrete types but ran into the same issue (see errors below). Can anyone suggest a solution? Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'IN' in 'class test.MySourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089) ... 1 more Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'TupleEvent2' in 'class test.MySourceFunctionTuple' could not be determined. This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089) ... 1 more
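The two suggestions in this thread (pass the Class into the UDF, build the TypeInformation yourself) can be combined by implementing `ResultTypeQueryable` in the source, so the type extractor never has to reflect on the erased generic parameter. A hedged sketch with illustrative names:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// The source announces its own output type: the caller passes the concrete
// Class object, which survives erasure, and getProducedType() hands the
// resulting TypeInformation to the framework.
public class TypedSourceFunction<T> implements SourceFunction<T>, ResultTypeQueryable<T> {

    private final TypeInformation<T> typeInfo;
    private volatile boolean running = true;

    public TypedSourceFunction(Class<T> clazz) {
        this.typeInfo = TypeExtractor.getForClass(clazz);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return typeInfo;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            // emit elements via ctx.collect(...)
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Alternatively, the type can be stated at the call site, e.g. `env.addSource(new TypedSourceFunction<>(TupleEvent2.class)).returns(TypeExtractor.getForClass(TupleEvent2.class))`.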
Type of TypeVariable could not be determined
Hi, I am trying to create a custom stream source. I first built this with generics and ran into problems regarding type extraction. I tried to use concrete types but ran into the same issue (see errors below). Can anyone suggest a solution? Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'IN' in 'class test.MySourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089) ... 1 more Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'TupleEvent2' in 'class test.MySourceFunctionTuple' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089) ... 1 more
RE: [ANNOUNCE] Flink 1.0.0 has been released
Hi, Do you also have a LinkedIn post that I could share, or should I write a blog post in which I quote this announcement? -Original Message- From: Kostas Tzoumas [mailto:ktzou...@apache.org] Sent: Tuesday, March 08, 2016 4:17 PM To: user@flink.apache.org; d...@flink.apache.org; n...@flink.apache.org Subject: [ANNOUNCE] Flink 1.0.0 has been released Hi everyone! As you might have noticed, Apache Flink 1.0.0 has been released and announced! 
You can read more about the release at the ASF blog and the Flink blog - https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88 - http://flink.apache.org/news/2016/03/08/release-1.0.0.html Don't forget to retweet and spread the news :-) - https://twitter.com/TheASF/status/707174116969857024 - https://twitter.com/ApacheFlink/status/707175973482012672 Check out the changelog and the migration guide, download the release, and check out the documentation - http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html - https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x - https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations - http://flink.apache.org/downloads.html - https://ci.apache.org/projects/flink/flink-docs-release-1.0/ Many congratulations to the Flink community for making this happen! Best, Kostas
RE: events eviction
Hi, Following up on the example before: you have to aggregate the data from 2 partitions (e.g. let's say in one you count objects of type 1 and in the other of type 2). Then you need to pair them together and emit that at iteration N you had (object T1 - Y, object T2 - X), and finally evict them together. Alternatively you can also consider more complex operations, not only this simple aggregation (e.g. see if any of the objects of type T1 is also an object of type T2). -Original Message- From: Aljoscha Krettek [mailto:aljos...@apache.org] Sent: Friday, February 19, 2016 10:47 AM To: user@flink.apache.org Subject: Re: events eviction Hi, yes, in some cases it could be necessary. Could you maybe give some example of what kind of window computation you want to achieve? Then we can see if it would be possible without GlobalWindows and evictor. Cheers, Aljoscha > On 15 Feb 2016, at 18:07, Radu Tudoran <radu.tudo...@huawei.com> wrote: > > Hi, > > Thanks Aljoscha for the details! > > The warning about performance and evictors is useful, but I am not sure how it can always be put into practice. Take for example a GlobalWindow that you would use to aggregate data from multiple partitions. A GlobalWindow does not come with a trigger - would it then have a default evictor? Even if it has one, you still need to control the eviction of the events. Secondly, assume you need to aggregate the data from 2 partitions and evict something only when you have one item from each partition. You would need a sort of state for this. And then, to ensure resiliency, the state should be recoverable if a crash happens. Could you approach this without an evictor state?
> -Original Message- > From: Aljoscha Krettek [mailto:aljos...@apache.org] > Sent: Monday, February 15, 2016 11:58 AM > To: user@flink.apache.org > Subject: Re: events eviction > > Hi, > you are right, the logic is in EvictingNonKeyedWindowOperator.emitWindow() for non-parallel (non-keyed) windows and in EvictingWindow.processTriggerResult() in the case of keyed windows. > > You are also right about the contract of the Evictor, it returns the number of elements to be evicted from the beginning. This also means that eviction does not consider any timestamps in the elements and is therefore quite arbitrary. The places in the code I mentioned above simply get the value from the Evictor and evict that many elements from the internal buffer/state. > > Right now it is not possible to replace the window operator that is used by flink. 
What you can do is copy the window operator code and use it manually > using DataStream.transform(). > > About the evictor state. I’m afraid this is not possible right now. It was a > conscious decision to make the Evictor stateless to make it easier for the > system to handle. I would also strongly advise against using Evictors if at > all possible. They make it impossible to incrementally aggregate window > results (for example with a reduce function). This can have a huge > performance/memory footprint impact. In your case, what are you using them > for? > > I hope this helps somehow, but let us know if you need further explanations. > > Cheers, > Aljoscha > >> On 15 Feb 2016, at 11:09, Radu Tudoran <radu.tudo...@huawei.com> wrote: >> >> Hello, >> >> I am looking over the mechanisms of evicting events in Flink. I saw that >> either using a default evictor or building a custom one the logic is that >> the evictor will provide the number of ev
events eviction
Hello, I am looking over the mechanisms for evicting events in Flink. I saw that, whether using a default evictor or building a custom one, the logic is that the evictor provides the number of events to be discarded. Could you please provide me with some additional pointers regarding the mechanism in Flink where this actually happens: - The class that implements this functionality of discarding the events? (My initial expectation that this happens in the window class turned out to be wrong.) I checked and found "EvictingNonKeyedWindowOperator" - is this indeed the right place to look? - If yes, would it be possible to create a customizable class like this one and somehow pass it to the framework? I would be curious if there is an option other than modifying the core classes and recompiling the framework. On a slightly parallel topic - is there some way of creating state in the evictor that will be checkpointed and restored in case of failure? I would be interested if something like an operator state is possible in the evictor. Regards,
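For the pairing scenario discussed in this thread, note that `GlobalWindows` never fires on its own (its default trigger is a never-firing trigger), so both the trigger and the evictor must be supplied explicitly. A minimal hedged sketch (the input stream, key position, and aggregation are assumptions):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// 'input' is a hypothetical DataStream<Tuple2<String, Integer>> where the
// String field pairs up one element from each of two logical partitions.
DataStream<Tuple2<String, Integer>> paired = input
        .keyBy(0)                          // key: the pair id
        .window(GlobalWindows.create())
        .trigger(CountTrigger.of(2))       // fire once both elements of a pair arrived
        .evictor(CountEvictor.of(2))       // keep at most the 2 newest elements
        .sum(1);                           // or a custom WindowFunction for richer logic
```

The count-based trigger/evictor pair stands in for the "one item from each partition" condition; a custom trigger with partitioned state would be needed to check the condition exactly rather than by count.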
RE: Flink writeAsCsv
Hi Radu, It is indeed interesting to know how each window could be written out separately - I am not sure any of the existing mechanisms in Flink support this. I think you need to create your own output sink. It is a bit tricky to pass the window sequence number (actually I do not think such an index is kept – but you can create one yourself). Maybe an easier option is to manage the writing of the data yourself in the window function or in a custom evictor. In the window function and in the evictor you have access to all the data and can create specific files for each window triggered. From: Radu Prodan [mailto:raduprod...@gmail.com] Sent: Thursday, February 04, 2016 11:58 AM To: user@flink.apache.org Subject: Re: Flink writeAsCsv Hi Marton, Thanks to your comment I managed to get it working. At least it outputs the results. However, what I need is to output each window's result separately. Now, it outputs the results of the parallel window instances (I think) and appends the new results to them. For example, if I have parallelism of 10, then I will have at most 10 files and each file will grow in size as windows continue. What I want is a separate file per window. For example, after the n'th window is computed, output it to some file and close the file. -best Radu On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi wrote: Hey Radu, As you are using the streaming API I assume that you call env.execute() in both cases. Is that the case? Do you see any errors appearing? My first call would be that if your data type is not a tuple type then writeAsCsv does not work by default. Best, Marton On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan wrote: Hi all, I am new to Flink. I wrote a simple program and I want it to output a csv file. timeWindowAll(Time.of(3, TimeUnit.MINUTES)) .apply(new Function1()) .writeAsCsv("file:///user/someuser/Documents/somefile.csv"); When I change the sink to .print(), it works and outputs some results. I want it to output the result of every window. 
However, it outputs nothing and the file is not created. Am I missing anything? -best Radu
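One way to implement the "one file per window" behaviour suggested above is to do the writing inside the window function itself, using the window's start timestamp as a natural per-window index. A hedged sketch (the record type and output path are made up):

```java
import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Writes each window's contents to its own CSV file, named after the
// window's start timestamp; the file is closed when the window is done.
public class PerWindowCsvWriter
        extends RichAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values,
                      Collector<Tuple2<String, Integer>> out) throws Exception {
        Path file = Paths.get("/tmp/windows", "window-" + window.getStart() + ".csv");
        Files.createDirectories(file.getParent());
        try (BufferedWriter writer = Files.newBufferedWriter(file)) {
            for (Tuple2<String, Integer> value : values) {
                writer.write(value.f0 + "," + value.f1);
                writer.newLine();
                out.collect(value);   // still forward downstream if needed
            }
        }
    }
}
```

Used as `.timeWindowAll(Time.of(3, TimeUnit.MINUTES)).apply(new PerWindowCsvWriter())`; since `timeWindowAll` runs with parallelism 1, each window produces exactly one file, while keyed parallel windows would need a per-instance filename suffix.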
RE: release of task slot
Hi, Well… yesterday when I looked into it there was no additional info beyond what I sent. Today I reproduced the problem and I could see the following in the log file: akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at akka.util.Reflect$.instantiate(Reflect.scala:66) at akka.actor.ArgsReflectConstructor.produce(Props.scala:352) at akka.actor.Props.newActor(Props.scala:252) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 
10 more Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded 11:21:17,423 ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in task resource cleanup java.lang.OutOfMemoryError: GC overhead limit exceeded 11:21:55,160 ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in task exception handler java.lang.OutOfMemoryError: GC overhead limit exceeded …. - Unexpected exception in the selector loop. java.lang.OutOfMemoryError: GC overhead limit exceeded Looks like the input flow is faster than the GC collector Dr. Radu Tudoran Research Engineer - Big Data Expert IT R Division [cid:image007.jpg@01CD52EB.AD060EE0] HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center Riesstrasse 25, 80992 München E-mail: radu.tudo...@huawei.com Mobile: +49 15209084330 Telephone: +49 891588344173 HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com<http://www.huawei.com/> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it! From: Till Rohrmann [mailto:trohrm...@apache.org] Sent: Thursday, February 04, 2016 4:55 PM To: user@flink.apache.org Subject: Re: release of task slot Hi Radu, what does the log of the TaskManager 10.204.62.80:57910<http://10.204.62.80:57910> say? 
Cheers, Till On Wed, Feb 3, 2016 at 6:00 PM, Radu Tudoran <radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>> wrote: Hello, I am facing an error which for which I cannot figure the cause. Any idea what could cause such an error? java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager a8b69bd9449ee6792e869a9ff9e843e2 @ cloudr6-admin - 4 slots - URL: akka.tcp://flink@10.204.62.80:57910/user/taskmanager<http://flink@10.204.62.80:57910/user/taskmanager> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151) at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547) at org.apache.flink.runtime.instance.SharedSlot.r
release of task slot
Hello,

I am facing an error for which I cannot figure out the cause. Any idea what could cause such an error?

    java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager a8b69bd9449ee6792e869a9ff9e843e2 @ cloudr6-admin - 4 slots - URL: akka.tcp://flink@10.204.62.80:57910/user/taskmanager
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
maxtime / watermark for GlobalWindow
Hi,

I am using a global window to collect some events, and a trigger to fire the processing. Is there any way to get the time of the event that triggered the processing? I am asking this because the maxTimestamp() of the GlobalWindow returns Long.MAX_VALUE. The code skeleton is:

    stream
        .windowAll(GlobalWindows.create())
        .trigger(new MyTrigger())
        .apply(new AllWindowFunction<Tuple1, Tuple1, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow arg0, Iterable<Tuple1> arg1,
                    Collector<Tuple1> arg2) throws Exception {
                // - get the event timestamp
            }
        })

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
continuous time trigger
Re-Hi,

I have another question regarding the triggering of the processing of a window. Can this be done at specific time intervals, independent of whether an event has been received or not, via a trigger? The reason I am considering a trigger rather than timeWindow(All) is that timeWindow will end up generating multiple windows and duplicating data, while having the option, from the trigger, to fire the processing at certain times, independent of when the events arrive, would make it possible to operate with a single window.

Regards,

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
RE: global function over partitions
            }
            // System.out.println("Count per hash is " + count);
        };
    });

    result.timeWindowAll(Time.of(2, TimeUnit.SECONDS))
            .apply(new AllWindowFunction<Tuple2<Integer, Integer>, Tuple1, TimeWindow>() {
                @Override
                public void apply(TimeWindow arg0,
                        Iterable<Tuple2<Integer, Integer>> arg1,
                        Collector<Tuple1> arg2) throws Exception {
                    // Computation
                    int count = 0;
                    for (Tuple2<Integer, Integer> value : arg1) {
                        count++;
                    }
                    // System.out.println("Count aggregated metrics is "
                    //         + count + " at " + System.currentTimeMillis());
                    arg2.collect(new Tuple1(count));
                }
            }).setParallelism(1)
            .writeAsText("/tmp/testoutput", WriteMode.OVERWRITE);

    env.execute("main stream application");
    }

Regards,

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Friday, January 15, 2016 10:18 AM
To: user@flink.apache.org
Subject: Re: global function over partitions

Hi Radu,

I'm sorry for the delayed response. I'm not sure what the purpose of DataStream.global() actually is. I've opened a JIRA to document or remove it: https://issues.apache.org/jira/browse/FLINK-3240.

For getting the final metrics, you can just call ".timeWindowAll()" without a ".global()" call before it. The timeWindowAll() will run with a parallelism of one, hence it will receive the data from all partitions.

Regards,
Robert

On Tue, Jan 12, 2016 at 6:59 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

Hi,

I am trying to compute some final statistics over a stream topology. For this I would like to gather all the data from all windows and parallel partitions into a single, global window. Could you suggest a solution for this? I saw that the map function has a ".global()", but I end up with the same number of partitions as I have in the main computation. Below you can find a schema of the program:

    DataStream stream = env.read...
    env.setParallelism(10);

    // Compute phase
    DataStream result = stream.keyBy(_).window(_).apply();
    // end compute phase

    // get the metrics
    result.map(/* extract some of the Tuple fields */)
          .global()
          .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
          .trigger(EventTimeTrigger.create())
          .apply()
          .writeAsText();

For this last function I would expect that even if I had parallel computation during the compute phase, I can select part of the events from all partitions and gather them into one unique window. However, I do not seem to be successful in this. I also tried applying a keyBy() to the result stream in which I assigned the same hash to every event, but the result remains the same.

    result.map(/* extract some of the Tuple fields */)
          .keyBy(new KeySelector<Tuple2<Long, Long>, Integer>() {
              @Override
              public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
                  return 1;
              }
              @Override
              public int hashCode() {
                  return 1;
              }
          })
          .timeWindowAll().apply()

Thanks for the help/ideas
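The key point in the reply above is that a parallelism-one stage receives the outputs of all parallel partitions. A plain-Java model of that "gather everything into one global aggregate" step (no Flink involved; `GlobalAggregator` and `globalCount` are illustrative names):

```java
import java.util.List;

public class GlobalAggregator {
    // Models a non-parallel (parallelism-1) aggregation stage: it sees the
    // output of every parallel partition and folds them into one global
    // metric, here a simple element count.
    public static int globalCount(List<List<String>> partitionOutputs) {
        int count = 0;
        for (List<String> partition : partitionOutputs) {
            count += partition.size(); // every partition's elements are counted
        }
        return count;
    }
}
```

In Flink terms, `timeWindowAll()` plays the role of this single consumer: no `.global()` or constant-key `keyBy()` trick is needed beforehand.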
global function over partitions
Hi,

I am trying to compute some final statistics over a stream topology. For this I would like to gather all the data from all windows and parallel partitions into a single, global window. Could you suggest a solution for this? I saw that the map function has a ".global()", but I end up with the same number of partitions as I have in the main computation. Below you can find a schema of the program:

    DataStream stream = env.read...
    env.setParallelism(10);

    // Compute phase
    DataStream result = stream.keyBy(_).window(_).apply();
    // end compute phase

    // get the metrics
    result.map(/* extract some of the Tuple fields */)
          .global()
          .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
          .trigger(EventTimeTrigger.create())
          .apply()
          .writeAsText();

For this last function I would expect that even if I had parallel computation during the compute phase, I can select part of the events from all partitions and gather them into one unique window. However, I do not seem to be successful in this. I also tried applying a keyBy() to the result stream in which I assigned the same hash to every event, but the result remains the same.

    result.map(/* extract some of the Tuple fields */)
          .keyBy(new KeySelector<Tuple2<Long, Long>, Integer>() {
              @Override
              public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
                  return 1;
              }
              @Override
              public int hashCode() {
                  return 1;
              }
          })
          .timeWindowAll().apply()

Thanks for the help/ideas
RE: Behaviour of CountWindowAll
Hi,

I believe this question might have been asked before, so sorry for repeating it (I just did not find the discussion on the mailing list). Is it possible somehow to create a new DataStream from the elements that are evicted from a window? A simple use case for this: we have data coming from a sensor every second, and we want to continuously compute the average over the last 5 seconds as well as over the interval from 5 seconds ago to 10 seconds ago. I would be interested in how the data evicted from the main window, which keeps the fresh data, could be fed into a new stream on which I could again apply a window of 5 seconds. (Having a 10-second window and selecting only the oldest 5 seconds of data is not a viable option.)

Regards,
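To make the use case concrete, here is a plain-Java sketch (no Flink, names hypothetical) of the two averages described above: given timestamped sensor readings, compute the mean over the last 5 seconds and over the 5-to-10-seconds-ago interval.

```java
import java.util.List;

public class TwoIntervalAverages {
    // readings: {timestampMs, value} pairs; the window covers elements whose
    // age relative to `now` satisfies toAgoMs <= age < fromAgoMs.
    public static double average(List<long[]> readings, long now,
                                 long fromAgoMs, long toAgoMs) {
        double sum = 0;
        int n = 0;
        for (long[] r : readings) {
            long age = now - r[0];
            if (age >= toAgoMs && age < fromAgoMs) {
                sum += r[1];
                n++;
            }
        }
        return n == 0 ? Double.NaN : sum / n;
    }
}
```

The "evicted elements" question amounts to routing each reading from the first interval into the second one as it ages past 5 seconds, instead of keeping one 10-second buffer as this sketch does.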
RE: Question about DataStream serialization
Hi,

Thanks for the answer - it is helpful. The issue that remains is why the open() function is not executed before flatMap(), so that it could load the data into the OperatorState. I used something like the following, and I observe that the dataset is not initialized when it is used in the flatMap function:

    env.socketTextStream(...)
        .map(...)    // transform the data to a Tuple1
        .keyBy(0)    // needed for OperatorState, which requires a keyed stream
        .flatMap(new RichFlatMapFunction<Tuple1, String>() {
            private OperatorState dataset;
            @Override
            public void flatMap(...) {
                // use dataset ... it is empty
            }
            @Override
            public void open(...) {
                // load dataset
            }
        })

Dr. Radu Tudoran
Research Engineer
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH

-----Original Message-----
From: Matthias J. Sax [mailto:mj...@apache.org]
Sent: Tuesday, December 08, 2015 8:42 AM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi Radu,

you are right. The open() method is called for each parallel instance of a rich function. Thus, if all instances use the same code, you might read the same data multiple times. The easiest way to distinguish different instances within open() is to use the RuntimeContext. It offers two methods, "int getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()", that you can use to compute your own partitioning within open(). For example (just a sketch):

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext context = super.getRuntimeContext();
        int dop = context.getNumberOfParallelSubtasks();
        int idx = context.getIndexOfThisSubtask();
        // open file
        // get size of file in bytes
        // seek to partition #idx:
        long seek = fileSize * idx / dop;
        // read "fileSize/dop" bytes
    }

Hope this helps.

-Matthias

On 12/08/2015 04:28 AM, Radu Tudoran wrote:
> Hi,
>
> Taking the example you mentioned of using RichFlatMapFunction and in the
> open() reading a file: would this open function be executed on each node
> where the RichFlatMapFunction gets executed? (I have done some tests and
> I get the feeling it does – but I wanted to double-check.)
> If so, would this mean that the same data will be loaded multiple times
> on each parallel instance? Is there any way this can be prevented, and
> the data hashed and partitioned somehow across nodes?
>
> Would using the operator state help?
>
>     OperatorState<MyList> dataset;
>
> I would be curious in this case how the open function could look in order
> to initialize the data for this operator state.
>
> I have tried to just read a file and write it into the dataset, but I
> encountered a strange behavior: it looks like the flatmap function gets
> executed before the open function, which leads to using an empty dataset
> in the flatmap function, while once this finishes executing the dataset
> gets loaded. Is this an error or am I doing something wrong?
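The partitioning arithmetic from the sketch in the reply above can be written out as plain Java: split a file of `fileSize` bytes into `dop` byte ranges, one per parallel subtask. The class and method names are illustrative, not a Flink API.

```java
public class FilePartitioner {
    // Start offset of subtask `idx` out of `dop` subtasks
    // (the same formula as "long seek = fileSize * idx / dop" in the sketch).
    public static long startOffset(long fileSize, int idx, int dop) {
        return fileSize * idx / dop;
    }

    // Number of bytes subtask `idx` should read; computing the end as the
    // next subtask's start guarantees the ranges tile the file exactly,
    // with the last subtask taking any remainder.
    public static long length(long fileSize, int idx, int dop) {
        long end = (idx == dop - 1) ? fileSize : startOffset(fileSize, idx + 1, dop);
        return end - startOffset(fileSize, idx, dop);
    }
}
```

Note that this splits on raw byte boundaries; for record-oriented files each subtask would additionally need to skip to the next record delimiter after seeking.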
RE: Question about DataStream serialization
Hi,

I attached below a function that shows the issue: the OperatorState does not have the value initialized in the open() function before flatMap() is called. You can see this because the print will not show anything. If, on the other hand, you remove the initialization loop and put a non-zero default value for the dataset, then the print will work.

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        DataStream<String> stream = env
                .socketTextStream("localhost", 16333, '\n')
                .map(new MapFunction<String, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> map(String arg0) throws Exception {
                        return new Tuple1<String>(arg0);
                    }
                }).keyBy(0)
                .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {

                    private OperatorState<Integer> dataset;

                    @Override
                    public void flatMap(Tuple1<String> arg0, Collector<String> arg1)
                            throws Exception {
                        if (dataset.value() > 0)
                            arg1.collect("Test OK " + arg0);
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        dataset = getRuntimeContext().getKeyValueState(
                                "loadeddata", Integer.class, 0);
                        /*
                         * Simulate loading data.
                         * Looks like if this part is commented out and the dataset
                         * is initialized with 1 for example, then the non-zero
                         * value is available in the flatMap function.
                         */
                        for (int i = 0; i < 10; i++) {
                            dataset.update(dataset.value() + 1);
                        }
                        // System.out.println("dataset value " + dataset.value());
                    }
                });

        stream.print();
        env.execute("test open function");
    }

Dr. Radu Tudoran
Research Engineer
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Tuesday, December 08, 2015 12:14 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi,

if the open() method is indeed not called before the first flatMap() call, then this would be a bug. Could you please verify that this is the case, and maybe provide an example where this is observable?

Cheers,
Aljoscha

> On 08 Dec 2015, at 10:41, Matthias J. Sax <mj...@apache.org> wrote:
>
> Hi,
>
> I think (but please someone verify) that an OperatorState is ac
RE: Question about DataStream serialization
Hi,

The state that is being loaded can very well be partitioned by keys. Assuming this scenario, and that you know that the keys go from 0 to N, is there some possibility to load and partition the initial data in the open function?

Dr. Radu Tudoran
Research Engineer
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Tuesday, December 08, 2015 4:20 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Ah, I see what the problem is. Operator state is scoped to the key of the incoming element. In the open() method, no element has been received yet, so the key of the incoming element is basically NULL. So the open() method initializes state for key NULL. In flatMap() we actually have a key for incoming elements, so we access state for a specific key, which has the default value "0" (from the getKeyValueState() call). OperatorState is only useful if the state needs to be partitioned by key, but here it seems that the state is valid for all elements?

> On 08 Dec 2015, at 15:30, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>
> [example program from the previous message quoted]
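The key-scoping behavior described above can be modeled in a few lines of plain Java (no Flink): partitioned state is looked up per current key, and in open() no element has arrived yet, so the "current key" is still null and the initialization lands under key null. All names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyScopedState {
    private final Map<Object, Integer> stateByKey = new HashMap<>();
    private Object currentKey = null; // only set once an element arrives
    private final int defaultValue = 0;

    public int value() {
        return stateByKey.getOrDefault(currentKey, defaultValue);
    }

    public void update(int v) {
        stateByKey.put(currentKey, v);
    }

    // Models open(): runs before any element, so writes land under key null.
    public void open() {
        for (int i = 0; i < 10; i++) {
            update(value() + 1);
        }
    }

    // Models flatMap(): runs with the element's key, so reads see only that
    // key's state, which is still the default value.
    public int flatMapValue(Object key) {
        currentKey = key;
        return value();
    }
}
```

This is why the initialization loop in the earlier example is invisible in flatMap(): the loop incremented the state of key null, not the state of any real element key.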
RE: Question about DataStream serialization
Hi,

Is the partitioning function used by ".keyBy(Object)" of the form Object.hashCode() % getNumberOfParallelSubtasks()?

Dr. Radu Tudoran
Research Engineer
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Tuesday, December 08, 2015 5:00 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi,

it is not possible in an officially supported way. There is, however, a trick that you could use: you can cast the OperatorState to a KvState. This has a method setCurrentKey() that sets the key to be used when calling value() and update(). In this way you can trick the OperatorState into thinking that it has the key of an input element. This is an internal API, however, and could change in the future, thereby breaking your program.

Cheers,
Aljoscha

> On 08 Dec 2015, at 16:31, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>
> Hi,
>
> The state that is being loaded can very well be partitioned by keys. Assuming this scenario, and that you know that the keys go from 0 to N, is there some possibility to load and partition the initial data in the open function?
>
> [signature and earlier quoted messages trimmed]
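For intuition about the question asked in this thread, here is a simplified plain-Java model of hash partitioning. This is NOT Flink's exact internal function (Flink applies its own hash spreading to the key's hash code), but it captures the `hashCode() % parallelism` idea; the names are illustrative.

```java
public class SimplePartitioner {
    // Maps a key to one of numPartitions partitions. Masking the sign bit
    // keeps the index valid even when hashCode() is negative.
    public static int partition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

The important property for keyed state is that the mapping is deterministic: the same key always lands on the same partition, so all state updates for that key happen in one place.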
question about DataStream serialization
Hello, I have the following scenario · Reading a fixed set DataStream fixedset = env.readtextFile(... · Reading a continuous stream of data DataStream stream = I would need that for each event read from the continuous stream to make some operations onit and on the fixedsettoghether I have tried something like List<> Final myObject.referenceStaticSet = fixedset; stream.map(new MapFunction<String, String>() { @Override public String map(String arg0) throws Exception { //for example: final string2add = arg0; //the goal of below function would be to add the string2add to the fixedset myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String arg0, Collector arg1) //for example adding to the fixed set also the string2add object: arg1.collect(string2add); } ... } However, I get an exception (Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: ) that object is not serializable (Object MyClass$3@a71081 not serializable ) Looking into this I see that the DataStream<> is not serializable What would be the solution to this issue? As I said I would like that for each event from the continuous stream to use the initial fixed set, add the event to it and apply an operation. Stephan was mentioning at some point some possibility to create a DataSet and launch a batch processing while operating in stream mode- in case this is possible, can you give me a reference for it, because it might be the good solution to use in case I could treat the fixed set as a DataSet and just add/remove the incoming event to it and apply an operation Regards, Dr. 
Radu Tudoran
Research Engineer
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München
E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH, Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Directors: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN

This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!
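[Editor's sketch, not part of the original thread.] The exception above comes from capturing a DataStream inside a user function: a DataStream is a plan-time handle, not the data itself, and it is not serializable. One common workaround is to ship the fixed set as a plain serializable collection held as a field of the function (in a real job this class would extend Flink's RichMapFunction; it is shown as a plain class here so it runs without Flink on the classpath, and the class name is made up for the example):

```java
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

// Sketch: instead of capturing a DataStream (not serializable) inside a user
// function, capture the underlying data as a plain serializable collection.
// Flink ships the function object, fields included, to each parallel task.
class FixedSetMapper implements Serializable {
    private final Set<String> fixedSet; // local working copy per task

    FixedSetMapper(Set<String> fixedSet) {
        // defensive copy so each task owns and may mutate its own set
        this.fixedSet = new HashSet<>(fixedSet);
    }

    // Equivalent of MapFunction.map(): add the event to the local copy of
    // the fixed set, then run the per-event operation (here: the new size).
    int map(String event) {
        fixedSet.add(event);
        return fixedSet.size();
    }
}
```

Note that each parallel task then mutates its own copy; tasks do not see each other's additions, which is exactly the sharing limitation discussed in the BroadcastState thread above.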
RE: flink connectors
Hi,

Is there any alternative to avoiding Maven? That is why I was curious if there is a binary distribution of this available for download directly.

Dr. Radu Tudoran

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Friday, November 27, 2015 2:41 PM
To: user@flink.apache.org
Subject: Re: flink connectors

Hi Radu,

the connectors are available in Maven Central. Just add them as a dependency in your project and they will be fetched and included.

Best, Fabian

2015-11-27 14:38 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi,

I was trying to use the Flink connectors. However, when I tried to import

import org.apache.flink.streaming.connectors.*;

I saw that they are not present in the binary distribution as downloaded from the website (flink-dist-0.10.0.jar). Is this intentional? Is there also a binary distribution that contains these connectors?

Regards, Dr. Radu Tudoran
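[Editor's sketch, not part of the original thread.] Declaring a connector dependency in the project's pom.xml looks roughly like this; the Kafka connector coordinates below are an example for the 0.10.x line (the same artifact the link later in the thread points to), so check Maven Central for the exact artifactId and version matching your Flink release:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10.0</version>
</dependency>
```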
RE: flink connectors
Hi,

Thank you for the tips! For future reference, in case someone else wants to search for the binaries for this, I would like to share the link to the Maven repository: http://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

Dr. Radu Tudoran

-----Original Message-----
From: Matthias J. Sax [mailto:mj...@apache.org]
Sent: Friday, November 27, 2015 2:53 PM
To: user@flink.apache.org
Subject: Re: flink connectors

If I understand the question right, you just want to download the jar manually? Just go to the Maven repository website and download the jar from there.

-Matthias

On 11/27/2015 02:49 PM, Robert Metzger wrote:
> Maybe there is a Maven mirror you can access from your network?
>
> This site contains a list of some mirrors:
> http://stackoverflow.com/questions/5233610/what-are-the-official-mirrors-of-the-maven-central-repository
>
> You don't have to use the Maven tool, you can also manually browse for
> the jars and download what you need.
> On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>     You can always build Flink from source, but apart from that I am not
>     aware of an alternative.
>
>     2015-11-27 14:42 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
>         Hi,
>
>         Is there any alternative to avoiding Maven? That is why I was
>         curious if there is a binary distribution of this available for
>         download directly.
>
>         Dr. Radu Tudoran
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Friday, November 27, 2015 2:41 PM
> To: user@flink.apache.org
> Subject: Re: flink connectors
>
> Hi Radu,
>
> the connectors are available in Maven Central. Just add them as a
> dependency in your project and they will be fetched and included.
flink connectors
Hi,

I was trying to use the Flink connectors. However, when I tried to import

import org.apache.flink.streaming.connectors.*;

I saw that they are not present in the binary distribution as downloaded from the website (flink-dist-0.10.0.jar). Is this intentional? Is there also a binary distribution that contains these connectors?

Regards,
Dr. Radu Tudoran
RE: output writer
Hi,

My 2 cents, based on something similar that I have tried: I created my own implementation of OutputFormat, where you define your own logic for what happens in the writeRecord function. This logic would make a distinction between the ids and write each record to the appropriate file. Other solutions may exist.

Dr. Radu Tudoran

-----Original Message-----
From: Michele Bertoni [mailto:michele1.bert...@mail.polimi.it]
Sent: Thursday, July 30, 2015 10:15 AM
To: user@flink.apache.org
Subject: output writer

Hi everybody, I have a question about the writer. I have to save my dataset in different files according to a field of the tuples. Let's assume I have a groupId in the tuple; I need to store each group in a different file, with a custom name. Any idea on how I can do that?

thanks!
Michele
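[Editor's sketch, not part of the original thread.] The approach described above can be outlined as follows: keep one writer per group id and route each record inside writeRecord. This mirrors the open / writeRecord / close lifecycle of Flink's OutputFormat, but is a plain class (with StringWriter standing in for a file writer, and a made-up path scheme) so it runs without Flink on the classpath:

```java
import java.io.Serializable;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

// Sketch of a group-routing output format: one writer per group id, chosen
// per record. In a real Flink OutputFormat, each StringWriter would be a
// FileWriter on basePath + "/" + groupId + ".txt" (path scheme assumed).
class GroupingOutput implements Serializable {
    private final String basePath;
    private transient Map<String, StringWriter> writers; // opened per task

    GroupingOutput(String basePath) { this.basePath = basePath; }

    // In Flink: open(int taskNumber, int numTasks)
    void open() { writers = new HashMap<>(); }

    // In Flink: writeRecord(T record) - pick the target by the group field.
    void writeRecord(String groupId, String payload) {
        writers.computeIfAbsent(groupId, g -> new StringWriter())
               .write(payload + "\n");
    }

    // Test hook: what has been written for one group so far.
    String contents(String groupId) { return writers.get(groupId).toString(); }

    // In Flink: close() - flush and release all per-group file handles.
    void close() {
        for (StringWriter w : writers.values()) w.flush();
    }
}
```

The writers map is transient because file handles must be opened on the worker in open(), not serialized with the format object.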
RE: output writer
I will double-check and try to commit this in the next days.

Dr. Radu Tudoran

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, July 30, 2015 11:34 AM
To: user@flink.apache.org
Subject: Re: output writer

Hi Michele, hi Radu,

Flink does not have such an OutputFormat, but I agree, it would be a valuable addition. Radu's approach looks like the way to go to implement this feature. @Radu, is there a way to contribute your OutputFormat to Flink?

Cheers, Fabian

2015-07-30 10:24 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi, My 2 cents, based on something similar that I have tried: I created my own implementation of OutputFormat, where you define your own logic for what happens in the writeRecord function. This logic would make a distinction between the ids and write each record to the appropriate file. Other solutions may exist.
RE: output writer
Re-hi,

I have double-checked, and there is indeed an OutputFormat interface in Flink which can be extended. I believe that for the kind of specific formats mentioned by Michele, each user can develop the appropriate format. On the other hand, having more output formats is something that could be contributed. We should identify a couple of common formats; the first one that comes to my mind is something for writing to memory (e.g. a memory buffer).

Dr. Radu Tudoran

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, July 30, 2015 11:34 AM
To: user@flink.apache.org
Subject: Re: output writer

Hi Michele, hi Radu,

Flink does not have such an OutputFormat, but I agree, it would be a valuable addition. Radu's approach looks like the way to go to implement this feature. @Radu, is there a way to contribute your OutputFormat to Flink?

Cheers, Fabian

2015-07-30 10:24 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi, My 2 cents, based on something similar that I have tried: I created my own implementation of OutputFormat, where you define your own logic for what happens in the writeRecord function. This logic would make a distinction between the ids and write each record to the appropriate file. Other solutions may exist.
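[Editor's sketch, not part of the original thread.] The "write to memory" format suggested above could look like the following: records accumulate in an in-memory byte buffer instead of a file, following the same open / writeRecord / close lifecycle as Flink's OutputFormat. It is shown as a plain class so it runs without Flink dependencies:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Sketch of an in-memory output format: collects records into a byte buffer
// rather than writing them to a file. A real Flink version would implement
// OutputFormat<String> and expose the buffer to downstream consumers.
class MemoryOutputFormat {
    private ByteArrayOutputStream buffer;

    // In Flink: open(int taskNumber, int numTasks) - allocate per task.
    void open() { buffer = new ByteArrayOutputStream(); }

    // Append one record, newline-delimited, to the in-memory buffer.
    void writeRecord(String record) {
        byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
        buffer.write(bytes, 0, bytes.length);
    }

    // In Flink: close() - here we hand the collected bytes to the caller.
    byte[] close() { return buffer.toByteArray(); }
}
```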