Re: Dependency injection and flink.

2020-11-09 Thread Arvid Heise
I hope you don't mind that I'm just lazily giving you a link to Wikipedia [1]. The first few examples all show manual DI with ctor, setters, etc. Folks usually only think of assembling (automatic) DI when talking about DI, but you can build everything manually with a bit of inconvenience as well. …
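For illustration, a minimal sketch of manual constructor injection into a Flink function (all names here are hypothetical; note the dependency must be serializable, since Flink ships function instances to the cluster):

    import org.apache.flink.api.common.functions.MapFunction;

    // The dependency: a plain interface. Extending Serializable matters because
    // Flink serializes the MapFunction instance, including injected fields.
    interface Greeter extends java.io.Serializable {
        String greet(String name);
    }

    // Manual DI: the collaborator is handed in through the constructor,
    // no framework involved.
    class GreetingMapper implements MapFunction<String, String> {
        private final Greeter greeter;

        GreetingMapper(Greeter greeter) {
            this.greeter = greeter;
        }

        @Override
        public String map(String name) {
            return greeter.greet(name);
        }
    }

    // Usage: stream.map(new GreetingMapper(n -> "Hello, " + n))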

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-09 Thread Jark Wu
Hi Felipe, The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option, only works for distinct aggregations (e.g. COUNT(DISTINCT ...)). However, the query in your example is using COUNT(driverId). You can update it to COUNT(DISTINCT driverId), and it should …
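As a sketch of the combination Jark describes (assuming the Blink planner and the TaxiRide table from the original question):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    // The option only affects distinct aggregates ...
    tableEnv.getConfig().getConfiguration()
        .setString("table.optimizer.distinct-agg.split.enabled", "true");

    // ... so the aggregate itself has to be DISTINCT for the split to happen:
    Table result = tableEnv.sqlQuery(
        "SELECT startDate, COUNT(DISTINCT driverId) FROM TaxiRide GROUP BY startDate");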

Re: checkpoint interval and hdfs file capacity

2020-11-09 Thread lec ssmi
Thanks. I have some jobs with a checkpoint interval of 1000 ms, and the HDFS files grow too large to work normally. What I am curious about is: are writing and deleting performed synchronously? Is it possible that new files are added faster than old ones are deleted? Congxian Qiu wrote on Tue, Nov 10, 2020 at 2:16 PM: > Hi > …

Re: checkpoint interval and hdfs file capacity

2020-11-09 Thread Congxian Qiu
Hi, no matter what interval you set, Flink will take care of the checkpoints (removing useless checkpoints when it can), but when you set a very small checkpoint interval, there may be very high pressure on the storage system (here, RPC pressure on the HDFS NameNode). Best, Congxian lec ssmi wrote on …
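As a sketch of the two knobs involved (retention is cluster-side; state.checkpoints.num-retained defaults to 1, so superseded checkpoints are removed automatically):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 5 seconds; Flink keeps only the retained checkpoints
    // and deletes the rest, but every checkpoint still costs NameNode RPCs.
    env.enableCheckpointing(5000);

    // Guarantee some quiet time between checkpoints to ease the pressure:
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);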

checkpoint interval and hdfs file capacity

2020-11-09 Thread lec ssmi
Hi, if I set the checkpoint interval to be very small, such as 5 seconds, will there be a lot of state files on HDFS? In theory, no matter what the interval is set to, every time you checkpoint, old files will be deleted and new files will be written, right?

Flink Kafka Table API for python with JAAS

2020-11-09 Thread Sweta Kalakuntla
Hi, I am using the Flink 1.11.2 Python Table API to connect to a Kafka topic using the SASL protocol, but it fails with the following error. I tried the same properties in the Flink Java version, and I am able to connect. Has anyone faced this issue, and how did you resolve it? Error: Caused by: javax.s…
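For comparison, a hedged sketch of the kind of SASL settings that work from the Java Table API (broker, topic, group and credentials are placeholders); the Kafka connector forwards every key prefixed with 'properties.' to the Kafka client, which is where the JAAS config has to end up:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    // 'properties.*' keys are passed through to the Kafka consumer unchanged.
    tEnv.executeSql(
        "CREATE TABLE events (msg STRING) WITH ("
            + " 'connector' = 'kafka',"
            + " 'topic' = 'my-topic',"
            + " 'properties.bootstrap.servers' = 'broker:9093',"
            + " 'properties.group.id' = 'my-group',"
            + " 'properties.security.protocol' = 'SASL_SSL',"
            + " 'properties.sasl.mechanism' = 'PLAIN',"
            + " 'properties.sasl.jaas.config' ="
            + " 'org.apache.kafka.common.security.plain.PlainLoginModule"
            + " required username=\"user\" password=\"pass\";',"
            + " 'format' = 'json')");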

Re: Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis
Thank you Xuannan for the reply. Also, I want to ask about how Flink uses the off-heap memory. If I set taskmanager.memory.task.off-heap.size, then which data does Flink allocate off-heap? Is this handled by the programmer? Best, Iacovos On 10/11/20 4:42 a.m., Xuannan Su wrote: Hi Jack, At th…

Re: ValidationException using DataTypeHint in Scalar Function

2020-11-09 Thread Steve Whelan
Timo and Dawid, Registering my UDF via the deprecated *registerFunction()* instead of the new *createTemporarySystemFunction()* worked. So it would appear there is some incompatibility between my implementation and the new registration system. I will wait for the FLIP to be completed and retry then.

Re: Caching Mechanism in Flink

2020-11-09 Thread Xuannan Su
Hi Jack, At the moment, Flink doesn't support caching the intermediate result. However, there is some ongoing effort to support caching in Flink. FLIP-36 [1] proposes to add a caching mechanism at the Table API level, and it is planned for 1.13. Best, Xuannan On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis wrote: …

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Jaffe, Julian
One thing to check is how much you're serializing to the network. If you're using Avro GenericRecords without special handling, you can wind up serializing the schema with every record, greatly increasing the amount of data you're sending across the wire. On 11/9/20, 8:14 AM, "ashwinkonale" wrote: …
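A sketch of one way to avoid that in the DataStream API (assuming flink-avro is on the classpath; the schema string and the AvroDeserializer function are hypothetical stand-ins for your own deserialization step):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Without an explicit type, GenericRecord falls back to Kryo, which can
    // end up shipping the full schema alongside every single record.
    String schemaJson = "...your writer schema...";
    Schema schema = new Schema.Parser().parse(schemaJson);

    DataStream<GenericRecord> records = input
        .map(new AvroDeserializer(schemaJson))           // hypothetical RichMapFunction
        .returns(new GenericRecordAvroTypeInfo(schema)); // Avro-aware serializer:
                                                         // the schema travels once, not per record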

CLI help, documentation is confusing...

2020-11-09 Thread Marco Villalobos
The Flink CLI documentation says that the -m option is used to specify the job manager address, but the examples pass in an execution target instead. I am quite confused by this:

    ./bin/flink run -m yarn-cluster \
        ./examples/batch/WordCount.jar \
        --input hdfs:///…

SSL setup for YARN deployment when hostnames are unknown.

2020-11-09 Thread Jiahui Jiang
Hello Flink! We are working on turning on REST SSL for YARN deployments. We built a generic orchestration server that can submit Flink clusters to any YARN cluster given the relevant Hadoop configs. But this means we may not know the hostname the Job Managers will be deployed onto, not even the …

debug statefun

2020-11-09 Thread Lian Jiang
Hi, I created a POC by mimicking the statefun-greeter-example. However, it failed due to: Caused by: java.lang.IllegalStateException: There are no ingress defined. at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25) ~[…
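That validator fails when no module binds an ingress. As a rough sketch of what the StateFun 2.x embedded SDK expects (namespace, topic and the deserializer class are hypothetical), and keeping in mind the module also has to be listed in META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule to be discovered at all:

    import java.util.Map;
    import org.apache.flink.statefun.sdk.io.IngressIdentifier;
    import org.apache.flink.statefun.sdk.io.IngressSpec;
    import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
    import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

    public class GreeterModule implements StatefulFunctionModule {

        static final IngressIdentifier<String> GREETS =
            new IngressIdentifier<>(String.class, "example", "greets");

        @Override
        public void configure(Map<String, String> globalConfig, Binder binder) {
            IngressSpec<String> spec = KafkaIngressBuilder.forIdentifier(GREETS)
                .withKafkaAddress("kafka:9092")
                .withTopic("greetings")
                .withDeserializer(GreetDeserializer.class) // hypothetical KafkaIngressDeserializer
                .build();
            binder.bindIngress(spec); // without at least one such binding, validation fails
        }
    }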

Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis
Hello all, I am new to Flink and I want to ask if Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it, and how can I use it? Thank you, Iacovos

Re: SQL aggregation functions inside the Table API

2020-11-09 Thread Ori Popowski
Thanks. On Mon, Nov 9, 2020 at 4:50 PM Timo Walther wrote: > Hi Ori, > > we might support SQL expressions soon in Table API. However, we might > not support aggregate functions immediately. I would recommend to use > `sqlQuery` for now. > > The following is supported: > > val table = tenv.fromDataStream(stream) …

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread ashwinkonale
Hi, Thanks a lot for the reply. I added some more metrics to the pipeline to understand the bottleneck. It seems Avro deserialization introduces some delay. Using a histogram, I found that processing a single message takes ~300µs (p99), ~180µs (p50). This means a single slot can output at most ~3000 messages …

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Kostas Kloudas
Hi Ashwin, Do you have any filtering or aggregation (or any operation that emits less data than it receives) in your logic? If yes, you could for example put it before the rescaling operation so that it gets chained to your source, and you reduce the amount of data you ship through the network. …
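A sketch of that reordering (the source, predicate and downstream operators are hypothetical): the filter stays chained to the source, so only surviving records cross the network at the rescale.

    // Chained to the source: drops records before any network hop.
    DataStream<Event> filtered = env
        .addSource(kafkaSource)
        .filter(e -> e.isRelevant());

    // Only the remaining records get redistributed.
    filtered
        .rescale()
        .map(new HeavyEnrichment())
        .addSink(sink);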

Re: Table SQL Filesystem CSV recursive directory traversal

2020-11-09 Thread Timo Walther
Hi Ruben, by looking at the code, it seems you should be able to do that. At least for batch workloads we are using org.apache.flink.formats.csv.CsvFileSystemFormatFactory.CsvInputFormat, which is a FileInputFormat that supports the mentioned configuration option. The problem is that this might …

Re: Understanding kafka watermark strategy assigner

2020-11-09 Thread Kostas Kloudas
Hi Nikola, Apart from the potential overhead you mentioned about having one more operator, I cannot find any other, and even that one I think is negligible. The reason why we recommend attaching the Watermark Generator to the source is more about semantics than efficiency. It seems natural …
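For reference, a sketch of attaching the generator directly to the Kafka source in the Flink 1.11 style (the Event type and its timestamp getter are placeholders); generated inside the source, watermarks are tracked per Kafka partition, which is the semantic benefit:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    FlinkKafkaConsumer<Event> consumer = new FlinkKafkaConsumer<>(topic, schema, props);

    // Watermarks generated inside the source, per partition:
    consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, ts) -> event.getTimestamp()));

    DataStream<Event> stream = env.addSource(consumer);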

Re: ValidationException using DataTypeHint in Scalar Function

2020-11-09 Thread Timo Walther
Sorry for jumping in so late. I think Dawid gave a nice summary. As he said, the DataStream <> Table integration is still under development. Until then, I would suggest option 3), which means: don't upgrade the functions, and use the old registration method `registerFunction`.
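Concretely, option 3) just means staying on the deprecated call for now (MyScalarFunction is a stand-in for the actual UDF):

    // Old type system: still works today, though deprecated.
    tableEnv.registerFunction("my_func", new MyScalarFunction());

    // New registration path, which currently trips over the RAW type issue:
    // tableEnv.createTemporarySystemFunction("my_func", MyScalarFunction.class);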

Re: SQL aggregation functions inside the Table API

2020-11-09 Thread Timo Walther
Hi Ori, we might support SQL expressions soon in Table API. However, we might not support aggregate functions immediately. I would recommend to use `sqlQuery` for now. The following is supported:

    val table = tenv.fromDataStream(stream)
    val newTable = tenv.sqlQuery(s"SELECT ... FROM $table")
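The same pattern in Java, applied to Ori's LAST_VALUE case (field names are hypothetical; concatenating a Table into the SQL string registers it under a generated name):

    import org.apache.flink.table.api.Table;

    // Drop into SQL only for the function the Table API lacks:
    Table table = tEnv.fromDataStream(stream);
    Table result = tEnv.sqlQuery(
        "SELECT userId, LAST_VALUE(amount) FROM " + table + " GROUP BY userId");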

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-09 Thread Felipe Gutierrez
I realized that I forgot the image. Now it is attached. -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez wrote: > Hi community, > I am testing the "Split Distinct Aggregation" [1] consuming the taxi > …

SQL aggregation functions inside the Table API

2020-11-09 Thread Ori Popowski
Hi, Some functions only exist in the SQL interface and are missing from the Table API, for example LAST_VALUE(expression) [1]. I still want to use this function in my aggregation, and I don't want to implement a user-defined function. Can I embed an SQL expression inside my Table API program? For example …

Stream aggregation using Flink Table API (Blink plan)

2020-11-09 Thread Felipe Gutierrez
Hi community, I am testing the "Split Distinct Aggregation" [1] consuming the taxi ride data set. My SQL query from the table environment is the one below:

    Table tableCountDistinct = tableEnv.sqlQuery(
        "SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");

and I am enabling: conf…

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

2020-11-09 Thread Aljoscha Krettek
@Till For instances where we use withTimestampAssigner(), the examples in the docs always use the explicit generic parameter (see event_timestamps_watermarks.md and streaming_analytics.md). For cases where we don't use withTimestampAssigner(), we don't need the extra generic parameter because th…
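The docs pattern in question, as a sketch (MyEvent and getCreationTime() are placeholders): the lambda passed to withTimestampAssigner() gives the compiler nothing to infer the element type from, hence the explicit parameter.

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // Explicit generic parameter needed because of the lambda:
    WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.getCreationTime());

    // Without withTimestampAssigner(), target-type inference is enough:
    WatermarkStrategy<MyEvent> plain =
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20));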

Re: ValidationException using DataTypeHint in Scalar Function

2020-11-09 Thread Dawid Wysakowicz
Hi Steve, Unfortunately, the information you posted still does not explain how you ended up with *RAW('java.util.Map', ?)* for your input type. It would be best if you could share an example that I could use to reproduce it. I tried putting down some potential approaches: I tested it with a class ge…
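For comparison, a minimal scalar function where the map argument carries an explicit hint, which is what keeps the new type system from falling back to RAW (a sketch; whether it matches Steve's actual setup is exactly what a reproducer would show):

    import java.util.Map;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;

    public class MapValueFunction extends ScalarFunction {
        // The hint maps the argument to MAP<STRING, STRING> instead of
        // RAW('java.util.Map', ?).
        public String eval(@DataTypeHint("MAP<STRING, STRING>") Map<String, String> map,
                           String key) {
            return map.get(key);
        }
    }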

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread ashwinkonale
Hi Till, Thanks a lot for the reply. The problem I am facing is that as soon as I add a network shuffle (remove chaining) before the discarding sink, I have a huge problem with throughput. Do you have any pointers on how I can go about debugging this? - Ashwin -- Sent from: http://apache-flink-user-mailing-list-archive…

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Till Rohrmann
Hi Ashwin, Thanks for reaching out to the Flink community. Since you have tested that a kafka_source -> discarding_sink pipeline can process 10 million records/s, you might also want to test the write throughput to data_sink and dlq_sink. Maybe these sinks are limiting your overall throughput by backpressuring …

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

2020-11-09 Thread Till Rohrmann
Glad to hear it! Cheers, Till On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin wrote: > Hi Till, > > That's great! Thank you so much!!! I have spent one week on this. I'm so > relieved! > > Cheers > > s …

Table SQL Filesystem CSV recursive directory traversal

2020-11-09 Thread Ruben Laguna
Is it possible? For DataSet I've found [1]:

    // set the recursive enumeration parameter
    Configuration parameters = new Configuration();
    parameters.setBoolean("recursive.file.enumeration", true);

    // pass the configuration to the data source
    DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
        .withParameters(parameters);

But can I achieve something …