Re: Emit message at start and end of event time session window

2020-02-27 Thread Manas Kale
I think one "edge" case which is not handled would be that the first event (by event-time) arrives late; then a wrong "started-window" would be reported. Rafi. On Thu, Feb 20, 2020 at 12:36 PM Manas Kale wrot

Re: Emit message at start and end of event time session window

2020-02-27 Thread Manas Kale
-using-broadcaststate-pattern Thanks, Manas On Thu, Feb 27, 2020 at 4:28 PM Manas Kale wrote: > Hi Rafi and Till, > Thank you for pointing out that edge case, Rafi. > > Till, I am trying to get this example working with the BroadcastState > pattern upstream to the window operator[

Emit message at start and end of event time session window

2020-02-16 Thread Manas Kale
int 1 using a custom trigger, which checks if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a customProcessWindowFunction(). However, with this architecture I can't detect the end of the window. Is my approach correct or is there a completely different method to achieve this? Thanks, Manas Kale
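The trigger condition described in this thread (fire a "window started" message once the session span exceeds MIN_WINDOW_SIZE) can be sketched in plain Python. This is not Flink's Trigger API; the function names and the session simulation are illustrative only:

```python
def should_fire(last_event_ts, window_start, min_window_size):
    """Mimics the custom trigger check from the thread: fire the
    'window started' message once the session has grown past the
    minimum size."""
    return (last_event_ts - window_start) > min_window_size

def first_fire_timestamp(events, min_window_size):
    """Tiny session simulation: events extend the session; report the
    timestamp at which the trigger would first fire, or None if the
    session never exceeds the minimum size."""
    window_start = events[0]
    for ts in events:
        if should_fire(ts, window_start, min_window_size):
            return ts
    return None
```

In a real Trigger implementation, this predicate would live in `onElement()`, with the fired/not-fired flag kept in partitioned trigger state so the start message is emitted only once per session.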

Re: Emit message at start and end of event time session window

2020-02-20 Thread Manas Kale
ossible at the moment. > > I've drafted a potential solution which you can find here [1]. > > [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef > > Cheers, > Till > > On Mon, Feb 17, 2020 at 8:09 AM Manas Kale wrote: > >> Hi, >>

Re: Emit message at start and end of event time session window

2020-02-20 Thread Manas Kale
been a bit easier if the trigger and the window process function could >> share its internal state. Unfortunately, this is not possible at the moment. >> >> I've drafted a potential solution which you can find here [1]. >> >> [1] https://gist.github.com/tillrohrmann/5251f

Re: Perform processing only when watermark updates, buffer data otherwise

2020-04-05 Thread Manas Kale
f your questions. Feel free to ask further > questions. > > Regards, > Timo > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function > [2] > > https://stackoverflow.com/questions/52885

Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
wrote: > Great to hear that you solved the problem. Let us know if you run into any > other issues. > > Cheers, > Till > > On Fri, Feb 28, 2020 at 8:08 AM Manas Kale wrote: > >> Hi, >> This problem is solved[1]. The issue was that the BroadcastStream did not

Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
>> you can implement an operator which groups incoming events according to >> sessions and outputs the required information. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html >> >> Cheers, &g

Re: How to move event time forward using externally generated watermark message

2020-03-26 Thread Manas Kale
ved. > > After the assigner, you can then simply filter out the timeout event and > don't need to care in downstream operations. > > On Mon, Mar 23, 2020 at 11:42 AM Manas Kale wrote: > >> Hi, >> I have a scenario where I have an input event stream from various IoT >>

Perform processing only when watermark updates, buffer data otherwise

2020-04-01 Thread Manas Kale
Hi, I want to perform some processing on events only when the watermark is updated. Otherwise, for all other events, I want to keep buffering them till the watermark arrives. The main motivation behind doing this is that I have several operators that emit events/messages to a downstream operator.
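The buffering behavior asked about here can be sketched without Flink APIs: hold events in a min-heap keyed by timestamp and release them, in timestamp order, whenever a watermark arrives. In an actual operator this would be keyed state plus watermark handling; the class below is a stand-in:

```python
import heapq

class WatermarkBuffer:
    """Plain-Python sketch of 'buffer events, process on watermark':
    events are held in a min-heap by timestamp and only released
    (in timestamp order) once the watermark passes them."""

    def __init__(self):
        self._heap = []

    def add(self, timestamp, event):
        # Buffer the event until the watermark catches up.
        heapq.heappush(self._heap, (timestamp, event))

    def on_watermark(self, watermark):
        # Release every buffered event at or before the watermark.
        released = []
        while self._heap and self._heap[0][0] <= watermark:
            released.append(heapq.heappop(self._heap)[1])
        return released
```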

Re: Perform processing only when watermark updates, buffer data otherwise

2020-04-02 Thread Manas Kale
() and connect()? On Thu, Apr 2, 2020 at 10:33 AM Manas Kale wrote: > Hi, > I want to perform some processing on events only when the watermark is > updated. Otherwise, for all other events, I want to keep buffering them > till the watermark arrives. > The main motivation behind doing

Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
> You could configure the print() operator to run with a parallelism of 1 as > well by adding a setParallelism(1) statement to it. > > Cheers, > Till > > On Thu, Mar 26, 2020 at 7:11 AM Manas Kale wrote: > >> Hi Till, >> When I run the example code that you posted, the

"Fill in" notification messages based on event time watermark

2020-04-27 Thread Manas Kale
Hi, I have an upstream operator that outputs device state transition messages with event timestamps. Meaning it only emits output when a transition takes place. For example, state1 @ 1 PM state2 @ 2 PM and so on. *Using a downstream operator, I want to emit notification messages as per some

Re: "Fill in" notification messages based on event time watermark

2020-04-28 Thread Manas Kale
> On event time timer firing: 1. output the state matching to this timer; 2. check if there is a (more recent) value for next window, and if not: 3. copy the value to next window; 4. register a timer for this window to fire; 5. cleanup currentS
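The timer steps quoted in this thread can be simulated in plain Python. Event-time timers are replaced here by a loop over interval boundaries, and the state names are illustrative, not the thread author's actual code:

```python
def fill_in(transitions, end_time, interval):
    """Sketch of the 'fill in' logic: given sparse state transitions
    [(timestamp, state), ...] sorted by time, emit one notification
    per interval, repeating the last-known state until it changes.
    Timers are simulated by iterating interval boundaries."""
    out = []
    idx = 0
    current = None
    t = transitions[0][0]
    while t < end_time:
        # Absorb any transitions up to this simulated "timer firing".
        while idx < len(transitions) and transitions[idx][0] <= t:
            current = transitions[idx][1]
            idx += 1
        out.append((t, current))  # the notification for this window
        t += interval             # "register a timer" for next window
    return out
```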

Re: "Fill in" notification messages based on event time watermark

2020-04-30 Thread Manas Kale
ce the watermark > arrived from all parallel instances. So roughly speaking, in machine time > you can assume that the window is computed in watermark update intervals. > However, "what is computed" depends on the timestamps of your events and > how those are categorized in windows.

Re: Broadcast state vs data enrichment

2020-05-11 Thread Manas Kale
p off" the config parameter that it > needs. > > Can you please explain the difference? > > Regards, > Roman > > > On Mon, May 11, 2020 at 8:07 AM Manas Kale wrote: > >> Hi, >> I have a single broadcast message that contains configuration data >>

Re: Testing process functions

2020-05-18 Thread Manas Kale
ink instance is not the one actually used by > the job? This typically means that have to store stuff in a static field > instead. > Alternatively, depending on the number of elements > org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might > be worth a try. > >

Broadcast state vs data enrichment

2020-05-11 Thread Manas Kale
proach is better or both provide similar performance. FWIW, the config stream is very sporadic compared to the event stream. Thank you, Manas Kale

Re: Broadcast state vs data enrichment

2020-05-14 Thread Manas Kale
e much bigger than the > configuration, this will significantly increase network, memory, and CPU > usage. > Btw, I think you don't need a broadcast in the 2nd option, since the > interested subtask will receive the configuration anyways. > > Regards, > Roman > > > On Tue, May

Re: "Fill in" notification messages based on event time watermark

2020-05-19 Thread Manas Kale
hat helps. If we're all clear we can look at the concrete > problem again. > > Best, > Aljoscha > > On 30.04.20 12:46, Manas Kale wrote: > > Hi Timo and Piotrek, > > Thank you for the suggestions. > > I have been trying to set up unit tests at the operator granulari

Testing process functions

2020-05-15 Thread Manas Kale
Hi, How do I test process functions? I tried by implementing a sink function that stores myProcessFunction's output in a list. After env.execute(), I use assertions. If I set a breakpoint in the myTestSink's invoke() method, I see that that method is being called correctly. However, after
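The diagnosis that emerges in this thread is that the sink instance actually invoked by the job is a deserialized copy, so results collected in an instance field are lost and must instead live in a static (class-level) field. That pattern can be illustrated without Flink; `CollectSink` below is a stand-in, not the real test sink:

```python
class CollectSink:
    """Sketch of the test-sink pattern: in Flink, sink instances are
    serialized and shipped to the workers, so an *instance* field on
    the object you constructed in the test is never populated. The
    workaround is a class-level (static) collection shared by all
    (de)serialized copies."""

    results = []  # class-level, shared across copies

    def invoke(self, value):
        CollectSink.results.append(value)
```

In the test, run the pipeline with this sink attached, then assert on `CollectSink.results` after `env.execute()` returns. Flink's `DataStreamUtils.collect` is the library-provided alternative mentioned in the thread.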

How to move event time forward using externally generated watermark message

2020-03-23 Thread Manas Kale
Hi, I have a scenario where I have an input event stream from various IoT devices. Every message on this stream can be of some eventType and has an eventTimestamp. Downstream, some business logic is implemented on this based on event time. In case a device goes offline, what's the best way to

PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-01 Thread Manas Kale
Hi, I am trying to submit a pyFlink job in detached mode using the command: ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j flink-sql-connector-kafka_2.11-1.11.0.jar The jobs are submitted successfully but the command does not return. I realized that was because I had the

Packaging multiple Flink jobs from a single IntelliJ project

2020-08-30 Thread Manas Kale
Hi, I have an IntelliJ project that has multiple classes with main() functions. I want to package this project as a JAR that I can submit to the Flink cluster and specify the entry class when I start the job. Here are my questions: - I am not really familiar with Maven and would appreciate

Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
like to be able to run each main() class as a separate job. Should I create a single JAR and specify different entrypoint classes each time or should I create separate JARs for each main() class? On Mon, Aug 31, 2020 at 11:13 AM Manas Kale wrote: > Hi, > I have an IntelliJ project th

Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-01 Thread Manas Kale
hat users don’t need to > prepare different codes. > > Best, > Xingbo > > Manas Kale wrote on Tue, Sep 1, 2020 at 3:00 PM: > >> Hi, >> I am trying to submit a pyFlink job in detached mode using the command: >> >> ../../flink-1.11.0/bin/flink run -d -py basic_s

Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
this is the recommended way to go about doing this, thanks for reading and have a great day! On Mon, Aug 31, 2020 at 12:03 PM Manas Kale wrote: > Hi, > I solved my second issue - I was not following Maven's convention for > placing source code (I had not placed my source in src/

Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-09 Thread Manas Kale
Till Rohrmann wrote on Wed, Sep 2, 2020 at 5:03 PM: > >> Hi Manas, >> >> I am not entirely sure but you might try to check whether >> env._j_stream_execution_environment is an instance of >> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment >> via Python's i

Correct way to handle RedisSink exception

2020-10-15 Thread Manas Kale
Hi, I have a streaming application that pushes output to a redis cluster sink. I am using the Apache Bahir[1] Flink redis connector for this. I want to handle the case when the redis server is unavailable. I am following the same pattern as outlined by them in [1]: FlinkJedisPoolConfig conf = new
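The advice later given in this thread is to subclass the sink and override `open()`. A plain-Python sketch of that pattern, with `FlakySink` standing in for the Bahir `RedisSink` (the `ConnectionError` and the "disable on failure" fallback are assumptions, not connector behavior):

```python
class FlakySink:
    """Stand-in for a connector whose open() raises when the backend
    is unavailable (an assumed failure mode)."""

    def __init__(self, available):
        self.available = available

    def open(self):
        if not self.available:
            raise ConnectionError("redis unavailable")

class SafeSink(FlakySink):
    """Sketch of the subclass-and-override advice: catch the
    connection error in open() so the job can degrade (here: mark
    itself disabled) instead of failing at startup."""

    def __init__(self, available):
        super().__init__(available)
        self.enabled = True

    def open(self):
        try:
            super().open()
        except ConnectionError:
            self.enabled = False  # degrade instead of crashing
```

Whether silently disabling the sink is acceptable depends on the job; retrying with backoff inside the overridden `open()` is the other common choice.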

Re: Correct way to handle RedisSink exception

2020-10-15 Thread Manas Kale
Hi all, Thank you for the help, I understand now. On Thu, Oct 15, 2020 at 5:28 PM Ruan Shubin (Zhejiang University) wrote: > hello, Manas Kale. > > From the log, it can be found that the exception was thrown on the > 'open()' method of the RedisSink class. You can inherit the RedisSink > class, then ove

Re: Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Manas Kale
to deserialize the key and value bytes coming from Kafka. > > Best, > Robert > > > On Thu, Aug 27, 2020 at 1:56 PM Manas Kale wrote: > >> Hi, >> I have a kafka topic on which the key is serialized in a custom format >> and the value is serialized as JSON. How

PyFlink cluster runtime issue

2020-08-28 Thread Manas Kale
Hi, I am trying to deploy a pyFlink application on a local cluster. I am able to run my application without any problems if I execute it as a normal python program using the command : python myApplication.py My pyFlink version is __version__ = "1.11.0". I had installed this pyFlink through

Re: PyFlink cluster runtime issue

2020-08-28 Thread Manas Kale
table/connectors/kafka.html#dependencies > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples > [3] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency > >

Re: PyFlink cluster runtime issue

2020-08-29 Thread Manas Kale
Ok, thank you! On Sat, 29 Aug, 2020, 4:07 pm Xingbo Huang, wrote: > Hi Manas, > > We can't submit a pyflink job through flink web currently. The only way > currently to submit a pyFlink job is through the command line. > > Best, > Xingbo > > Manas Kale wrote on Sat, Aug 29, 2020

AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-21 Thread Manas Kale
Hi, I am trying to deploy a Flink jar on AWS EMR service. I have ensured that Flink v1.10.0 is used in my pom file as that's the version supported by EMR. However, I get the following error: Exception in thread "main" java.lang.NoClassDefFoundError:

Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Manas Kale
Hi, I have a Kafka topic on which the key is serialized in a custom format and the value is serialized as JSON. How do I create a FlinkKafkaConsumer that has different deserialization schemas for the key and value? Here's what I tried: FlinkKafkaConsumer> advancedFeatureData = new
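In Flink this is typically solved with a `KafkaDeserializationSchema`, whose `deserialize()` receives the whole record and therefore both key and value bytes. A plain-Python sketch of the split (the `id:region` key format is a made-up example of a "custom format"):

```python
import json

def deserialize_record(key_bytes, value_bytes):
    """Sketch of a KafkaDeserializationSchema-style split: the key
    uses a custom format (here, a hypothetical ASCII 'id:region'
    pair) while the value is JSON. In Flink, both decodings would
    live in one deserialize(ConsumerRecord) implementation."""
    device_id, region = key_bytes.decode("ascii").split(":")
    value = json.loads(value_bytes.decode("utf-8"))
    return {"deviceId": device_id, "region": region, "value": value}
```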

Re: AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-24 Thread Manas Kale
> >> given that you jar works in a local cluster that part should not be the >> problem. >> >> On 21/08/2020 08:16, Manas Kale wrote: >> >> Hi, >> I am trying to deploy a Flink jar on AWS EMR service. I have ensured that >> Flink v1.

Re: Testing process functions

2020-05-26 Thread Manas Kale
Flink > Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Timothy Ale

Re: Correct way to package application.properties file with Flink JAR

2020-10-22 Thread Manas Kale
evels deep. Does that make a difference and cause the getClassLoader() to treat an inner package as root? On Wed, Oct 21, 2020 at 6:06 PM Chesnay Schepler wrote: > You could bundle said file in the jar and retrieve it via > getClass().getClassLoader().getResource(""). > > O

Re: Correct way to package application.properties file with Flink JAR

2020-10-22 Thread Manas Kale
Okay, I solved the other issue with viewing logs which proved that correct, non-null values are being loaded. I believe I have a different issue altogether so will create a separate thread for that. Thanks for the help Chesnay! On Thu, Oct 22, 2020 at 11:30 AM Manas Kale wrote: > Hi Ches

Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-22 Thread Manas Kale
are actually used, they are null! On Wed, Oct 21, 2020 at 7:58 PM Manas Kale wrote: > I see, thanks for that clarification - I incorrectly assumed both methods > of submission produce logs in the same place. I will have an update > tomorrow! > > On Wed, Oct 21, 2020 at 6:12 PM C

Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
he LOGGER line not get printed even though execution definitely reached it (as seen from the stacktrace)? Thank you, Manas Kale

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
n the cluster JVMs. You need to either make sure that the > properties file is read from each task manager again, or easier: pass > the parameters as constructor parameters into the instances such that > they are shipped together with the function itself. > > I hope this helps. > &

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
NFIG_TOPIC in a minicluster is properly loaded but null when run on a cluster. On Thu, Oct 22, 2020 at 5:42 PM Manas Kale wrote: > Hi Timo, > Thank you for the explanation, I can start to see why I was getting an > exception. > Are you saying that I cannot use static variables

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-26 Thread Manas Kale
question appeared on the mailing list. > > Regards, > Timo > > On 23.10.20 07:22, Manas Kale wrote: > > Hi Timo, > > I figured it out, thanks a lot for your help. > > Are there any articles detailing the pre-flight and cluster phases? I > > couldn't find anything on ci.apa

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
to access it through a singleton > pattern instead of a static variable access. > > Regards, > Timo > > > On 22.10.20 14:17, Manas Kale wrote: > > Sorry, I messed up the code snippet in the earlier mail. The correct one > > is : > > > > public static void

Event time based disconnection detection logic

2020-08-11 Thread Manas Kale
Hi, I have a bunch of devices that keep sending heartbeat messages. I want to make an operator that emits messages when a device disconnects and when a device stops being disconnected. A device is considered disconnected if we don't receive any heartbeat for more than some TIMEOUT duration. This
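The timeout logic can be sketched offline: scan heartbeat timestamps and emit a disconnect event when a gap exceeds TIMEOUT, plus a reconnect when the next heartbeat arrives. In a real job this maps to per-key event-time timers that are re-registered on every heartbeat; the `horizon` parameter below stands in for the advancing watermark:

```python
def detect_transitions(heartbeats, timeout, horizon):
    """Emit ('disconnected', t) when the gap between consecutive
    heartbeats exceeds `timeout` (the disconnect is timestamped at
    last_heartbeat + timeout, when the timer would fire), and
    ('reconnected', t) when the next heartbeat arrives."""
    events = []
    for prev, cur in zip(heartbeats, heartbeats[1:]):
        if cur - prev > timeout:
            events.append(("disconnected", prev + timeout))
            events.append(("reconnected", cur))
    # The device may also time out after its last heartbeat, once the
    # watermark (horizon) has moved far enough past it.
    if horizon - heartbeats[-1] > timeout:
        events.append(("disconnected", heartbeats[-1] + timeout))
    return events
```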

Re: Event time based disconnection detection logic

2020-08-11 Thread Manas Kale
watermarks are correct. Esp. the watermark frequency > could be an issue. If watermarks are generated at the same time as the > heartbeats itself, it might be the case that the timers fire first > before the process() function is called which resets the timer. > > Maybe you can give us more inf

Re: Flink Kafka connector in Python

2020-06-30 Thread Manas Kale
che.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector > [3] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html > > Best, > Xingbo > > Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM: > >> Hi,

Re: Flink Kafka connector in Python

2020-07-02 Thread Manas Kale
race) pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.' The relevant part seems to be *Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.* This is probably a basic error, but I can't figure out how I can know

Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
return a populated ROW object from the eval() method. Where is the method to construct a row/field object and return it? Thanks! On Fri, Jul 3, 2020 at 12:40 PM Manas Kale wrote: > Hi Xingbo, > Thanks for the reply, I didn't know that a table schema also needs to be > declared after the c

PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

2020-07-13 Thread Manas Kale
Hi, I have the following piece of code (for pyFlink v1.11) : t_env.from_path(INPUT_TABLE) \ .select("monitorId, data, rowtime") \ .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \ .group_by("five_sec_window, monitorId") \ .select("monitorId, data.avg,
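The Table API snippet above computes a per-monitor average over tumbling event-time windows. A plain-Python equivalent of that grouping (window assignment by integer division on the row time; field names follow the snippet, and the whole function is a sketch, not pyFlink code):

```python
from collections import defaultdict

def tumble_avg(rows, size):
    """Group (monitorId, data, rowtime) rows into fixed tumbling
    windows of `size` time units by rowtime, and average `data`
    per (window_start, monitorId) group."""
    buckets = defaultdict(list)
    for monitor_id, data, rowtime in rows:
        window_start = (rowtime // size) * size  # tumbling assignment
        buckets[(window_start, monitor_id)].append(data)
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}
```

The error in the subject line means `rowtime` must be declared as an event-time attribute (with a watermark) in the table definition before `Tumble.over(...).on("rowtime")` is allowed.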

pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Hi, I am trying to get a simple streaming job running in pyFlink and understand the new 1.11 API. I just want to read from and write to kafka topics. Previously I was using t_env.execute("jobname"), register_table_source() and register_table_sink() but in 1.11 all 3 were deprecated and replaced by

DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Manas Kale
Hi, I am trying to parse this JSON message: {"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.60"} using pyFlink 1.11 DDL with this code: ddl_source = f""" CREATE TABLE {INPUT_TABLE} ( `monitorId` STRING, `deviceId` STRING,

Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
UTPUT_TOPIC}', > 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', > 'format' = 'json' > ) > """ > t_env.execute_sql(ddl_source) > t_env.execute_sql(ddl_sink) > > result = t_env.execute_sql(f""" >

Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Thank you Xingbo, this will certainly help! On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang wrote: > Hi Manas, > > I have created an issue[1] to add related doc > > [1] https://issues.apache.org/jira/browse/FLINK-18598 > > Best, > Xingbo > > Manas Kale wrote in July 2020

Re: DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Manas Kale
ork properly. > But it looks like you used an old version of the `flink-json` dependency, judging from the > log. Could you check that the version of `flink-json` is 1.11.0? > > Best, > Leonard Xu > > > On Jul 14, 2020, at 18:07, Manas Kale wrote: > > Hi, > I am trying to parse this JSON message: &

pyFlink UDTF function registration

2020-07-15 Thread Manas Kale
Hi, I am trying to use a UserDefined Table Function to split up some data as follows: from pyflink.table.udf import udtf @udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), DataTypes.DOUBLE()]) def split_feature_values(data_string): json_data = loads(data_string) for
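The UDTF begun in this snippet can be completed as a plain generator. This is a guess at the intended shape, not the author's full code: `json.loads` stands in for the truncated `loads` import, and the `float` conversion is an assumption based on the declared `DataTypes.DOUBLE()` result type:

```python
import json

def split_feature_values(data_string):
    """Split a JSON-encoded map of feature id -> value into one
    (featureName, featureValue) row per entry, the way a pyFlink
    @udtf(result_types=[STRING, DOUBLE]) would emit rows."""
    for name, value in json.loads(data_string).items():
        yield name, float(value)
```

In pyFlink the generator would be decorated with `@udtf(...)` and registered before use in a `join_lateral`.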

Re: Flink Kafka connector in Python

2020-07-03 Thread Manas Kale
th_format( > Json() > .json_schema( > "{" > " type: 'object'," > " properties: {" > "lon: {" > " type: 'number'" > "}," > "ri

Flink Kafka connector in Python

2020-06-29 Thread Manas Kale
Hi, I want to consume and write to Kafka from Flink's Python API. The only way I found to do this was through this question on SO where the user essentially copies FlinkKafka

Re: Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Manas Kale
different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Readingandwritingfromkey,value,timestamp> > > On Jul 24, 2020, at 15:01, Manas Kale wrote: > > Hi, > How do I read/write Kafka message keys using DDL? I have not been able to > see any documentation for the same. > > Thanks! > > >

PyFlink DDL UDTF join error

2020-07-28 Thread Manas Kale
Hi, Using pyFlink DDL, I am trying to: 1. Consume a Kafka JSON stream. This has messages with aggregate data, example: "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}" 2. I am splitting field "data" so that I can process its values

Re: PyFlink DDL UDTF join error

2020-07-29 Thread Manas Kale
er_table("tmp_view", > t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, > featureValue)")) > > > This works for me. I’ll try to find out what caused this exception. > > Best, > Wei > > On Jul 28, 2020, at 18:33, Man

Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Manas Kale
Hi, How do I read/write Kafka message keys using DDL? I have not been able to see any documentation for the same. Thanks!

Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-21 Thread Manas Kale
e jar via the WebUI, or with the CLI (e.g., ./bin/flink > run ...)? > > If it is the former, then it show up in the JM logs. > If it is the latter, then it should appear in the logs of the client > (i.e., log/flink-???-client-???.log). > > On 10/21/2020 2:17 PM, Manas Kale wrot

Logs printed when running with minicluster but not printed when submitted as a job

2020-10-21 Thread Manas Kale
ppender.fileAppender.File=dataProcessingEngine.log log4j.appender.fileAppender.policies.type = Policies log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy log4j.appender.fileAppender.policies.size.size=10MB log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy log4j.appende

Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-21 Thread Manas Kale
ke a peek into the JobManager logs. > > On 10/21/2020 11:37 AM, Manas Kale wrote: > > Hi, > I have the following pattern: > > public static void main(String[] args) { > >// Get the exec environment. This could be a cluster or a mini-cluster > used for local developme

Flink Docker job fails to launch

2021-01-15 Thread Manas Kale
Hi all, I've got a job that I am trying to run using docker as per [1]. Here's the dockerfile: # Start from base Flink image. FROM flink:1.11.0 # Add fat JAR and logger properties file to image. ADD ./target/flink_POC-0.1.jar /opt/flink/usrlib/flink_POC-0.1.jar ADD

Re: Flink Docker job fails to launch

2021-01-15 Thread Manas Kale
correct me if my understanding is wrong. On Fri, Jan 15, 2021 at 4:37 PM Chesnay Schepler wrote: > Where are you starting the task executor? > > On 1/15/2021 11:57 AM, Manas Kale wrote: > > Hi all, > I've got a job that I am trying to run using docker as per [1]. > Here's t

Flink docker in session cluster mode - is a local distribution needed?

2021-02-16 Thread Manas Kale
Hi, I have a project that is a set of 6 jobs out of which 4 are written in Java and 2 are written in pyFlink. I want to dockerize these so that all 6 can be run in a single Flink session cluster. I have been able to successfully set up the JobManager and TaskManager containers as per [1] after

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Manas Kale
it jobs from outside the Docker images. If your jobs are > included in the Docker image, then you could log into the master process > and start the jobs from within the Docker image. > > Cheers, > Till > > On Tue, Feb 16, 2021 at 1:00 PM Manas Kale wrote: > >> Hi, >