Re: How many events can Flink process each second

2019-10-23 Thread Michael Latta
There are a lot of variables: how many cores are allocated, how much RAM, etc. 
There are companies doing billions of events per day and more. 

Tell your boss it has proven to have extremely flat horizontal scaling, meaning 
you can get it to process almost any volume given sufficient hardware. 

You will need to do a proof of concept on your data and your analysis to know 
how much hardware is required for your problem. 


Michael

> On Oct 23, 2019, at 7:24 AM, A. V.  wrote:
> 
> 
> Hi,
> 
> My boss wants to know how many events Flink can process, analyse, etc. per 
> second. I can't find this in the documentation.
> 
> 


Re: Calculating over multiple streams...

2019-02-22 Thread Michael Latta
You may want to union the 3 streams prior to the process function, rather than 
processing them independently. 
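
A minimal sketch of that union-then-key approach, assuming the three flatMapped
streams share a (translator_id, score_type, score) Tuple3 shape; the element
values and the process function body are illustrative:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UnionScores {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the three flatMapped source streams.
        DataStream<Tuple3<String, String, Double>> quality =
                env.fromElements(Tuple3.of("t1", "quality", 0.9));
        DataStream<Tuple3<String, String, Double>> responsivity =
                env.fromElements(Tuple3.of("t1", "responsivity", 0.7));
        DataStream<Tuple3<String, String, Double>> projects =
                env.fromElements(Tuple3.of("t1", "projects", 12.0));

        // Union first, then key by translator_id: one keyed process function
        // now sees all three score types for a given translator and can keep
        // the unified score in keyed state.
        quality.union(responsivity, projects)
                .keyBy(new KeySelector<Tuple3<String, String, Double>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Double> t) {
                        return t.f0;
                    }
                })
                .process(new KeyedProcessFunction<String, Tuple3<String, String, Double>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Double> value,
                                               Context ctx, Collector<String> out) {
                        // Keyed-state reads/updates for the unified score go here.
                        out.collect(ctx.getCurrentKey() + ": " + value.f1 + "=" + value.f2);
                    }
                })
                .print();

        env.execute();
    }
}

Keyed state inside the process function can then hold the latest score of each
type per translator, so every incoming update recomputes that translator's
unified score.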


Michael

> On Feb 22, 2019, at 9:15 AM, Oytun Tez  wrote:
> 
> Hi everyone!
> 
> I've been struggling with an implementation problem in the last days, which I 
> am almost sure is caused by my misunderstanding of Flink. 
> 
> The purpose: consume multiple streams and update a score list (with metadata, 
> e.g. user_id) for each update coming from any of the streams. The new output 
> list will also need to be used by another pattern.
> We created 3 SourceFunctions that periodically go to our MySQL database and 
> stream new results back. These return POJOs.
> Then we flatMap these streams to unify their type. They are now all Tuple3s 
> with matching types.
> And we process each stream with the same ProcessFunction.
> I am stuck with the output list.
> Business case (human translation workflow):
> Input: Stream "translation quality" score updates of each translator 
> [translator_id, score]
> Input: Stream "responsivity score" updates of each translator (email open 
> rates/speeds etc) [translator_id, score]
> Input: Stream "number of projects" updates each translator worked on 
> [translator_id, score]
> Calculation: for each translator, use 3 scores to come up with a unified 
> score and its percentile over all translators. This step definitely feels 
> like a Batch job, but I am pushing to go with a streaming mindset.
> So now, supposedly, in one way or another, I have a list of translators with 
> their unified score and percentile over this list.
> Another independent stream should send me updates on "need for proofreaders" 
> – I couldn't even get to this point yet. Once a need update is streamed, the 
> application would fetch the previously calculated list and, say, pick the top 
> X translators determined by the message from the need algorithm.
> 
> 
> 
> Overall, my desire is to make everything a stream and let the data and 
> decisions constantly react to stream updates. I am very confused at this 
> point. I tried using keyed and operator state, but they seem to keep their 
> state only for their own items. I am considering doing Batch instead after 
> all this struggle.
> 
> Any ideas? I can even get on a call.
> 
> ---
> Oytun Tez
> 
> M O T A W O R D
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com


Re: Flink SQL questions

2018-11-01 Thread Michael Latta
Thanks, I will check it out. 

Michael

Sent from my iPad

> On Nov 1, 2018, at 8:22 PM, Hequn Cheng  wrote:
> 
> Hi Michael,
> 
> There are some test cases in Flink git, such as[1] which I think may help you.
> 
> Best, Hequn
> [1] 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> 
> 
>> On Fri, Nov 2, 2018 at 7:46 AM TechnoMage  wrote:
>> I am trying to get a minimal Flink SQL program going to test it out.  I have 
>> a StreamExecutionEnvironment and from that created a StreamTableEnvironment. 
>> The docs indicate there should be a fromDataStream method to create a Table, 
>> but none appears to exist according to Eclipse.  The method registerDataStream 
>> is also missing; there is just registerDataStreamInternal.  The Internal 
>> suggests a private API to me, so I am asking whether the docs are out of date 
>> or I am missing some library or such.  I am using Java 8, not Scala.
>> 
>> Michael Latta
>> 


Job manager UI improvement

2018-11-01 Thread Michael Latta
I would really like to see the job manager show metrics on state size, not just 
I/O per task. Is there a way to do that now, or is the metric there and it just 
needs some UI work to show it?

Michael

Sent from my iPad


Re: dynamic tables in cassandra sink

2018-05-03 Thread Michael Latta
If you restart the job each month, you can build the query string dynamically. If 
you want data to flow into the right table based on a date in the record, you will 
need to do something fancier. I have not used the Cassandra connector, so I can’t 
help on the details. Can you subclass the connector and build the query dynamically 
for each data item? Or build a custom sink that just writes to Cassandra?
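
A minimal sketch of the restart-per-month variant, building on the snippet quoted
below; the table name is computed once at job-submission time, so it only stays
correct if the job is restarted each month:

import java.time.LocalDate;

// Compute the monthly table name when the job is submitted.
LocalDate now = LocalDate.now();
String table = String.format("table_%02d_%d", now.getMonthValue(), now.getYear());

CassandraSink
    .addSink(dataStream)
    .setQuery("INSERT INTO " + table + "(id, text) VALUES (?,?);")
    .setHost("127.0.0.1")
    .build();

And a sketch of the fancier per-record variant as a custom sink, assuming the
DataStax Java driver 3.x, an epoch-millis event time in each record, and an
illustrative keyspace and tuple shape:

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class MonthlyCassandraSink extends RichSinkFunction<Tuple3<Long, String, Long>> {
    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("my_keyspace"); // illustrative keyspace
    }

    @Override
    public void invoke(Tuple3<Long, String, Long> value) throws Exception {
        // f2 is assumed to carry an epoch-millis event time; derive the table from its month.
        LocalDate d = Instant.ofEpochMilli(value.f2).atZone(ZoneOffset.UTC).toLocalDate();
        String table = String.format("table_%02d_%d", d.getMonthValue(), d.getYear());
        session.execute("INSERT INTO " + table + "(id, text) VALUES (?,?)", value.f0, value.f1);
    }

    @Override
    public void close() {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}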

Michael

Sent from my iPad

> On May 3, 2018, at 5:31 PM, Meghashyam Sandeep V  
> wrote:
> 
> Hi there,
> 
> I have a flink stream from kafka writing to Cassandra. We use monthly tables 
> in Cassandra to avoid TTL and tombstones that come with it. Tables would be 
> like table_05_2018, table_06_2018 and so on. How do I dynamically register 
> this table name in the following snippet? 
> 
>   CassandraSink
> .addSink(dataStream)
> .setQuery("INSERT INTO table_05_2018(id, text) VALUES (?,?);")
> .setHost("127.0.0.1")
> .build();
> 


Re: Different result on running Flink in local mode and Yarn cluster

2018-04-26 Thread Michael Latta
Not knowing the library or its config needs, I do not have a suggestion. If the 
config is accumulated from the inputs and needs to see all of them, I would suggest 
setting parallelism to 1 as an experiment, but it would need a redesign to run 
in parallel. 
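
The experiment amounts to forcing a single parallel instance so one function sees
every input (a sketch; a diagnostic, not a fix):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// One parallel instance for every operator: a single parsing function now sees all inputs.
env.setParallelism(1);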

Michael

Sent from my iPad

> On Apr 26, 2018, at 12:50 AM, Soheil Pourbafrani  
> wrote:
> 
> Thanks. So what is your suggestion to solve the problem? Is it possible to 
> use Broadcast Variables for this scenario?
> 
>> On Thu, Apr 26, 2018 at 10:57 AM, TechnoMage  wrote:
>> What parallelism are you using?  If it is > 1, you cannot rely on the config 
>> value being passed to each of the parsing functions, as they are running on 
>> separate threads or even separate machines.
>> 
>> Michael
>> 
>> 
>>> On Apr 26, 2018, at 12:24 AM, Soheil Pourbafrani  
>>> wrote:
>>> 
>>> As I said, in the first version of the code I didn't pass any argument to the 
>>> parse function and the HashMap was a static field of the Parser class, but it 
>>> didn't give the desired answer. I tried giving the HashMap as an argument to 
>>> the parse method, but I am still not getting the desired answers! The code is 
>>> something like the following:
>>> 
>>> public class Test {
>>> 
>>> public static void main(String[] args) throws Exception {
>>> 
>>> CassandraConnection.connect();
>>> Parser.setInsert(true);
>>> 
>>> stream.flatMap(new FlatMapFunction() {
>>> @Override
>>> public void flatMap(byte[] value, Collector out) throws 
>>> Exception {
>>> // Parser.parse(ByteBuffer.wrap(value));
>>> }
>>> });
>>> env.execute();
>>> }
>>> }
>>> 
>>> In summary, I want the HashMap to be shared among the taskmanagers.
>>> 
>> 
> 


Re: Different result on running Flink in local mode and Yarn cluster

2018-04-26 Thread Michael Latta
Only the anonymous FlatMapFunction instance is sent to the TaskManagers. Move 
the field into that class as a non-static member, so it is serialized and shipped 
with the function. 
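
A minimal sketch of that change, assuming the config only needs to be readable
within each parallel instance (sharing live updates across TaskManagers would need
something like an external store instead); the types and the parse call are
illustrative:

import java.util.HashMap;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ParseFunction extends RichFlatMapFunction<byte[], String> {
    // Instance field: serialized with the function and shipped to every
    // TaskManager, unlike a static field on an outer class.
    private final HashMap<String, String> config = new HashMap<>();

    @Override
    public void open(Configuration parameters) {
        // Populate or load the config here, once per parallel instance.
        config.put("insert", "true"); // illustrative entry
    }

    @Override
    public void flatMap(byte[] value, Collector<String> out) {
        // Parser.parse(ByteBuffer.wrap(value), config); // the library call, per record
    }
}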

Michael

Sent from my iPad

> On Apr 25, 2018, at 10:42 PM, Soheil Pourbafrani  
> wrote:
> 
> I run code using the Flink Java API that gets some bytes from Kafka and parses 
> them, followed by inserting into a Cassandra database using another library's 
> static methods (both parsing and inserting the results is done by the library). 
> Running the code locally in the IDE, I get the desired answer, but running on a 
> YARN cluster the parse method didn't work as expected!
> 
> public class Test {
> static HashMap ConfigHashMap = new HashMap<>();
> 
> public static void main(String[] args) throws Exception {
> 
> CassandraConnection.connect();
> Parser.setInsert(true);
> 
> stream.flatMap(new FlatMapFunction() {
> @Override
> public void flatMap(byte[] value, Collector out) throws 
> Exception {
> Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
> // Parser.parse(ByteBuffer.wrap(value));
> }
> });
> env.execute();
> }
> }
> 
> There is a static HashMap field in the class Parser; the parsing configuration 
> is based on its information, and data is inserted into it during execution. The 
> problem when running on YARN was that this data was not available to the 
> taskmanagers, and they just print that the config is not available!
> 
> So I redefined that HashMap as a parameter of the method parse, but there was 
> no difference in the results!
> 
> How can I fix the problem?
> 
> 


Re: Flink/Kafka POC performance issue

2018-04-17 Thread Michael Latta
Thanks for the suggestion. The task manager is configured for 8GB of heap and 
gets to about 8.3GB total; other Java processes (job manager and Kafka) add a 
few more. I will check it again, but the instances have 16GB, the same as my 
laptop, which completes the test in <90 min. 

Michael

Sent from my iPad

> On Apr 16, 2018, at 10:53 PM, Niclas Hedhman  wrote:
> 
> 
> Have you checked memory usage? It could be as simple as either having memory 
> leaks, or aggregating more than you think (sometimes not obvious how much is 
> kept around in memory for longer than one first thinks). If possible, connect 
> FlightRecorder or similar tool and keep an eye on memory. Additionally, I 
> don't have AWS experience to talk of, but IF AWS swaps RAM to disk like 
> regular Linux, then that might be triggered if your JVM heap is bigger than 
> can be handled within the available RAM.
> 
>> On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage  wrote:
>> I am doing a short Proof of Concept for using Flink and Kafka in our 
>> product.  On my laptop I can process 10M inputs in about 90 min.  On 2 
>> different EC2 instances (m4.xlarge and m5.xlarge, both 4-core, 16GB RAM, and 
>> SSD storage) I see the process hit a wall around 50 min into the test, short 
>> of 7M events processed.  This is running ZooKeeper, Kafka broker, and Flink 
>> all on the same server in all cases.  My goal is to measure single-node vs. 
>> multi-node and test horizontal scalability, but I would like to figure out 
>> why it hits a wall first.  I have the task manager configured with 6 slots 
>> and the job has a parallelism of 5.  The laptop has 8 threads, and the EC2 
>> instances have 4 threads.  On smaller data sets and in the beginning of each 
>> test the EC2 instances outpace the laptop.  I will try again with an 
>> m5.2xlarge, which has 8 threads and 32GB RAM, to see if that works better for 
>> this workload.  Any pointers or ways to get metrics that would help diagnose 
>> this would be appreciated.
>> 
>> Michael
>> 
> 
> 
> 
> -- 
> Niclas Hedhman, Software Developer
> http://polygene.apache.org - New Energy for Java


Multi-stream question

2018-04-06 Thread Michael Latta
I would like to “join” several streams (>3) in a custom operator. Is this 
feasible in Flink?


Michael


Re: KeyedStream question

2018-04-06 Thread Michael Latta
Yes. It took a bit of digging on the website to find RichFlatMapFunction to get 
managed state. 
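
A minimal sketch of that pattern: a RichFlatMapFunction keeping a per-key running
count in managed ValueState (the types and the counting logic are illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        // The state handle always refers to the current record's key.
        Long current = count.value();
        long next = (current == null) ? 1L : current + 1L;
        count.update(next);
        out.collect(next);
    }
}

Applied after a keyBy(), each key accumulates its own count, because the state
Flink hands back is always scoped to the key of the record being processed.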

Michael

Sent from my iPad

> On Apr 6, 2018, at 3:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi, 
> 
> I think Flink is doing exactly what you are looking for.
> If you use keyed state [1], Flink will always put the state in the context of 
> the key of the currently processed record.
> So if you have a MapFunction with keyed state, and the map() method is called 
> with a record that has key A, the state will be the state for key A. If the 
> next record has key B, the state will be for key B.
> 
> Best,
> Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#keyed-state
> 
> 2018-04-05 14:08 GMT+02:00 Michael Latta <mla...@technomage.com>:
>> Thanks for the clarification. I was just trying to understand the intended 
>> behavior. It would have been nice if Flink tracked state for downstream 
>> operators by key, but I can do that with a map in the downstream functions. 
>> 
>> Michael
>> 
>> Sent from my iPad
>> 
>>> On Apr 5, 2018, at 2:30 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>> Amit is correct. keyBy() ensures that all records with the same key are 
>>> processed by the same parallel instance of a function.
>>> This is different from "a parallel instance only sees records of one key".
>>> 
>>> I had a look at the docs [1]. 
>>> I agree that "Logically partitions a stream into disjoint partitions, each 
>>> partition containing elements of the same key." can be easily interpreted 
>>> as you did.
>>> I've pushed a commit to clarify the description. The docs should be updated 
>>> soon.
>>> 
>>> Best, Fabian 
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#datastream-transformations
>>> 
>>> 2018-04-05 6:21 GMT+02:00 Amit Jain <aj201...@gmail.com>:
>>>> Hi,
>>>> 
>>>> The keyBy operation partitions the data on the given key and makes sure the 
>>>> same slot will get all future data belonging to the same key. In the default 
>>>> implementation, it can also map a subset of the keys in your DataStream to 
>>>> the same slot.
>>>> 
>>>> Assuming the number of keys equals the number of running slots, you may 
>>>> specify a custom keyBy operation to achieve the same.
>>>> 
>>>> 
>>>> Could you describe your use case?
>>>> 
>>>> --
>>>> Thanks
>>>> Amit
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Sent from: 
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>> 
> 


Re: KeyedStream question

2018-04-05 Thread Michael Latta
Thanks for the clarification. I was just trying to understand the intended 
behavior. It would have been nice if Flink tracked state for downstream 
operators by key, but I can do that with a map in the downstream functions. 

Michael

Sent from my iPad

> On Apr 5, 2018, at 2:30 AM, Fabian Hueske  wrote:
> 
> Amit is correct. keyBy() ensures that all records with the same key are 
> processed by the same parallel instance of a function.
> This is different from "a parallel instance only sees records of one key".
> 
> I had a look at the docs [1]. 
> I agree that "Logically partitions a stream into disjoint partitions, each 
> partition containing elements of the same key." can be easily interpreted as 
> you did.
> I've pushed a commit to clarify the description. The docs should be updated 
> soon.
> 
> Best, Fabian 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#datastream-transformations
> 
> 2018-04-05 6:21 GMT+02:00 Amit Jain :
>> Hi,
>> 
>> The keyBy operation partitions the data on the given key and makes sure the
>> same slot will get all future data belonging to the same key. In the default
>> implementation, it can also map a subset of the keys in your DataStream to
>> the same slot.
>> 
>> Assuming the number of keys equals the number of running slots, you may
>> specify a custom keyBy operation to achieve the same.
>> 
>> 
>> Could you describe your use case?
>> 
>> --
>> Thanks
>> Amit
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>