Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Manas Kale
Hi Till,
Oh I see... I managed to do what you said using a bunch of docker exec
commands. However, I think this solution is quite hacky and could be
improved by providing a simple command to submit jobs using the Flink
runtime within the Docker images. I believe this would achieve full
containerization - the host system would not be expected to have the Flink
runtime at all; everything would stay within the Docker images.
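
In case it helps anyone searching the archives, the docker exec workaround
boiled down to something like this (container name, paths and JAR name are
illustrative):

# Copy the fat JAR into the running JobManager container and submit it with
# the Flink distribution that ships inside the image.
docker cp ./target/my-job.jar jobmanager:/tmp/my-job.jar
docker exec -t jobmanager /opt/flink/bin/flink run -d /tmp/my-job.jar

# A PyFlink job can be submitted the same way, assuming the image has Python:
docker exec -t jobmanager /opt/flink/bin/flink run -d -py /tmp/my_job.py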

Thanks a lot!

On Tue, Feb 16, 2021 at 6:08 PM Till Rohrmann  wrote:

> Hi Manas,
>
> I think the documentation assumes that you first start a session cluster
> and then submit jobs from outside the Docker images. If your jobs are
> included in the Docker image, then you could log into the master process
> and start the jobs from within the Docker image.
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 1:00 PM Manas Kale  wrote:
>
>> Hi,
>>
>> I have a project that is a set of 6 jobs out of which 4 are written in
>> Java and 2 are written in pyFlink. I want to dockerize these so that all 6
>> can be run in a single Flink session cluster.
>>
>> I have been able to successfully set up the JobManager and TaskManager
>> containers as per [1] after creating a custom Docker image that has Python.
>> For the last step, the guide asks us to submit the job using a local
>> distribution of Flink:
>>
>> $ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
>>
>> I am probably missing something here because I have the following
>> questions:
>> Why do I need to use a local distribution to submit a job?
>> Why can't I use the Flink distribution that already exists within the
>> images?
>> How do I submit a job using the Docker image's distribution?
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html#starting-a-session-cluster-on-docker
>>
>>


Flink docker in session cluster mode - is a local distribution needed?

2021-02-16 Thread Manas Kale
Hi,

I have a project that is a set of 6 jobs out of which 4 are written in Java
and 2 are written in pyFlink. I want to dockerize these so that all 6 can
be run in a single Flink session cluster.

I have been able to successfully set up the JobManager and TaskManager
containers as per [1] after creating a custom Docker image that has Python.
For the last step, the guide asks us to submit the job using a local
distribution of Flink:

$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

I am probably missing something here because I have the following questions:
Why do I need to use a local distribution to submit a job?
Why can't I use the Flink distribution that already exists within the
images?
How do I submit a job using the Docker image's distribution?


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html#starting-a-session-cluster-on-docker
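
For completeness, the session-cluster setup from [1] boils down to roughly the
following ("my-flink-python" stands for the custom Python-enabled image and is
an illustrative name):

FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"

# A user-defined network lets the TaskManager resolve the JobManager by name.
docker network create flink-network

# Session-mode JobManager with the web UI exposed on 8081.
docker run --rm --name=jobmanager --network flink-network --publish 8081:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" my-flink-python jobmanager

# One (or more) TaskManagers that register with the JobManager above.
docker run --rm --name=taskmanager --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" my-flink-python taskmanager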


Re: Flink Docker job fails to launch

2021-01-15 Thread Manas Kale
You mean taskmanager? I tried using this command:

docker run --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink_pipeline
taskmanager

after running the above script, but got:

2021-01-15 13:03:05,069 INFO
 org.apache.flink.runtime.util.LeaderRetrievalUtils   [] - Trying
to select the network interface and address to use by connecting to the
leading JobManager.
2021-01-15 13:03:05,069 INFO
 org.apache.flink.runtime.util.LeaderRetrievalUtils   [] -
TaskManager will try to connect for PT10S before falling back to heuristics
2021-01-15 13:03:05,484 INFO  org.apache.flink.runtime.net.ConnectionUtils
[] - Trying to connect to address jobmanager:6123
2021-01-15 13:03:05,486 INFO  org.apache.flink.runtime.net.ConnectionUtils
[] - Failed to connect from address '608ecee74cff/172.17.0.3':
jobmanager

Here's what I understand is supposed to happen:
1. Start a jobmanager in a docker container.
2. Start a taskmanager in another docker container and tell it where to
find the jobmanager.
3. Using the taskmanager, submit a new job.

I thought since step (1) is failing, adding the next step (starting
taskmanager) would be of no use.

Please correct me if my understanding is wrong.
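
For anyone hitting the same wall, one way to wire the two containers together
is to put them on a user-defined Docker network and name the JobManager
container "jobmanager"; note that FLINK_PROPERTIES has to be expanded with
"${...}" when passed to --env. An untested sketch based on the standalone
Docker docs, reusing the image and class names from the earlier mails:

FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"

# User-defined network so the name "jobmanager" resolves from the TaskManager.
docker network create flink-network

# Application-mode JobManager built from the custom flink_pipeline image.
docker run --rm --name=jobmanager --network flink-network -p 8081:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink_pipeline standalone-job --job-classname flink_POC.StreamingJob

# At least one TaskManager from the same image, so slot requests can be met.
docker run --rm --name=taskmanager --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink_pipeline taskmanager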




On Fri, Jan 15, 2021 at 4:37 PM Chesnay Schepler  wrote:

> Where are you starting the task executor?
>
> On 1/15/2021 11:57 AM, Manas Kale wrote:
>
> Hi all,
> I've got a job that I am trying to run using docker as per [1].
> Here's the dockerfile:
>
> # Start from base Flink image.
> FROM flink:1.11.0
>
> # Add fat JAR and logger properties file to image.
> ADD ./target/flink_POC-0.1.jar /opt/flink/usrlib/flink_POC-0.1.jar
> ADD ./target/classes/log4j.properties /opt/flink/usrlib/log4j.properties
>
> # Add pipeline.properties and its location.
> ADD target/classes/pipeline.properties /opt/flink/usrlib/pipeline.properties
> ENV FLINK_CONFIG_LOCATION=/opt/flink/usrlib/pipeline.properties
>
> EXPOSE 8081
>
> And the script I use to launch it:
>
> #!/usr/bin/env bash
>
> echo "Building docker image..."
> docker build --tag flink_pipeline .
>
> echo "Configuring Flink runtime..."
> export FLINK_PROPERTIES="jobmanager.rpc.address: host
>  taskmanager.memory.process.size: 4000
>  jobmanager.memory.process.size: 4000
>  "
>
> echo "Starting docker image..."
> docker run --rm -p 8081:8081 --env FLINK_PROPERTIES=FLINK_PROPERTIES \
> flink_pipeline standalone-job --job-classname flink_POC.StreamingJob
>
>
> When I run the script, I see my job stuck in "CREATED" state and after
> some time I get the error:
>
> 2021-01-15 10:44:29,563 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
> Requesting new slot [SlotRequestId{1c25a61e6179f66b112b1944740f1a11}] and
> profile ResourceProfile{UNKNOWN} from resource manager.
> 2021-01-15 10:44:29,565 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Request slot with profile ResourceProfile{UNKNOWN} for job
> b854f75d6029e1725e822721c30095d7 with allocation id
> edc1e29d229aceb82f75b7c5835eca3c.
> 2021-01-15 10:46:39,604 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Failing
> pending slot request [SlotRequestId{1c25a61e6179f66b112b1944740f1a11}]:
> Could not fulfill slot request edc1e29d229aceb82f75b7c5835eca3c. Requested
> resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
> 2021-01-15 10:46:39,667 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> advanced features  kafak consumer (1/1) (49ea271f6b9881d82c49b2826e8584d9)
> switched from SCHEDULED to FAILED on not deployed.
>
> *java.util.concurrent.CompletionException:
> org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
> Could not fulfill slot request edc1e29d229aceb82f75b7c5835eca3c. Requested
> resource profile (ResourceProfile{UNKNOWN}) is unfulfillable. *  ...
> I understand that the resourcemanager fails to provide resources for my
> job(?), but other than that the error is quite cryptic for me. Could anyone
> help me understand what is going wrong?
>
>
> Regards,
> Manas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#introduction
>
>
>


Flink Docker job fails to launch

2021-01-15 Thread Manas Kale
Hi all,
I've got a job that I am trying to run using docker as per [1].
Here's the dockerfile:

# Start from base Flink image.
FROM flink:1.11.0

# Add fat JAR and logger properties file to image.
ADD ./target/flink_POC-0.1.jar /opt/flink/usrlib/flink_POC-0.1.jar
ADD ./target/classes/log4j.properties /opt/flink/usrlib/log4j.properties

# Add pipeline.properties and its location.
ADD target/classes/pipeline.properties /opt/flink/usrlib/pipeline.properties
ENV FLINK_CONFIG_LOCATION=/opt/flink/usrlib/pipeline.properties


EXPOSE 8081

And the script I use to launch it:

#!/usr/bin/env bash

echo "Building docker image..."
docker build --tag flink_pipeline .

echo "Configuring Flink runtime..."
export FLINK_PROPERTIES="jobmanager.rpc.address: host
 taskmanager.memory.process.size: 4000
 jobmanager.memory.process.size: 4000
 "

echo "Starting docker image..."
docker run --rm -p 8081:8081 --env FLINK_PROPERTIES=FLINK_PROPERTIES \
flink_pipeline standalone-job --job-classname flink_POC.StreamingJob


When I run the script, I see my job stuck in "CREATED" state and after some
time I get the error:

2021-01-15 10:44:29,563 INFO
 org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
Requesting new slot [SlotRequestId{1c25a61e6179f66b112b1944740f1a11}] and
profile ResourceProfile{UNKNOWN} from resource manager.
2021-01-15 10:44:29,565 INFO
 org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Request slot with profile ResourceProfile{UNKNOWN} for job
b854f75d6029e1725e822721c30095d7 with allocation id
edc1e29d229aceb82f75b7c5835eca3c.
2021-01-15 10:46:39,604 INFO
 org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Failing
pending slot request [SlotRequestId{1c25a61e6179f66b112b1944740f1a11}]:
Could not fulfill slot request edc1e29d229aceb82f75b7c5835eca3c. Requested
resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
2021-01-15 10:46:39,667 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
advanced features  kafak consumer (1/1) (49ea271f6b9881d82c49b2826e8584d9)
switched from SCHEDULED to FAILED on not deployed.

*java.util.concurrent.CompletionException:
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
Could not fulfill slot request edc1e29d229aceb82f75b7c5835eca3c. Requested
resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.*  ...
I understand that the resourcemanager fails to provide resources for my
job(?), but other than that the error is quite cryptic for me. Could anyone
help me understand what is going wrong?


Regards,
Manas

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#introduction


Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-26 Thread Manas Kale
Hi Timo,
Sure, I have opened this <https://issues.apache.org/jira/browse/FLINK-19807>
issue on Jira.

On Fri, Oct 23, 2020 at 4:09 PM Timo Walther  wrote:

> Hi Manas,
>
> that is a good point. Feel free to open an issue for this. It is not the
> first time that your question appeared on the mailing list.
>
> Regards,
> Timo
>
> On 23.10.20 07:22, Manas Kale wrote:
> > Hi Timo,
> > I figured it out, thanks a lot for your help.
> > Are there any articles detailing the pre-flight and cluster phases? I
> > couldn't find anything on ci.apache.org/projects/flink
> > <http://ci.apache.org/projects/flink> and I think this behaviour should
> > be documented as a warning/note.
> >
> >
> > On Thu, Oct 22, 2020 at 6:44 PM Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Manas,
> >
> > you can use static variable but you need to make sure that the logic
> to
> > fill the static variable is accessible and executed in all JVMs.
> >
> > I assume `pipeline.properties` is in your JAR that you submit to the
> > cluster right? Then you should be able to access it through a
> singleton
> > pattern instead of a static variable access.
> >
> > Regards,
> > Timo
> >
> >
> > On 22.10.20 14:17, Manas Kale wrote:
> >  > Sorry, I messed up the code snippet in the earlier mail. The
> > correct one
> >  > is :
> >  >
> >  > public static void main(String[] args) {
> >  > Properties prop =new Properties();
> >  >
> >  > InputStream is =
> >
>  Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> >  > prop.load(is);
> >  >
> >  > HashMap strMap =new HashMap<>();
> >  >
> >  > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >  >
> >  > new Config(strMap);
> >  >
> >  > ...
> >  >
> >  > }
> >  >
> >  > public class Config {
> >  >
> >  > public static String CONFIG_TOPIC;
> >  >
> >  > public Config(HashMap s) {
> >  >
> >  >  CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >  >
> >  > }
> >  >
> >  > }
> >  >
> >  > The value of CONFIG_TOPIC in a minicluster is properly loaded but
> > null
> >  > when run on a cluster.
> >  >
> >  >
> >  > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale  > <mailto:manaskal...@gmail.com>
> >  > <mailto:manaskal...@gmail.com <mailto:manaskal...@gmail.com>>>
> wrote:
> >  >
> >  > Hi Timo,
> >  > Thank you for the explanation, I can start to see why I was
> > getting
> >  > an exception.
> >  > Are you saying that I cannot use static variables at all when
> > trying
> >  > to deploy to a cluster? I would like the variables to remain
> > static
> >  > and not be instance-bound as they are accessed from multiple
> > classes.
> >  > Based on my understanding of what you said, I implemented the
> >  > following pattern:
> >  >
> >  > public static void main(String[] args) {
> >  > Properties prop =new Properties();
> >  >
> >  > InputStream is =
> >
>  Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> >  > prop.load(is);
> >  >
> >  > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >  >
> >  > new Config(strMap, longMap);
> >  >
> >  > ...
> >  >
> >  > }
> >  >
> >  > public class Config {
> >  >
> >  > public static String CONFIG_TOPIC;
> >  > public static String CONFIG_KAFKA;
> >  >
> >  > public Config(HashMap s) {
> >  >  CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >  >  CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> >  >
> >  > }
> >  >
> >  > }
> >  >
> >  > This produces the same issue. With the easier sol

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I
couldn't find anything on ci.apache.org/projects/flink and I think this
behaviour should be documented as a warning/note.
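
For readers of the archive, a minimal sketch of the constructor-parameter
approach Timo describes in the quoted mail below (class, topic and property
names are illustrative, not the actual project code):

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// The topic is read from the properties file in main() (pre-flight phase) and
// handed to this schema's constructor. Because the schema instance itself is
// serialized and shipped with the job graph, the topic travels to the task
// managers with it and no static field has to be re-populated on the cluster.
public class TopicAwareSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public TopicAwareSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}

// In main(), during the pre-flight phase:
//   String topic = prop.getProperty("CONFIG_TOPIC");
//   stream.addSink(new FlinkKafkaProducer<>(topic, new TopicAwareSchema(topic),
//           kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));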


On Thu, Oct 22, 2020 at 6:44 PM Timo Walther  wrote:

> Hi Manas,
>
> you can use static variable but you need to make sure that the logic to
> fill the static variable is accessible and executed in all JVMs.
>
> I assume `pipeline.properties` is in your JAR that you submit to the
> cluster right? Then you should be able to access it through a singleton
> pattern instead of a static variable access.
>
> Regards,
> Timo
>
>
> On 22.10.20 14:17, Manas Kale wrote:
> > Sorry, I messed up the code snippet in the earlier mail. The correct one
> > is :
> >
> > public static void main(String[] args) {
> > Properties prop =new Properties();
> >
> > InputStream is =
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> > prop.load(is);
> >
> > HashMap strMap =new HashMap<>();
> >
> > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> > new Config(strMap);
> >
> > ...
> >
> > }
> >
> > public class Config {
> >
> > public static String CONFIG_TOPIC;
> >
> > public Config(HashMap s) {
> >
> >  CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >
> > }
> >
> > }
> >
> > The value of CONFIG_TOPIC in a minicluster is properly loaded but null
> > when run on a cluster.
> >
> >
> > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale  > <mailto:manaskal...@gmail.com>> wrote:
> >
> > Hi Timo,
> > Thank you for the explanation, I can start to see why I was getting
> > an exception.
> > Are you saying that I cannot use static variables at all when trying
> > to deploy to a cluster? I would like the variables to remain static
> > and not be instance-bound as they are accessed from multiple classes.
> > Based on my understanding of what you said, I implemented the
> > following pattern:
> >
> > public static void main(String[] args) {
> > Properties prop =new Properties();
> >
> > InputStream is =
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> > prop.load(is);
> >
> > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> > new Config(strMap, longMap);
> >
> > ...
> >
> > }
> >
> > public class Config {
> >
> > public static String CONFIG_TOPIC;
> > public static String CONFIG_KAFKA;
> >
> > public Config(HashMap s) {
> >  CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >  CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> >
> > }
> >
> > }
> >
> > This produces the same issue. With the easier solution that you
> > listed, are you implying I use multiple instances or a singleton
> > pattern of some sort?
> >
> > On Thu, Oct 22, 2020 at 1:23 PM Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Manas,
> >
> > you need to make sure to differentiate between what Flink calls
> > "pre-flight phase" and "cluster phase".
> >
> > The pre-flight phase is where the pipeline is constructed and all
> > functions are instantiated. They are then later serialized and sent to
> > the cluster.
> >
> > If you are reading your properties file in the `main()` method
> > and store something in static variables, the content is available
> > locally where the pipeline is constructed (e.g. in the client), but
> > not when the function instances are sent to the cluster: those static
> > variables are fresh (thus empty) in the cluster JVMs. You need to
> > either make sure that the properties file is read from each task
> > manager again, or, easier: pass the parameters as constructor
> > parameters into the instances such that they are shipped together
> > with the function itself.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> >
> > On 22.10.2

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Sorry, I messed up the code snippet in the earlier mail. The correct one is
:

public static void main(String[] args) {
   Properties prop = new Properties();

InputStream is =
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

HashMap<String, String> strMap = new HashMap<>();

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap);

...

}

public class Config {

public static String CONFIG_TOPIC;

public Config(HashMap<String, String> s) {

CONFIG_TOPIC = s.get("CONFIG_TOPIC");

}

}

The value of CONFIG_TOPIC in a minicluster is properly loaded but null when
run on a cluster.


On Thu, Oct 22, 2020 at 5:42 PM Manas Kale  wrote:

> Hi Timo,
> Thank you for the explanation, I can start to see why I was getting an
> exception.
> Are you saying that I cannot use static variables at all when trying to
> deploy to a cluster? I would like the variables to remain static and not be
> instance-bound as they are accessed from multiple classes.
> Based on my understanding of what you said, I implemented the
> following pattern:
>
> public static void main(String[] args) {
>Properties prop = new Properties();
>
> InputStream is = 
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> prop.load(is);
>
> strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
> new Config(strMap, longMap);
>
> ...
>
> }
>
> public class Config {
>
> public static String CONFIG_TOPIC;
> public static String CONFIG_KAFKA;
>
> > public Config(HashMap<String, String> s) {
> CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>
> }
>
> }
>
> This produces the same issue. With the easier solution that you listed,
> are you implying I use multiple instances or a singleton pattern of some
> sort?
>
> On Thu, Oct 22, 2020 at 1:23 PM Timo Walther  wrote:
>
>> Hi Manas,
>>
>> you need to make sure to differentiate between what Flink calls
>> "pre-flight phase" and "cluster phase".
>>
>> The pre-flight phase is where the pipeline is constructed and all
>> functions are instantiated. They are then later serialized and sent to
>> the cluster.
>>
>> If you are reading your properties file in the `main()` method and store
>> something in static variables, the content is available locally where
>> the pipeline is constructed (e.g. in the client), but not when the function
>> instances are sent to the cluster: those static variables are fresh
>> (thus empty) in the cluster JVMs. You need to either make sure that the
>> properties file is read from each task manager again, or, easier: pass
>> the parameters as constructor parameters into the instances such that
>> they are shipped together with the function itself.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 22.10.20 09:24, Manas Kale wrote:
>> > Hi,
>> > I am trying to write some data to a kafka topic and I have the
>> following
>> > situation:
>> >
>> > monitorStateStream
>> >
>> > .process(new
>> IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
>> >
>> > /... // Stream that outputs elements of type IDAP2Alarm/
>> >
>> > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
>> > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
>> >
>> > private static  FlinkKafkaProducer
>> getFlinkKafkaProducer(String servers, String topic) {
>> > Properties properties =new Properties();
>> > properties.setProperty("bootstrap.servers", servers);
>> > return new FlinkKafkaProducer(topic,
>> >   (element, timestamp) -> element.serializeForKafka(),
>> >   properties,
>> >   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>> > }
>> >
>> > /*
>> > This interface is used to indicate that a class may be output to Kafka.
>> > Since Kafka treats all
>> > data as bytes, classes that implement this interface have to provide an
>> > implementation for the
>> > serializeForKafka() method.
>> > */
>> > public interface IDAP2JSONOutput {
>> >
>> >  // Implement serialization logic in this method.
>> > ProducerRecord serializeForKafka();
>> >
>> > }
>> >
>> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
>> >
>> > private final Logger LOGGER = LoggerFactory.getLogger(IDAP

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Hi Timo,
Thank you for the explanation, I can start to see why I was getting an
exception.
Are you saying that I cannot use static variables at all when trying to
deploy to a cluster? I would like the variables to remain static and not be
instance-bound as they are accessed from multiple classes.
Based on my understanding of what you said, I implemented the
following pattern:

public static void main(String[] args) {
   Properties prop = new Properties();

InputStream is =
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap, longMap);

...

}

public class Config {

public static String CONFIG_TOPIC;
public static String CONFIG_KAFKA;

public Config(HashMap<String, String> s) {
CONFIG_TOPIC = s.get("CONFIG_TOPIC");
CONFIG_KAFKA = s.get("CONFIG_KAFKA");

}

}

This produces the same issue. With the easier solution that you listed, are
you implying I use multiple instances or a singleton pattern of some sort?

On Thu, Oct 22, 2020 at 1:23 PM Timo Walther  wrote:

> Hi Manas,
>
> you need to make sure to differentiate between what Flink calls
> "pre-flight phase" and "cluster phase".
>
> The pre-flight phase is where the pipeline is constructed and all
> functions are instantiated. They are then later serialized and sent to
> the cluster.
>
> If you are reading your properties file in the `main()` method and store
> something in static variables, the content is available locally where
> the pipeline is constructed (e.g. in the client), but not when the function
> instances are sent to the cluster: those static variables are fresh
> (thus empty) in the cluster JVMs. You need to either make sure that the
> properties file is read from each task manager again, or, easier: pass
> the parameters as constructor parameters into the instances such that
> they are shipped together with the function itself.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 22.10.20 09:24, Manas Kale wrote:
> > Hi,
> > I am trying to write some data to a kafka topic and I have the following
> > situation:
> >
> > monitorStateStream
> >
> > .process(new
> IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >
> > /... // Stream that outputs elements of type IDAP2Alarm/
> >
> > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
> > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> >
> > private static  FlinkKafkaProducer
> getFlinkKafkaProducer(String servers, String topic) {
> > Properties properties =new Properties();
> > properties.setProperty("bootstrap.servers", servers);
> > return new FlinkKafkaProducer(topic,
> >   (element, timestamp) -> element.serializeForKafka(),
> >   properties,
> >   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > }
> >
> > /*
> > This interface is used to indicate that a class may be output to Kafka.
> > Since Kafka treats all
> > data as bytes, classes that implement this interface have to provide an
> > implementation for the
> > serializeForKafka() method.
> > */
> > public interface IDAP2JSONOutput {
> >
> >  // Implement serialization logic in this method.
> > ProducerRecord serializeForKafka();
> >
> > }
> >
> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> >
> > private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> >
> > @Override
> > public ProducerRecord serializeForKafka() {
> >  byte[] rawValue;
> >  byte[] rawKey;
> >  String k = getMonitorFeatureKey().getMonitorName() ;
> >  ...
> >
> >  rawValue = val.getBytes();
> >
> >  LOGGER.info("value of alarms topic from idap2 alarm : " +
> > Config.ALARMS_TOPIC);
> >
> > return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); //
> Line 95
> > }
> >
> > }
> >
> >
> > Config.ALARMS_TOPIC is a static string that is read from a properties
> > file. When I run this code on my IDE minicluster, it runs great with no
> > problems. But when I submit it as a jar to the cluster, I get the
> > following error:
> >
> > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> >  at
> >
> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71)
>
> > ~[flink_POC-0.1.jar:?]
> >  at
> >
> org.apache.kafka.clients.producer

Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
ox.MailboxProcessor
.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11
.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11
.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]

Apparently Config.ALARM_TOPIC is being evaluated as null. Also, the LOGGER
statement in IDAP2Alarm above is never printed when running on Flink
cluster. In order to verify if the correct value of Config.ALARM_TOPIC is
read from configuration file, I printed it from Config class - and it
prints correctly. So my questions are:

   - Why does this work on a minicluster but not when submitted as a jar to
   a normal cluster? I am using Flink v1.11.0 in both my POM file and the
   cluster runtime.
   - Why does the LOGGER line not get printed even though execution
   definitely reached it (as seen from the stacktrace)?

Thank you,
Manas Kale


Re: Correct way to package application.properties file with Flink JAR

2020-10-22 Thread Manas Kale
Okay, I solved the other issue with viewing logs which proved that correct,
non-null values are being loaded. I believe I have a different issue
altogether so will create a separate thread for that. Thanks for the help
Chesnay!

On Thu, Oct 22, 2020 at 11:30 AM Manas Kale  wrote:

> Hi Chesnay,
> The Config reader has everything static, so I tried using
>
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>
> Here's the .pom file for that file:
>
> 
>
>
>   
> <build>
>    <resources>
>       <resource>
>          <directory>src/main/resources</directory>
>          <includes>
>             <include>pipeline.properties</include>
>             <include>log4j.properties</include>
>          </includes>
>       </resource>
>    </resources>
>
>    ...
>
> </build>
>
>
> I can see the pipeline.properties packaged in my JAR at the root level
> (using maven package command).
> However, this works on my IDE minicluster but loads null values when
> submitted to the cluster.
> The Config class is not at the package root, rather it is a few levels
> deep. Does that make a difference and cause the getClassLoader() to treat
> an inner package as root?
>
>
>
> On Wed, Oct 21, 2020 at 6:06 PM Chesnay Schepler 
> wrote:
>
>> You could bundle said file in the jar and retrieve it via
>> getClass().getClassLoader().getResource("").
>>
>> On 10/21/2020 2:24 PM, Manas Kale wrote:
>> > Hi,
>> > I have a Flink job that I am packaging as a JAR that is submitted to
>> > the Flink cluster runtime. However, this JAR reads a few configuration
>> > values from a .properties file.
>> > What is the recommended way to package this properties file when
>> > submitting to a cluster? Do I have to copy it to a folder in my flink
>> > cluster installation?
>> >
>> > My own attempt is a somewhat convoluted method that is not working.
>> > Basically I set an environment variable that points to the properties
>> > file, and I use that at runtime to read configuration values. This
>> > works when I run it in my IDE as a minicluster but fails when I submit
>> > it to the cluster. I'm kind of stuck debugging this as for some reason
>> > I am not able to see the logs from the configuration reader class
>> > (asked a question about that in a separate thread).
>>
>>
>>


Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-22 Thread Manas Kale
Thank you Chesnay. I found the logs being printed in the standalone session
when I used the CLI to submit the job. However, this only deepens the mystery of
the configuration file on the other thread - I see from the logs that the
configuration values are being read correctly, but when these values are
actually used, they are null!

On Wed, Oct 21, 2020 at 7:58 PM Manas Kale  wrote:

> I see, thanks for that clarification - I incorrectly assumed both methods
> of submission produce logs in the same place. I will have an update
> tomorrow!
>
> On Wed, Oct 21, 2020 at 6:12 PM Chesnay Schepler 
> wrote:
>
>> Hold on, let us clarify how you submit the job.
>>
>> Do you upload the jar via the WebUI, or with the CLI (e.g., ./bin/flink
>> run ...)?
>>
>> If it is the former, then it shows up in the JM logs.
>> If it is the latter, then it should appear in the logs of the client
>> (i.e., log/flink-???-client-???.log).
>>
>> On 10/21/2020 2:17 PM, Manas Kale wrote:
>>
>> Hi Chesnay,
>> I checked the JobManager logs - it's not there either.
>>
>> On Wed, Oct 21, 2020 at 3:51 PM Chesnay Schepler 
>> wrote:
>>
>>> The main method is executed in the JobManager process and never reaches
>>> the TaskExecutors (only the individual functions do).
>>> As such you have to take a peek into the JobManager logs.
>>>
>>> On 10/21/2020 11:37 AM, Manas Kale wrote:
>>>
>>> Hi,
>>> I have the following pattern:
>>>
>>> public static void main(String[] args) {
>>>
>>>   // Get the exec environment. This could be a cluster or a
>>>   // mini-cluster used for local development.
>>>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>   // Make the Flink runtime use event time as time metric.
>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>   // Generate a watermark every WATERMARK_PERIODICITY ms.
>>>   env.getConfig().setAutoWatermarkInterval(WATERMARK_PERIODICITY);
>>>
>>> Config.readProperties();
>>>
>>> }
>>>
>>> class Config {
>>>
>>> private final static Logger LOGGER = LoggerFactory.getLogger(Config.class);
>>>
>>> // Populates variables above with values read from config file.
>>> public static void readProperties() throws Exception {
>>> Properties prop = new Properties();
>>>
>>> String propFileLocation = System.getenv("FLINK_CONFIG_LOCATION");
>>> if (propFileLocation == null) {
>>> System.err.println("Properties file pointer env variable 
>>> FLINK_CONFIG_LOCATION missing!");
>>> System.exit(1);
>>> }
>>> FileInputStream is = null;
>>> try {
>>>is = new FileInputStream(new File(propFileLocation));
>>>
>>> } catch (Exception e) {
>>> System.err.println("File " + propFileLocation + " not found!");
>>> System.exit(1);
>>> }
>>>
>>> prop.load(is);
>>>
>>>* LOGGER.info(".."); // prints content read from property file*
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>> When I run this program as a minicluster, I am able to see the
>>> LOGGER.info() being printed in my console.
>>> However, when I submit this job as a JAR to a flink cluster, the Config
>>> class's  LOGGER.info()* line above is never printed in the
>>> taskmanager's logs!* I don't understand why this is happening because
>>> log  statements from other operators are definitely being printed in the
>>> log files on the cluster. What am I doing wrong?
>>>
>>> My log4j.properties file is:
>>>
>>> log4j.rootLogger=INFO, console, fileAppender
>>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>> log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
>>> log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.fileAppender.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>> log4j.appender.fileAppender.File=dataProcessingEngine.log
>>> log4j.appender.fileAppender.policies.type = Policies
>>> log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy
>>> log4j.appender.fileAppender.policies.size.size=10MB
>>> log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy
>>> log4j.appender.fileAppender.strategy.max = 5
>>>
>>>
>>> Thank you,
>>> Manas Kale
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: Correct way to package application.properties file with Flink JAR

2020-10-22 Thread Manas Kale
Hi Chesnay,
The Config reader has everything static, so I tried using

Config.class.getClassLoader().getResourceAsStream("pipeline.properties");

Here's the .pom file for that file:



   
  
<build>
   <resources>
      <resource>
         <directory>src/main/resources</directory>
         <includes>
            <include>pipeline.properties</include>
            <include>log4j.properties</include>
         </includes>
      </resource>
   </resources>

   ...

</build>


I can see the pipeline.properties packaged in my JAR at the root level
(using maven package command).
However, this works on my IDE minicluster but loads null values when
submitted to the cluster.
The Config class is not at the package root, rather it is a few levels
deep. Does that make a difference and cause the getClassLoader() to treat
an inner package as root?
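
(For reference: ClassLoader#getResourceAsStream is always resolved against the
classpath root, regardless of which package the calling class lives in, while
Class#getResourceAsStream is package-relative unless the path starts with "/".
A quick standalone check, with illustrative class and file names:)

import java.io.InputStream;

public class ResourceLookupDemo {
    public static void main(String[] args) {
        // Both of these look for pipeline.properties at the root of the JAR/classpath:
        InputStream a = ResourceLookupDemo.class.getClassLoader()
                .getResourceAsStream("pipeline.properties");
        InputStream b = ResourceLookupDemo.class.getResourceAsStream("/pipeline.properties");

        // This one is resolved relative to ResourceLookupDemo's own package:
        InputStream c = ResourceLookupDemo.class.getResourceAsStream("pipeline.properties");

        System.out.println("classloader lookup found it:      " + (a != null));
        System.out.println("absolute class lookup found it:   " + (b != null));
        System.out.println("package-relative lookup found it: " + (c != null));
    }
}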



On Wed, Oct 21, 2020 at 6:06 PM Chesnay Schepler  wrote:

> You could bundle said file in the jar and retrieve it via
> getClass().getClassLoader().getResource("").
>
> On 10/21/2020 2:24 PM, Manas Kale wrote:
> > Hi,
> > I have a Flink job that I am packaging as a JAR that is submitted to
> > the Flink cluster runtime. However, this JAR reads a few configuration
> > values from a .properties file.
> > What is the recommended way to package this properties file when
> > submitting to a cluster? Do I have to copy it to a folder in my flink
> > cluster installation?
> >
> > My own attempt is a somewhat convoluted method that is not working.
> > Basically I set an environment variable that points to the properties
> > file, and I use that at runtime to read configuration values. This
> > works when I run it in my IDE as a minicluster but fails when I submit
> > it to the cluster. I'm kind of stuck debugging this as for some reason
> > I am not able to see the logs from the configuration reader class
> > (asked a question about that in a separate thread).
>
>
>


Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-21 Thread Manas Kale
I see, thanks for that clarification - I incorrectly assumed both methods
of submission produce logs in the same place. I will have an update
tomorrow!

On Wed, Oct 21, 2020 at 6:12 PM Chesnay Schepler  wrote:

> Hold on, let us clarify how you submit the job.
>
> Do you upload the jar via the WebUI, or with the CLI (e.g., ./bin/flink
> run ...)?
>
> If it is the former, then it shows up in the JM logs.
> If it is the latter, then it should appear in the logs of the client
> (i.e., log/flink-???-client-???.log).
>
> On 10/21/2020 2:17 PM, Manas Kale wrote:
>
> Hi Chesnay,
> I checked the JobManager logs - it's not there either.
>
> On Wed, Oct 21, 2020 at 3:51 PM Chesnay Schepler 
> wrote:
>
>> The main method is executed in the JobManager process and never reaches
>> the TaskExecutors (only the individual functions do).
>> As such you have to take a peek into the JobManager logs.
>>
>> On 10/21/2020 11:37 AM, Manas Kale wrote:
>>
>> Hi,
>> I have the following pattern:
>>
>> public static void main(String[] args) {
>>
>>   // Get the exec environment. This could be a cluster or a
>>   // mini-cluster used for local development.
>>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>   // Make the Flink runtime use event time as time metric.
>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>   // Generate a watermark every WATERMARK_PERIODICITY ms.
>>   env.getConfig().setAutoWatermarkInterval(WATERMARK_PERIODICITY);
>>
>> Config.readProperties();
>>
>> }
>>
>> class Config {
>>
>> private final static Logger LOGGER = LoggerFactory.getLogger(Config.class);
>>
>> // Populates variables above with values read from config file.
>> public static void readProperties() throws Exception {
>> Properties prop = new Properties();
>>
>> String propFileLocation = System.getenv("FLINK_CONFIG_LOCATION");
>> if (propFileLocation == null) {
>> System.err.println("Properties file pointer env variable 
>> FLINK_CONFIG_LOCATION missing!");
>> System.exit(1);
>> }
>> FileInputStream is = null;
>> try {
>>is = new FileInputStream(new File(propFileLocation));
>>
>> } catch (Exception e) {
>> System.err.println("File " + propFileLocation + " not found!");
>> System.exit(1);
>> }
>>
>> prop.load(is);
>>
>>* LOGGER.info(".."); // prints content read from property file*
>>
>>   }
>>
>> }
>>
>>
>> When I run this program as a minicluster, I am able to see the
>> LOGGER.info() being printed in my console.
>> However, when I submit this job as a JAR to a flink cluster, the Config
>> class's  LOGGER.info()* line above is never printed in the
>> taskmanager's logs!* I don't understand why this is happening because
>> log  statements from other operators are definitely being printed in the
>> log files on the cluster. What am I doing wrong?
>>
>> My log4j.properties file is:
>>
>> log4j.rootLogger=INFO, console, fileAppender
>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>> log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>> log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
>> log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.fileAppender.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>> log4j.appender.fileAppender.File=dataProcessingEngine.log
>> log4j.appender.fileAppender.policies.type = Policies
>> log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy
>> log4j.appender.fileAppender.policies.size.size=10MB
>> log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy
>> log4j.appender.fileAppender.strategy.max = 5
>>
>>
>> Thank you,
>> Manas Kale
>>
>>
>>
>>
>>
>>
>


Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-21 Thread Manas Kale
Hi Chesnay,
I checked the JobManager logs - it's not there either.

On Wed, Oct 21, 2020 at 3:51 PM Chesnay Schepler  wrote:

> The main method is executed in the JobManager process and never reaches
> the TaskExecutors (only the individual functions do).
> As such you have to take a peek into the JobManager logs.
>
> On 10/21/2020 11:37 AM, Manas Kale wrote:
>
> Hi,
> I have the following pattern:
>
> public static void main(String[] args) {
>
>   // Get the exec environment. This could be a cluster or a
>   // mini-cluster used for local development.
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   // Make the Flink runtime use event time as time metric.
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   // Generate a watermark every WATERMARK_PERIODICITY ms.
>   env.getConfig().setAutoWatermarkInterval(WATERMARK_PERIODICITY);
>
> Config.readProperties();
>
> }
>
> class Config {
>
> private final static Logger LOGGER = LoggerFactory.getLogger(Config.class);
>
> // Populates variables above with values read from config file.
> public static void readProperties() throws Exception {
> Properties prop = new Properties();
>
> String propFileLocation = System.getenv("FLINK_CONFIG_LOCATION");
> if (propFileLocation == null) {
> System.err.println("Properties file pointer env variable 
> FLINK_CONFIG_LOCATION missing!");
> System.exit(1);
> }
> FileInputStream is = null;
> try {
>is = new FileInputStream(new File(propFileLocation));
>
> } catch (Exception e) {
> System.err.println("File " + propFileLocation + " not found!");
> System.exit(1);
> }
>
> prop.load(is);
>
>* LOGGER.info(".."); // prints content read from property file*
>
>   }
>
> }
>
>
> When I run this program as a minicluster, I am able to see the
> LOGGER.info() being printed in my console.
> However, when I submit this job as a JAR to a flink cluster, the Config
> class's  LOGGER.info()* line above is never printed in the
> taskmanager's logs!* I don't understand why this is happening because
> log  statements from other operators are definitely being printed in the
> log files on the cluster. What am I doing wrong?
>
> My log4j.properties file is:
>
> log4j.rootLogger=INFO, console, fileAppender
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
> log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.fileAppender.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> log4j.appender.fileAppender.File=dataProcessingEngine.log
> log4j.appender.fileAppender.policies.type = Policies
> log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy
> log4j.appender.fileAppender.policies.size.size=10MB
> log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy
> log4j.appender.fileAppender.strategy.max = 5
>
>
> Thank you,
> Manas Kale
>
>
>
>
>
>


Logs printed when running with minicluster but not printed when submitted as a job

2020-10-21 Thread Manas Kale
Hi,
I have the following pattern:

public static void main(String[] args) {

   // Get the exec environment. This could be a cluster or a
mini-cluster used for local development.
  StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
  // Make the Flink runtime use event time as time metric.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // Generate a watermark every WATERMARK_PERIODICITY ms.
  env.getConfig().setAutoWatermarkInterval(WATERMARK_PERIODICITY);

Config.readProperties();

}

class Config {

private final static Logger LOGGER = LoggerFactory.getLogger(Config.class);

// Populates variables above with values read from config file.
public static void readProperties() throws Exception {
Properties prop = new Properties();

String propFileLocation = System.getenv("FLINK_CONFIG_LOCATION");
if (propFileLocation == null) {
System.err.println("Properties file pointer env variable
FLINK_CONFIG_LOCATION missing!");
System.exit(1);
}
FileInputStream is = null;
try {
   is = new FileInputStream(new File(propFileLocation));

} catch (Exception e) {
System.err.println("File " + propFileLocation + " not found!");
System.exit(1);
}

prop.load(is);

   * LOGGER.info(".."); // prints content read from property file*

  }

}


When I run this program as a minicluster, I am able to see the
LOGGER.info() being printed in my console.
However, when I submit this job as a JAR to a flink cluster, the Config
class's  LOGGER.info()* line above is never printed in the
taskmanager's logs!* I don't understand why this is happening because log
statements from other operators are definitely being printed in the log
files on the cluster. What am I doing wrong?

My log4j.properties file is:

log4j.rootLogger=INFO, console, fileAppender

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p
%-60c %x - %m%n

log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern=%d{HH:mm:ss,SSS}
%-5p %-60c %x - %m%n
log4j.appender.fileAppender.File=dataProcessingEngine.log
log4j.appender.fileAppender.policies.type = Policies
log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy
log4j.appender.fileAppender.policies.size.size=10MB
log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy
log4j.appender.fileAppender.strategy.max = 5


Thank you,
Manas Kale


Re: Correct way to handle RedisSink exception

2020-10-15 Thread Manas Kale
Hi all,
Thank you for the help, I understand now.

On Thu, Oct 15, 2020 at 5:28 PM 阮树斌 浙江大学  wrote:

> hello, Manas Kale.
>
> From the log, you can see that the exception is thrown in the 'open()'
> method of the RedisSink class. You can extend the RedisSink class, override
> the 'open()' method, and handle the exception as you wish. Alternatively,
> stop using the Apache Bahir [1] Flink Redis connector library and extend
> RichSinkFunction to develop a custom Redis sink class.
>
> Regards
> Shubin Ruan
>
> At 2020-10-15 19:27:29, "Manas Kale"  wrote:
>
> Hi,
> I have a streaming application that pushes output to a redis cluster sink.
> I am using the Apache Bahir[1] Flink redis connector for this. I want to
> handle the case when the redis server is unavailable.
> I am following the same pattern as outlined by them in [1]:
>
> FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
> .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new
> InetSocketAddress(5601)))).build();
>
> DataStream stream = ...;
> stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new
> RedisExampleMapper()));
>
> However, if the redis server is not available, my whole job crashes with
> this exception:
>
> ERROR org.apache.flink.streaming.connectors.redis.RedisSink -
> Redis has not been properly initialized:
> redis.clients.jedis.exceptions.JedisConnectionException: Could not get a
> resource from the pool
> at redis.clients.util.Pool.getResource(Pool.java:53)
> at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
> ...
>
> I want to handle and ignore such exceptions thrown by the RedisSink class.
> Where exactly do I put my try/catch to do this? Enclosing the last in the
> code snippet with try/catch does not work.
> I believe the only way to do this would be to handle the exception in the
> RedisSink class, but that is a library class provided by Bahir. Is my
> thinking correct?
>
>
> [1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
>
>
> Regards,
> Manas
>
>
>
>
>
>
>
>
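
For readers of the archive, a minimal sketch of the subclassing approach
suggested above, assuming the Bahir RedisSink API; the "drop records while
Redis is down" policy is illustrative, and invoke() has to be guarded as well
because a swallowed failure in open() leaves the sink without a connection:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Wraps the Bahir RedisSink so that a connection failure in open() is logged
// instead of failing the whole job. Records are silently dropped while the
// sink is not connected.
public class TolerantRedisSink<T> extends RedisSink<T> {

    private static final Logger LOG = LoggerFactory.getLogger(TolerantRedisSink.class);

    private boolean connected;

    public TolerantRedisSink(FlinkJedisConfigBase config, RedisMapper<T> mapper) {
        super(config, mapper);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            super.open(parameters);
            connected = true;
        } catch (Exception e) {
            LOG.warn("Redis is unavailable; records will be dropped", e);
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        if (connected) {
            super.invoke(value, context);
        }
    }
}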


Correct way to handle RedisSink exception

2020-10-15 Thread Manas Kale
Hi,
I have a streaming application that pushes output to a redis cluster sink.
I am using the Apache Bahir[1] Flink redis connector for this. I want to
handle the case when the redis server is unavailable.
I am following the same pattern as outlined by them in [1]:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new
InetSocketAddress(5601)))).build();

DataStream stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new
RedisExampleMapper()));

However, if the redis server is not available, my whole job crashes with
this exception:

ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis
has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a
resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...

I want to handle and ignore such exceptions thrown by the RedisSink class.
Where exactly do I put my try/catch to do this? Enclosing the last in the
code snippet with try/catch does not work.
I believe the only way to do this would be to handle the exception in the
RedisSink class, but that is a library class provided by Bahir. Is my
thinking correct?


[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/


Regards,
Manas


Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-09 Thread Manas Kale
Hi Xingbo and Till,
Thank you for your help!

On Wed, Sep 2, 2020 at 9:38 PM Xingbo Huang  wrote:

> Hi Manas,
>
> As Till said, you need to check whether the execution environment used is
> LocalStreamEnvironment. You need to get the class object corresponding to
> the corresponding java object through py4j. You can take a look at the
> example I wrote below, I hope it will help you
>
> ```
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> from py4j.java_gateway import get_java_class
>
>
> def test():
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create(
> env, environment_settings=EnvironmentSettings.new_instance()
> .in_streaming_mode().use_blink_planner().build())
> gateway = get_gateway()
>
> # get the execution environment class
> env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()
>
> # get the LocalStreamEnvironment class
> local_stream_environment_class = get_java_class(
>
> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
> print(env_class == local_stream_environment_class)
>
>
> if __name__ == '__main__':
> test()
>
> ```
>
>
> Best,
> Xingbo
>
> Till Rohrmann  wrote on Wed, Sep 2, 2020 at 5:03 PM:
>
>> Hi Manas,
>>
>> I am not entirely sure but you might try to check whether
>> env._j_stream_execution_environment is an instance of
>> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
>> via Python's isinstance function.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 5:46 AM Manas Kale  wrote:
>>
>>> Hi Xingbo,
>>> Thank you for clarifying that. I am indeed maintaining a different
>>> version of the code by commenting those lines, but I was just wondering if
>>> it was possible to detect the environment programmatically.
>>>
>>> Regards,
>>> Manas
>>>
>>> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang  wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> When running locally, you need
>>>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to
>>>> wait for the job to finish. However, when you submit to the cluster, you need to
>>>> delete this code. In my opinion, the current feasible solution is that you
>>>> prepare two sets of codes for this, although this is annoying. After all,
>>>> running jobs locally is usually for testing, so it should be acceptable to
>>>> prepare different codes.
>>>> In the long run, it should be the flink framework that makes different
>>>> behaviors according to different environments  so that users don’t need to
>>>> prepare different codes.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Manas Kale  wrote on Tue, Sep 1, 2020 at 3:00 PM:
>>>>
>>>>> Hi,
>>>>> I am trying to submit a pyFlink job in detached mode using the command:
>>>>>
>>>>> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
>>>>> flink-sql-connector-kafka_2.11-1.11.0.jar
>>>>>
>>>>> The jobs are submitted successfully but the command does not return. I
>>>>> realized that was because I had the following line in
>>>>> basic_streaming_job.py:
>>>>>
>>>>> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>>>>>
>>>>> This statement is useful when testing this locally within a
>>>>> minicluster (using python basic_streaming_job.py) but not needed when the
>>>>> job is submitted to a cluster.
>>>>>
>>>>> So I would like to programmatically detect if the
>>>>> StreamExecutionEnvironment is a localStreamEnvironment and execute
>>>>> the above snippet accordingly. How do I do this?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Manas
>>>>>
>>>>


Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-01 Thread Manas Kale
Hi Xingbo,
Thank you for clarifying that. I am indeed maintaining a different version
of the code by commenting those lines, but I was just wondering if it was
possible to detect the environment programmatically.

Regards,
Manas

On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang  wrote:

> Hi Manas,
>
> When running locally, you need
> `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to
> wait for the job to finish. However, when you submit to the cluster, you need to
> delete this code. In my opinion, the current feasible solution is that you
> prepare two sets of codes for this, although this is annoying. After all,
> running jobs locally is usually for testing, so it should be acceptable to
> prepare different codes.
> In the long run, it should be the flink framework that makes different
> behaviors according to different environments  so that users don’t need to
> prepare different codes.
>
> Best,
> Xingbo
>
> Manas Kale  wrote on Tue, Sep 1, 2020 at 3:00 PM:
>
>> Hi,
>> I am trying to submit a pyFlink job in detached mode using the command:
>>
>> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
>> flink-sql-connector-kafka_2.11-1.11.0.jar
>>
>> The jobs are submitted successfully but the command does not return. I
>> realized that was because I had the following line in
>> basic_streaming_job.py:
>>
>> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>>
>> This statement is useful when testing this locally within a minicluster
>> (using python basic_streaming_job.py) but not needed when the job is
>> submitted to a cluster.
>>
>> So I would like to programmatically detect if the
>> StreamExecutionEnvironment is a localStreamEnvironment and execute the
>> above snippet accordingly. How do I do this?
>>
>>
>> Thanks,
>> Manas
>>
>


PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-01 Thread Manas Kale
Hi,
I am trying to submit a pyFlink job in detached mode using the command:

../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
flink-sql-connector-kafka_2.11-1.11.0.jar

The jobs are submitted successfully but the command does not return. I
realized that was because I had the following line in
basic_streaming_job.py:

ten_sec_summaries.get_job_client().get_job_execution_result().result()

This statement is useful when testing this locally within a minicluster
(using python basic_streaming_job.py) but not needed when the job is
submitted to a cluster.

So I would like to programmatically detect if the StreamExecutionEnvironment is
a localStreamEnvironment and execute the above snippet accordingly. How do
I do this?


Thanks,
Manas


Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
Guess I figured out a solution for the first question as well - I am
packaging multiple main() classes in the same JAR and specifying entrypoint
classes when submitting the JAR. Most of my issues stemmed from an
improperly configured POM file and a mismatch in Flink runtime versions.
I'll assume this is the recommended way to go about doing this, thanks for
reading and have a great day!
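
For the archives, submitting each entry point from that single fat JAR then
looks roughly like this (the entry-class names are illustrative):

# Pick the job to run by naming its main class explicitly with -c / --class.
./bin/flink run -c flink_summarization.JobOne target/flink_summarization-0.1.jar
./bin/flink run -c flink_summarization.JobTwo target/flink_summarization-0.1.jar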

On Mon, Aug 31, 2020 at 12:03 PM Manas Kale  wrote:

> Hi,
> I solved my second issue - I was not following Maven's convention for
> placing source code (I had not placed my source in src/main/java).
> However, I still would like some help with my first question - what is the
> recommended way to set up a project with multiple main() classes? At the end,
> I would like to be able to run each main() class as a separate job. Should
> I create a single JAR and specify different entrypoint classes each time or
> should I create separate JARs for each main() class?
>
> On Mon, Aug 31, 2020 at 11:13 AM Manas Kale  wrote:
>
>> Hi,
>> I have an IntelliJ project that has multiple classes with main()
>> functions. I want to package this project as a JAR that I can submit to the
>> Flink cluster and specify the entry class when I start the job. Here are my
>> questions:
>>
>>- I am not really familiar with Maven and would appreciate some
>>pointers/examples. From what I understand, I will need to use some sort of
>>transformer in the Maven shade plugin to merge all of the classes. *If
>>this is correct, can I see a small example? *
>>- Also, I can't get a single main class working:
>>
>>
>> http://maven.apache.org/POM/4.0.0; 
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>4.0.0
>>
>>flink_summarization
>>flink_summarization
>>0.1
>>jar
>>
>>Flink Quickstart Job
>>http://www.myorganization.org
>>
>>
>>   UTF-8
>>   1.10.1
>>   1.8
>>   2.11
>>   ${java.version}
>>   ${java.version}
>>
>>
>>
>>   
>>  apache.snapshots
>>  Apache Development Snapshot Repository
>>  
>> https://repository.apache.org/content/repositories/snapshots/
>>  
>> false
>>  
>>  
>> true
>>  
>>   
>>
>>
>>
>>   
>>   
>>   
>>   
>>  org.apache.flink
>>  flink-java
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  
>> flink-streaming-java_${scala.binary.version}
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-connector-kafka_2.11
>>  ${flink.version}
>>   
>>
>>   
>>  org.apache.flink
>>  flink-state-processor-api_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-connector-jdbc_2.11
>>  1.11.0
>>   
>>
>>   
>>   
>>   
>>  org.slf4j
>>  slf4j-log4j12
>>  1.7.7
>>  runtime
>>   
>>   
>>  log4j
>>  log4j
>>  1.2.17
>>  runtime
>>   
>>
>>   
>>   
>>  org.apache.flink
>>  flink-test-utils_${scala.binary.version}
>>  ${flink.version}
>>  test
>>   
>>   
>>  org.apache.flink
>>  flink-runtime_2.11
>>  ${flink.version}
>>  test
>>  tests
>>   
>>   
>>  org.apache.flink
>>  flink-streaming-java_2.11
>>  ${flink.version}
>>  test
>>  tests
>>   
>>   
>>  org.assertj
>>  assertj-core
>>  
>>  3.16.1
>>  test
>>   
>>
>>
>>
>>
>>
>>   
>>
>

Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
Hi,
I solved my second issue - I was not following Maven's convention for
placing source code (I had not placed my source in src/main/java).
However, I still would like some help with my first question - what is the
recommended way to set a project with multiple main() classes? At the end,
I would like to be able to run each main() class as a separate job. Should
I create a single JAR and specify different entrypoint classes each time or
should I create separate JARs for each main() class?

On Mon, Aug 31, 2020 at 11:13 AM Manas Kale  wrote:

> Hi,
> I have an IntelliJ project that has multiple classes with main()
> functions. I want to package this project as a JAR that I can submit to the
> Flink cluster and specify the entry class when I start the job. Here are my
> questions:
>
>- I am not really familiar with Maven and would appreciate some
>pointers/examples. From what I understand, I will need to use some sort of
>transformer in the Maven shade plugin to merge all of the classes. *If
>this is correct, can I see a small example? *
>- Also, I can't get a single main class working:
>
>
> http://maven.apache.org/POM/4.0.0; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>4.0.0
>
>flink_summarization
>flink_summarization
>0.1
>jar
>
>Flink Quickstart Job
>http://www.myorganization.org
>
>
>   UTF-8
>   1.10.1
>   1.8
>   2.11
>   ${java.version}
>   ${java.version}
>
>
>
>   
>  apache.snapshots
>  Apache Development Snapshot Repository
>  
> https://repository.apache.org/content/repositories/snapshots/
>  
> false
>  
>  
> true
>  
>   
>
>
>
>   
>   
>   
>   
>  org.apache.flink
>  flink-java
>  ${flink.version}
>  provided
>   
>   
>  org.apache.flink
>  flink-streaming-java_${scala.binary.version}
>  ${flink.version}
>  provided
>   
>
>   
>  org.apache.flink
>  flink-connector-kafka_2.11
>  ${flink.version}
>   
>
>   
>  org.apache.flink
>  flink-state-processor-api_2.11
>  ${flink.version}
>  provided
>   
>
>   
>  org.apache.flink
>  flink-connector-jdbc_2.11
>  1.11.0
>   
>
>   
>   
>   
>  org.slf4j
>  slf4j-log4j12
>  1.7.7
>  runtime
>   
>   
>  log4j
>  log4j
>  1.2.17
>  runtime
>   
>
>   
>   
>  org.apache.flink
>  flink-test-utils_${scala.binary.version}
>  ${flink.version}
>  test
>   
>   
>  org.apache.flink
>  flink-runtime_2.11
>  ${flink.version}
>  test
>  tests
>   
>   
>  org.apache.flink
>  flink-streaming-java_2.11
>  ${flink.version}
>  test
>  tests
>   
>   
>  org.assertj
>  assertj-core
>  
>  3.16.1
>  test
>   
>
>
>
>
>
>   
>
>  
>  
> org.apache.maven.plugins
> maven-compiler-plugin
> 3.1
> 
>${java.version}
>${java.version}
> 
>  
>
>  
>  
>  
> org.apache.maven.plugins
> maven-shade-plugin
> 3.0.0
> 
> 
>false
> 
> 
>
>
>   package
>   
>  shade
>   
>   
>  
> 
>org.apache.flink:force-shading
>com.google.code.findbugs:jsr305
>org.slf4j:*
>log4j:*
> 
>  
>  
> 
>

Packaging multiple Flink jobs from a single IntelliJ project

2020-08-30 Thread Manas Kale
Hi,
I have an IntelliJ project that has multiple classes with main() functions.
I want to package this project as a JAR that I can submit to the Flink
cluster and specify the entry class when I start the job. Here are my
questions:

   - I am not really familiar with Maven and would appreciate some
   pointers/examples. From what I understand, I will need to use some sort of
   transformer in the Maven shade plugin to merge all of the classes. *If
   this is correct, can I see a small example? *
   - Also, I can't get a single main class working:


http://maven.apache.org/POM/4.0.0;
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
   4.0.0

   flink_summarization
   flink_summarization
   0.1
   jar

   Flink Quickstart Job
   http://www.myorganization.org

   
  UTF-8
  1.10.1
  1.8
  2.11
  ${java.version}
  ${java.version}
   

   
  
 apache.snapshots
 Apache Development Snapshot Repository
 
https://repository.apache.org/content/repositories/snapshots/
 
false
 
 
true
 
  
   

   
  
  
  
  
 org.apache.flink
 flink-java
 ${flink.version}
 provided
  
  
 org.apache.flink
 flink-streaming-java_${scala.binary.version}
 ${flink.version}
 provided
  

  
 org.apache.flink
 flink-connector-kafka_2.11
 ${flink.version}
  

  
 org.apache.flink
 flink-state-processor-api_2.11
 ${flink.version}
 provided
  

  
 org.apache.flink
 flink-connector-jdbc_2.11
 1.11.0
  

  
  
  
 org.slf4j
 slf4j-log4j12
 1.7.7
 runtime
  
  
 log4j
 log4j
 1.2.17
 runtime
  

  
  
 org.apache.flink
 flink-test-utils_${scala.binary.version}
 ${flink.version}
 test
  
  
 org.apache.flink
 flink-runtime_2.11
 ${flink.version}
 test
 tests
  
  
 org.apache.flink
 flink-streaming-java_2.11
 ${flink.version}
 test
 tests
  
  
 org.assertj
 assertj-core
 
 3.16.1
 test
  


   

   
  

 
 
org.apache.maven.plugins
maven-compiler-plugin
3.1

   ${java.version}
   ${java.version}

 

 
 
 
org.apache.maven.plugins
maven-shade-plugin
3.0.0


   false


   
   
  package
  
 shade
  
  
 

   org.apache.flink:force-shading
   com.google.code.findbugs:jsr305
   org.slf4j:*
   log4j:*

 
 

   
   *:*
   
  META-INF/*.SF
  META-INF/*.DSA
  META-INF/*.RSA
   

 
 


iu.feature_summarization.basic_features.pre.BasicPreProcessJob


 
  
   

 
  

  
 



   org.eclipse.m2e
   lifecycle-mapping
   1.0.0
   
  
 

   
  org.apache.maven.plugins
  maven-shade-plugin
  [3.0.0,)
  
 shade
  
   
   
  
   


   
  org.apache.maven.plugins
  maven-compiler-plugin
  [3.1,)
  
 testCompile
 compile
  
   
   
  
   


Re: PyFlink cluster runtime issue

2020-08-29 Thread Manas Kale
Ok, thank you!

On Sat, 29 Aug, 2020, 4:07 pm Xingbo Huang,  wrote:

> Hi Manas,
>
> We can't submit a PyFlink job through the Flink web UI currently. The only
> way to submit a PyFlink job is through the command line.
>
> Best,
> Xingbo
>
> On Sat, Aug 29, 2020 at 12:51 PM, Manas Kale wrote:
>
>> Hi Xingbo,
>> Thanks, that worked. Just to make sure, the only way currently to submit
>> a pyFlink job is through the command line right? Can I do that through the
>> GUI?
>>
>> On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang  wrote:
>>
>>> Hi Manas,
>>>
>>> I think you forgot to add the Kafka jar [1] as a dependency. You can use
>>> the -j argument of the command line [2] or the Python Table API to specify
>>> the jar. For details about the APIs for adding Java dependencies, you can
>>> refer to the relevant documentation [3].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Fri, Aug 28, 2020 at 9:06 PM, Manas Kale wrote:
>>>
>>>> Hi,
>>>> I am trying to deploy a pyFlink application on a local cluster. I am
>>>> able to run my application without any problems if I execute it as a normal
>>>> python program using the command :
>>>> python myApplication.py
>>>> My pyFlink version is __version__ = "1.11.0".
>>>> I had installed this pyFlink through conda/pip (don't remember which).
>>>>
>>>> Per instructions given in [1] I have ensured that running the command
>>>> "python" gets me to a python 3.7 shell with pyFlink installed.
>>>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>>>> above).
>>>> However, if I execute the application using the command:
>>>> bin/flink run -py myApplication.py
>>>>
>>>> I get the error:
>>>>
>>>> Traceback (most recent call last):
>>>>  File "basic_streaming_job.py", line 65, in 
>>>>main()
>>>>  File "basic_streaming_job.py", line 43, in main
>>>>""")
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
>>>> table_environment.py", line 543, in execute_sql
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>>> /java_gateway.py", line 1286, in __call__
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
>>>> xceptions.py", line 147, in deco
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>>> /protocol.py", line 328, in get_return_value
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>> o5.executeSql.
>>>> : org.apache.flink.table.api.ValidationException: Unable to create a
>>>> source for reading table
>>>> 'default_catalog.default_database.raw_message'.
>>>>
>>>> Table options are:
>>>>
>>>> 'connector'='kafka'
>>>> 'format'='json'
>>>> 'properties.bootstrap.servers'='localhost:9092'
>>>> 'topic'='basic_features_normalized'
>>>>at
>>>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
>>>> 5)
>>>>at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
>>>> ogSourceTable.scala:135)
>>>> .
>>>>
>>>> The offending table schema in question :
>>>>
>>>> CREATE TABLE {INPUT_TABLE} (
>>>> monitorId STRING,
>>>> deviceId STRING,
>>>> state INT,
>>>> feature_1 DOUBLE,
>>>> feature_2 DOUBLE,
>>>> feature_3 DOUBLE,
>>>> feature_4 DOUBLE,
>>>> feature_5 DOUBLE,
>>>> feature_6 DOUBLE,
>>>> time_str TIMESTAMP(3),
>>>> WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' 
>>>> SECOND
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = '{INPUT_TOPIC}',
>>>> 'properties.bootstrap.servers' = '{KAFKA}',
>>>> 'format' = 'json'
>>>> )
>>>>
>>>> Clearly, even though my standalone pyFlink version and cluster Flink
>>>> versions are the same, something is different with the cluster runtime.
>>>> What could that be?
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>>>
>>>
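
As a sketch of the Table API alternative to the -j flag mentioned above: the
connector jar can also be attached from inside the script via the pipeline.jars
option, which takes a semicolon-separated list of file:// URLs. The path below
is a placeholder, not the real location.

# Attach the Kafka SQL connector jar programmatically instead of passing -j
# on the command line; adjust the placeholder path to where the jar lives.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")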


Re: PyFlink cluster runtime issue

2020-08-28 Thread Manas Kale
Hi Xingbo,
Thanks, that worked. Just to make sure, the only way currently to submit a
pyFlink job is through the command line right? Can I do that through the
GUI?

On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang  wrote:

> Hi Manas,
>
> I think you forgot to add the Kafka jar [1] as a dependency. You can use the
> -j argument of the command line [2] or the Python Table API to specify the
> jar. For details about the APIs for adding Java dependencies, you can refer
> to the relevant documentation [3].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>
> Best,
> Xingbo
>
> On Fri, Aug 28, 2020 at 9:06 PM, Manas Kale wrote:
>
>> Hi,
>> I am trying to deploy a pyFlink application on a local cluster. I am able
>> to run my application without any problems if I execute it as a normal
>> python program using the command :
>> python myApplication.py
>> My pyFlink version is __version__ = "1.11.0".
>> I had installed this pyFlink through conda/pip (don't remember which).
>>
>> Per instructions given in [1] I have ensured that running the command
>> "python" gets me to a python 3.7 shell with pyFlink installed.
>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>> above).
>> However, if I execute the application using the command:
>> bin/flink run -py myApplication.py
>>
>> I get the error:
>>
>> Traceback (most recent call last):
>>  File "basic_streaming_job.py", line 65, in 
>>main()
>>  File "basic_streaming_job.py", line 43, in main
>>""")
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
>> table_environment.py", line 543, in execute_sql
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>> /java_gateway.py", line 1286, in __call__
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
>> xceptions.py", line 147, in deco
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>> /protocol.py", line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o5.executeSql.
>> : org.apache.flink.table.api.ValidationException: Unable to create a
>> source for reading table
>> 'default_catalog.default_database.raw_message'.
>>
>> Table options are:
>>
>> 'connector'='kafka'
>> 'format'='json'
>> 'properties.bootstrap.servers'='localhost:9092'
>> 'topic'='basic_features_normalized'
>>at
>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
>> 5)
>>at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
>> ogSourceTable.scala:135)
>> .
>>
>> The offending table schema in question :
>>
>> CREATE TABLE {INPUT_TABLE} (
>> monitorId STRING,
>> deviceId STRING,
>> state INT,
>> feature_1 DOUBLE,
>> feature_2 DOUBLE,
>> feature_3 DOUBLE,
>> feature_4 DOUBLE,
>> feature_5 DOUBLE,
>> feature_6 DOUBLE,
>> time_str TIMESTAMP(3),
>> WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = '{INPUT_TOPIC}',
>> 'properties.bootstrap.servers' = '{KAFKA}',
>> 'format' = 'json'
>> )
>>
>> Clearly, even though my standalone pyFlink version and cluster Flink
>> versions are the same, something is different with the cluster runtime.
>> What could that be?
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>
>


PyFlink cluster runtime issue

2020-08-28 Thread Manas Kale
Hi,
I am trying to deploy a pyFlink application on a local cluster. I am able
to run my application without any problems if I execute it as a normal
python program using the command :
python myApplication.py
My pyFlink version is __version__ = "1.11.0".
I had installed this pyFlink through conda/pip (don't remember which).

Per instructions given in [1] I have ensured that running the command
"python" gets me to a python 3.7 shell with pyFlink installed.
I have also ensured my local Flink cluster version is 1.11.0 (same as
above).
However, if I execute the application using the command:
bin/flink run -py myApplication.py

I get the error:

Traceback (most recent call last):
 File "basic_streaming_job.py", line 65, in 
   main()
 File "basic_streaming_job.py", line 43, in main
   """)
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
table_environment.py", line 543, in execute_sql
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
/java_gateway.py", line 1286, in __call__
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
xceptions.py", line 147, in deco
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source
for reading table
'default_catalog.default_database.raw_message'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'topic'='basic_features_normalized'
   at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
5)
   at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
ogSourceTable.scala:135)
.

The offending table schema in question :

CREATE TABLE {INPUT_TABLE} (
monitorId STRING,
deviceId STRING,
state INT,
feature_1 DOUBLE,
feature_2 DOUBLE,
feature_3 DOUBLE,
feature_4 DOUBLE,
feature_5 DOUBLE,
feature_6 DOUBLE,
time_str TIMESTAMP(3),
WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{KAFKA}',
'format' = 'json'
)

Clearly, even though my standalone pyFlink version and cluster Flink
versions are the same, something is different with the cluster runtime.
What could that be?


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples


Re: Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Manas Kale
Hi Robert,
Thanks for the info!

On Thu, Aug 27, 2020 at 8:01 PM Robert Metzger  wrote:

> Hi,
>
> Check out the KafkaDeserializationSchema (
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-deserializationschema)
> which allows you to deserialize the key and value bytes coming from Kafka.
>
> Best,
> Robert
>
>
> On Thu, Aug 27, 2020 at 1:56 PM Manas Kale  wrote:
>
>> Hi,
>> I have a kafka topic on which the key is serialized in a custom format
>> and the value is serialized as JSON. How do I create a FlinkKafakConsumer
>> that has different deserialization schemas for the key and value? Here's
>> what I tried:
>>
>> FlinkKafkaConsumer> advancedFeatureData = new 
>> FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new 
>> TypeInformationKeyValueSerializationSchema(
>> TypeInformation.of(new TypeHint() {}),
>> TypeInformation.of(new TypeHint() {}),
>> env.getConfig()
>> ), properties);
>>
>> However, I get the error:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID: 121
>> at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>> at
>> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
>> at
>> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>
>> Is there something I am missing with my approach or am I supposed to use
>> a completely different class than
>> TypeInformationKeyValueSerializationSchema?
>>
>


Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Manas Kale
Hi,
I have a kafka topic on which the key is serialized in a custom format and
the value is serialized as JSON. How do I create a FlinkKafakConsumer that
has different deserialization schemas for the key and value? Here's what I
tried:

FlinkKafkaConsumer> advancedFeatureData =
new FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new
TypeInformationKeyValueSerializationSchema(
TypeInformation.of(new TypeHint() {}),
TypeInformation.of(new TypeHint() {}),
env.getConfig()
), properties);

However, I get the error:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
121
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at
org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
at
org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)

Is there something I am missing with my approach or am I supposed to use a
completely different class than TypeInformationKeyValueSerializationSchema?


Re: AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-24 Thread Manas Kale
Thanks Prasanna and Chesnay. Changing the dependency scope worked and I
also had to add a Maven Shade plugin transformer to resolve another error.

On Fri, Aug 21, 2020 at 11:38 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Manas,
>
> One option you could try is to set the scope in the dependencies as
> compile for the required artifacts rather than provided.
>
> Prasanna.
>
> On Fri, Aug 21, 2020 at 1:47 PM Chesnay Schepler 
> wrote:
>
>> If this class cannot be found on the classpath then chances are Flink is
>> completely missing from the classpath.
>>
>> I haven't worked with EMR, but my guess is that you did not submit things
>> correctly.
>>
>> From the EMR documentation I could gather that the submission should work
>> without the submitted jar bundling all of Flink;
>>
>> given that your jar works in a local cluster, that part should not be the
>> problem.
>>
>> On 21/08/2020 08:16, Manas Kale wrote:
>>
>> Hi,
>> I am trying to deploy a Flink jar on AWS EMR service. I have ensured that
>> Flink v1.10.0 is used in my pom file as that's the version supported by
>> EMR. However, I get the following error:
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError: 
>> org/apache/flink/api/java/typeutils/ResultTypeQueryable
>>  at java.lang.ClassLoader.defineClass1(Native Method)
>>  at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>>  at 
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>  at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>  at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Class.java:348)
>>  at org.apache.hadoop.util.RunJar.run(RunJar.java:232)
>>  at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
>> Caused by: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.java.typeutils.ResultTypeQueryable
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>  ... 15 more
>>
>> Also, if I deploy this on my local Flink cluster (v1.10.1) it works.
>> I'm not sure what could be the cause. Could it be because of
>> misconfigured classes bundled in the final JAR file or something that was
>> patched in v 1.10.1?
>>
>>
>>


AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-21 Thread Manas Kale
Hi,
I am trying to deploy a Flink jar on AWS EMR service. I have ensured that
Flink v1.10.0 is used in my pom file as that's the version supported by
EMR. However, I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/flink/api/java/typeutils/ResultTypeQueryable
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.util.RunJar.run(RunJar.java:232)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.api.java.typeutils.ResultTypeQueryable
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more

Also, if I deploy this on my local Flink cluster (v1.10.1) it works.
I'm not sure what could be the cause. Could it be because of misconfigured
classes bundled in the final JAR file or something that was patched in v
1.10.1?


Re: Event time based disconnection detection logic

2020-08-11 Thread Manas Kale
Hi Timo,
I got it, the issue was a (silly) mistake on my part. I unnecessarily put
all the processElement() logic inside the if condition. The if() condition
is there because I want to emit a disconnected STOPPED message only once.
So the correct code is :

  @Override
public void processElement(IUHeartbeat heartbeat, Context ctx,
Collector out) throws Exception {
Boolean isDisconnected = isDisconnectedStateStore.value();
//LOGGER.info("Watermark: " +
ctx.timerService().currentWatermark() + " Processing timestamp : "+
heartbeat.getTimestamp() + ", isDisconnected : " + isDisconnected
//+" last registered timer :" +
registeredTimerStateStore.value());

// Delete previous timer.
if (registeredTimerStateStore.value() != null)

ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

// Register a timer that will fire in the future if no further
events are received.
long timerFiringTimestamp = heartbeat.getTimestamp() +
DISCONNECTED_TIMEOUT;
ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
registeredTimerStateStore.update(timerFiringTimestamp);

// If this is the first message for this monitor or is the
first message after a disconnection.
if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
// Emit a message indicating END of the disconnected state.
IUSessionMessage message = new IUSessionMessage(
new
IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),
"dummy", "dummy"),
new IUSessionInfo(heartbeat.getTimestamp(),
IUStatus.ENDED, IUEventType.NO_VALUE));
out.collect(message);
LOGGER.info(message.getSessionInfo().toString());
}

// Update the state store.
isDisconnectedStateStore.update(Boolean.FALSE);
}


This produces the expected output.
Also, I will assume that this is the best way to solve my problem - I can't
use Flink's session windows. Let me know if anyone has any other ideas
though!

Thank you for your time and quick response!


On Tue, Aug 11, 2020 at 1:45 PM Timo Walther  wrote:

> Hi Manas,
>
> at first glance your code looks correct to me. I would investigate
> whether your keys and watermarks are correct. Especially the watermark
> frequency could be an issue. If watermarks are generated at the same time
> as the heartbeats themselves, it might be the case that the timers fire
> before the process() function (which resets the timer) is called.
>
> Maybe you can give us more information how watermarks are generated?
>
> Regards,
> Timo
>
> On 11.08.20 08:33, Manas Kale wrote:
> > Hi,
> > I have a bunch of devices that keep sending heartbeat messages. I want
> > to make an operator that emits messages when a device disconnects and
> > when a device stops being disconnected.
> > A device is considered disconnected if we don't receive any heartbeat
> > for more than some TIMEOUT duration.
> > This seemed like a good candidate for session windows, but I am not sure
> > how I can express the inverse logic (i.e. detecting periods of
> > inactivity instead of activity) using Flink's operators.
> > I want to use event time for all processing and ideally want to achieve
> > this behaviour using a single operator.
> >
> > So I am trying to implement a custom processfunction that, on every
> > heartbeat:
> >
> >   * Deletes any previous event time timer
> >   * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
> >
> > The basic idea is that every new heartbeat will keep pushing the timer
> > forward. Only when heartbeats stop arriving does the timer fire,
> > indicating the start of a disconnected state.
> > Code:
> >
> > public class IUDisconnectedStateDetector extends
> KeyedProcessFunction {
> >
> >  // Tracks if this monitor is disconnected or not.
> > private ValueState isDisconnectedStateStore;
> >  // Tracks which timer was registered.
> > private ValueState registeredTimerStateStore;
> >
> >  private final Logger LOGGER =
> LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
> >
> >  // Called by the Flink runtime before starting this operator. We
> > initialize the state stores here.
> > @Override
> > public void open(Configuration parameters) throws Exception {
> >  isDisconnectedStateStore = getRuntimeContext().getState(new
> ValueStateDescriptor(
> >  DISCONNECTED_STATE_STORE_NAME, Boolean.class));
> >  registeredTimerStateStore = getRuntimeContext().getState(new
> ValueStateDescriptor(
> >

Event time based disconnection detection logic

2020-08-11 Thread Manas Kale
Hi,
I have a bunch of devices that keep sending heartbeat messages. I want to
make an operator that emits messages when a device disconnects and when a
device stops being disconnected.
A device is considered disconnected if we don't receive any heartbeat for
more than some TIMEOUT duration.
This seemed like a good candidate for session windows, but I am not sure
how I can express the inverse logic (i.e. detecting periods of inactivity
instead of activity) using Flink's operators.
I want to use event time for all processing and ideally want to achieve
this behaviour using a single operator.

So I am trying to implement a custom processfunction that, on every
heartbeat:

   - Deletes any previous event time timer
   - Registers a new timer to fire at heartbeat.timestamp + TIMEOUT

The basic idea is that every new heartbeat will keep pushing the timer
forward. Only when heartbeats stop arriving does the timer fire, indicating
the start of a disconnected state.
Code:

public class IUDisconnectedStateDetector extends
KeyedProcessFunction {

// Tracks if this monitor is disconnected or not.
private ValueState isDisconnectedStateStore;
// Tracks which timer was registered.
private ValueState registeredTimerStateStore;

private final Logger LOGGER =
LoggerFactory.getLogger(IUDisconnectedStateDetector.class);

// Called by the Flink runtime before starting this operator. We
initialize the state stores here.
@Override
public void open(Configuration parameters) throws Exception {
isDisconnectedStateStore = getRuntimeContext().getState(new
ValueStateDescriptor(
DISCONNECTED_STATE_STORE_NAME, Boolean.class));
registeredTimerStateStore = getRuntimeContext().getState(new
ValueStateDescriptor(
REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
}

@Override
public void processElement(IUHeartbeat heartbeat, Context ctx,
Collector out) throws Exception {
Boolean isDisconnected = isDisconnectedStateStore.value();
LOGGER.info("Watermark: " + heartbeat + ", isDisconnected : "
+ isDisconnected
+" last registered timer :" +
registeredTimerStateStore.value());


// If this is the first message for this monitor or is the
first message after a disconnection.
if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
// Delete previous timer.
if (registeredTimerStateStore.value() != null)

ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

// Register a timer that will fire in the future if no
further events are received.
long timerFiringTimestamp = heartbeat.getTimestamp() +
DISCONNECTED_TIMEOUT;
ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
registeredTimerStateStore.update(timerFiringTimestamp);

// Emit a message indicating END of the disconnected state.
IUSessionMessage message = new IUSessionMessage(
new
IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),
"dummy", "dummy"),
new IUSessionInfo(heartbeat.getTimestamp(),
IUStatus.ENDED, IUEventType.NO_VALUE));
out.collect(message);
LOGGER.info(message.getSessionInfo().toString());
// Update the state store.
isDisconnectedStateStore.update(Boolean.FALSE);
}
}


@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector out) throws Exception {
if (isDisconnectedStateStore.value() == Boolean.FALSE) {
// If this timer fires that means no message was received
from the monitor for some timeout duration.
// Update the state store.
isDisconnectedStateStore.update(Boolean.TRUE);

// Emit a message indicating START of the disconnected
state. Note that since this is applicable for a monitor,
IUSessionMessage message = new IUSessionMessage(
new
IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),
"dummyFeatureName", "dummyDeviceId"),
new IUSessionInfo(timestamp, IUStatus.STARTED,
IUEventType.NO_VALUE));
out.collect(message);

LOGGER.info(message.getSessionInfo().toString());
}
}
}

*However, the above code does not behave as expected - the timer fires even
when (a) it has received heartbeats within the timeout and (b) I have the
code to delete it*. So, my questions:

   - Am I deleting the timer incorrectly? I use a state store to keep track
   of registered timer's timestamps and use that value when deleting.
   - Am I overcomplicating things? Can this be achieved using Flink's
   inbuild session windowing operators?

Thanks!


Re: PyFlink DDL UDTF join error

2020-07-29 Thread Manas Kale
Hi Wei,
Thank you for the clarification and workaround.

Regards,
Manas

On Wed, Jul 29, 2020 at 12:55 PM Wei Zhong  wrote:

> Hi Manas,
>
> It seems to be a bug in the CREATE VIEW operation. I have created a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-18750
>
> Until it is fixed, please do not use a CREATE VIEW statement for a UDTF call.
>
> Best,
> Wei
>
> On Jul 28, 2020, at 21:19, Wei Zhong wrote:
>
> Hi Manas,
>
> It seems like a bug. As a workaround for now, you can try replacing the UDTF
> SQL call with code like this:
>
> t_env.register_table("tmp_view", 
> t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, 
> featureValue)"))
>
>
> This works for me. I’ll try to find out what caused this exception.
>
> Best,
> Wei
>
> On Jul 28, 2020, at 18:33, Manas Kale wrote:
>
> Hi,
> Using pyFlink DDL, I am trying to:
>
>1. Consume a Kafka JSON stream. This has messages with aggregate data,
>example:  "data":
>
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
>2. I am splitting field "data" so that I can process its values
>individually. For that, I have defined a UDTF.
>3. I store the UDTF output in a temporary view. (Meaning each output
>of the UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
>4. I use the values in this temporary view to calculate some
>aggregation metrics.
>
> I am getting an SQL error for  step 4.
> Code:
>
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, 
> DataTypes, Row
> from pyflink.table.udf import udtf
> from json import loads
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(exec_env, 
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>
>
> @udtf(input_types=DataTypes.STRING(), result_types= 
> [DataTypes.STRING(),DataTypes.DOUBLE()])
> def split_feature_values(data_string):
> json_data = loads(data_string)
> for f_name, f_value in json_data.items():
> yield f_name, f_value
>
> # configure the off-heap memory of current taskmanager to enable the python 
> worker uses off-heap memory.
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>  '80m')
>
> # Register UDTF
> t_env.register_function("split", split_feature_values)
> # ... string constants
>
> # Init Kafka input table
> t_env.execute_sql(f"""
> CREATE TABLE {INPUT_TABLE} (
> monitorId STRING,
> deviceId STRING,
> state INT,
> data STRING,
> time_str TIMESTAMP(3),
> WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' 
> SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{INPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """)
>
> # 10 sec summary table
> t_env.execute_sql(f"""
> CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
> monitorId STRING,
> featureName STRING,
> maxFv DOUBLE,
> minFv DOUBLE,
> avgFv DOUBLE,
> windowStart TIMESTAMP(3),
> WATERMARK FOR windowStart AS windowStart
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """)
>
> # Join with UDTF
> t_env.execute_sql(f"""
> CREATE VIEW tmp_view AS
> SELECT * FROM (
> SELECT monitorId, T.featureName, T.featureValue, time_str
> FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, 
> featureValue)
> )
> """)
>
> # Create 10 second view <-* this causes the error*
> t_env.execute_sql(f"""
> INSERT INTO {TEN_SEC_OUTPUT_TABLE}
> SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), 
> AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
> FROM tmp_view
> GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
> """)
>
>
> The last SQL statement where I calculate metrics causes the error. The error 
> message is :
>
> Traceback (

PyFlink DDL UDTF join error

2020-07-28 Thread Manas Kale
Hi,
Using pyFlink DDL, I am trying to:

   1. Consume a Kafka JSON stream. This has messages with aggregate data,
   example:  "data":
   
"{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
   2. I am splitting field "data" so that I can process its values
   individually. For that, I have defined a UDTF.
   3. I store the UDTF output in a temporary view. (Meaning each output of
   the UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
   4. I use the values in this temporary view to calculate some
   aggregation metrics.

I am getting an SQL error for  step 4.
Code:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes, Row
from pyflink.table.udf import udtf
from json import loads

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())


@udtf(input_types=DataTypes.STRING(), result_types=
[DataTypes.STRING(),DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield f_name, f_value

# configure the off-heap memory of current taskmanager to enable the
python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
# ... string constants

# Init Kafka input table
t_env.execute_sql(f"""
CREATE TABLE {INPUT_TABLE} (
monitorId STRING,
deviceId STRING,
state INT,
data STRING,
time_str TIMESTAMP(3),
WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# 10 sec summary table
t_env.execute_sql(f"""
CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
monitorId STRING,
featureName STRING,
maxFv DOUBLE,
minFv DOUBLE,
avgFv DOUBLE,
windowStart TIMESTAMP(3),
WATERMARK FOR windowStart AS windowStart
) WITH (
'connector' = 'kafka',
'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# Join with UDTF
t_env.execute_sql(f"""
CREATE VIEW tmp_view AS
SELECT * FROM (
SELECT monitorId, T.featureName, T.featureValue, time_str
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as
T(featureName, featureValue)
)
""")

# Create 10 second view <-* this causes the error*
t_env.execute_sql(f"""
INSERT INTO {TEN_SEC_OUTPUT_TABLE}
SELECT monitorId, featureName, MAX(featureValue),
MIN(featureValue), AVG(featureValue), TUMBLE_START(time_str, INTERVAL
'10' SECOND)
FROM tmp_view
GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")


The last SQL statement where I calculate metrics causes the error. The
error message is :

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py",
line 97, in 
""")
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 543, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
*: org.apache.flink.table.api.ValidationException: SQL validation
failed. From line 4, column 15 to line 4, column 20: Column 'data' not
found in any table*


I don't understand why Flink wants a "data" column. I discard the
"data" column in the temporary view, and it certainly does not exist in the
TEN_SECOND_OUTPUT_TABLE. The only place it exists is in the initial
INPUT_TABLE, which is not relevant for the erroneous SQL statement!

Clearly I missed understanding something. Have I missed something when
creating the temporary view?


Re: Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Manas Kale
Okay, thanks for the info!

On Fri, Jul 24, 2020 at 2:11 PM Leonard Xu  wrote:

> Hi, Kale
>
> Unfortunately, Flink SQL does not support reading/writing Kafka message keys
> yet; there is a FLIP [1] discussing this feature.
>
> Best
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Readingandwritingfromkey,value,timestamp
>
> On Jul 24, 2020, at 15:01, Manas Kale wrote:
>
> Hi,
> How do I read/write Kafka message keys using DDL? I have not been able to
> see any documentation for the same.
>
> Thanks!
>
>
>


Reading/writing Kafka message keys using DDL pyFlink

2020-07-24 Thread Manas Kale
Hi,
How do I read/write Kafka message keys using DDL? I have not been able to
see any documentation for the same.

Thanks!


pyFlink UDTF function registration

2020-07-15 Thread Manas Kale
Hi,
I am trying to use a user-defined table function (UDTF) to split up some data as
follows:

from pyflink.table.udf import udtf

@udtf(input_types=DataTypes.STRING(), result_types=
[DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield (f_name, f_value)

# configure the off-heap memory of current taskmanager to enable the
python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
`state` INT,
`data` STRING,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_temporary_table = f"""
CREATE TABLE {TEMPORARY_TABLE} (
`monitorId` STRING,
`featureName` STRING,
`featureData` DOUBLE,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
)
"""

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT monitorId, split(data), time_st
FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)


However, I get the following error :
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 3, column 23 to line 3, column 33: *No match found for function
signature split()*

I believe I am using the correct call to register the UDTF as per [1]. Am I
missing something?

Thanks,
Manas

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
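
Judging from the later "PyFlink DDL UDTF join error" thread, the likely cause is
that a registered table function cannot be called like a scalar function in the
SELECT list; it has to be joined in via LATERAL TABLE. A sketch of the corrected
INSERT, reusing the table and function names above:

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT monitorId, T.featureName, T.featureData, time_st
    FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) AS T(featureName, featureData)
"""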


Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Thank you Xingbo, this will certainly help!

On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang  wrote:

> Hi Manas,
>
> I have created a issue[1] to add related doc
>
> [1] https://issues.apache.org/jira/browse/FLINK-18598
>
> Best,
> Xingbo
>
> On Tue, Jul 14, 2020 at 4:15 PM, Manas Kale wrote:
>
>> Thank you for the quick reply Xingbo!
>>  Is there some documented webpage example that I can refer to in the
>> future for the latest pyFlink 1.11 API? I couldn't find anything related to
>> awaiting asynchronous results.
>>
>> Thanks,
>> Manas
>>
>> On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang  wrote:
>>
>>> Hi Manas,
>>>
>>>
>>> I tested your code and there were no errors. Because execute_sql is an
>>> asynchronous method, you need to wait on the TableResult it returns; you
>>> can try the following code:
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment,
>>> TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>>>
>>>
>>> def test():
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> t_env = StreamTableEnvironment.create(exec_env,
>>>
>>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>>   )
>>>
>>> INPUT_TABLE = "test"
>>> INPUT_TOPIC = "test"
>>> LOCAL_KAFKA = "localhost:2181"
>>> OUTPUT_TABLE = "test_output"
>>> OUTPUT_TOPIC = "test_output"
>>> ddl_source = f"""
>>> CREATE TABLE {INPUT_TABLE} (
>>> `monitorId` VARCHAR,
>>> `time_st` TIMESTAMP(3),
>>> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>> `data` DOUBLE
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = '{INPUT_TOPIC}',
>>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>> 'properties.group.id' = 'myGroup',
>>> 'format' = 'json'
>>> )
>>> """
>>>
>>> ddl_sink = f"""
>>> CREATE TABLE {OUTPUT_TABLE} (
>>> `monitorId` VARCHAR,
>>> `max` DOUBLE
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = '{OUTPUT_TOPIC}',
>>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>> 'format' = 'json'
>>> )
>>> """
>>> t_env.execute_sql(ddl_source)
>>> t_env.execute_sql(ddl_sink)
>>>
>>> result = t_env.execute_sql(f"""
>>> INSERT INTO {OUTPUT_TABLE}
>>> SELECT monitorId, data
>>> FROM {INPUT_TABLE}
>>> """)
>>> result.get_job_client().get_job_execution_result().result()
>>>
>>>
>>> if __name__ == '__main__':
>>> test()
>>>
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Tue, Jul 14, 2020 at 3:31 PM, Manas Kale wrote:
>>>
>>>> Hi,
>>>> I am trying to get a simple streaming job running in pyFlink and
>>>> understand the new 1.11 API. I just want to read from and write to kafka
>>>> topics.
>>>> Previously I was using t_env.execute("jobname"),
>>>> register_table_source() and register_table_sink() but in 1.11 all 3 were
>>>> deprecated and replaced by execute_sql() in the deprecation warning.
>>>>
>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>> t_env = StreamTableEnvironment.create(exec_env,
>>>>   
>>>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>>>   )
>>>>
>>>> ddl_source = f"""
>>>> CREATE TABLE {INPUT_TABLE} (
>>>> `monitorId` VARCHAR,
>>>> `time_st` TIMESTAMP(3),
>>>> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>>> `data` DOUBLE
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = '{INPUT_TOPIC}',
>>>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>> 'properties.group.id' = 'myGroup',
>>>> 'format' = 'json'
>>>> )
>>>> """
>>>>
>>>> ddl_sink = f"""
>>>> CREATE TABLE {OUTPUT_TABLE} (
>>>> `monitorId` VARCHAR,
>>>> `max` DOUBLE
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = '{OUTPUT_TOPIC}',
>>>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>> 'format' = 'json'
>>>> )
>>>> """
>>>> t_env.execute_sql(ddl_source)
>>>> t_env.execute_sql(ddl_sink)
>>>>
>>>> t_env.execute_sql(f"""
>>>> INSERT INTO {OUTPUT_TABLE}
>>>> SELECT monitorId, data
>>>> FROM {INPUT_TABLE}
>>>> """)
>>>>
>>>>
>>>> This gives me the error :
>>>> : java.lang.IllegalStateException: No operators defined in streaming
>>>> topology. Cannot generate StreamGraph.
>>>>
>>>> I am aware this is lazily evaluated, so is there some equivalent SQL
>>>> statement for t_env.execute() that I should be calling?
>>>>
>>>> Thanks,
>>>> Manas
>>>>
>>>


Re: DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Manas Kale
Thanks for the quick replies Dawid and Leonard... I had both flink-json
JARs for 1.10 and 1.11. I deleted 1.10 and now it works!
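
For anyone hitting the same NoSuchFieldError, one quick sanity check is to look
for duplicate format jars on the classpath, for example in the distribution's
lib directory (assuming the standard flink-1.11.0 layout):

ls flink-1.11.0/lib | grep flink-json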

On Tue, Jul 14, 2020 at 4:17 PM Leonard Xu  wrote:

> Hi, Kale
>
> I think you’re using the correct TIMESTAMP data type for the JSON format, and
> this should work properly.
> But it looks like you are using an old version of the `flink-json` dependency,
> judging from the log. Could you check that the version of `flink-json` is
> 1.11.0?
>
> Best,
> Leonard Xu
>
>
> On Jul 14, 2020, at 18:07, Manas Kale wrote:
>
> Hi,
> I am trying to parse this JSON message:
> {"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2,
> "time_st": "2020-07-14 15:15:19.60"}
> using pyFlink 1.11 DDL with this code:
>
> ddl_source = f"""
> CREATE TABLE {INPUT_TABLE} (
> `monitorId` STRING,
> `deviceId` STRING,
> `state` INT,
> `time_st` TIMESTAMP(3),
> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
> `data` DOUBLE
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{INPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """
>
> I used *[1]* for the DDL format and *[2]* for the timestamp string
> format. However, when I run this I get the following error :
> *Caused by: java.io.IOException: Failed to deserialize JSON '{"monitorId":
> 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14
> 15:15:19.60"}'.*
> at
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
> at
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
> *Caused by: java.lang.NoSuchFieldError: SQL_TIMESTAMP_FORMAT*
>
> I believe I am using the correct TIMESTAMP format in the JSON message
> according to the documentation, so I can't figure out what the error could be.
>
> Any help would be appreciated!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html#defining-in-create-table-ddl-1
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#timestamp
>
> Thanks,
> Manas
>
>
>


DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Manas Kale
Hi,
I am trying to parse this JSON message:
{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2,
"time_st": "2020-07-14 15:15:19.60"}
using pyFlink 1.11 DDL with this code:

ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
`state` INT,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
`data` DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

I used *[1]* for the DDL format and *[2]* for the timestamp string format.
However, when I run this I get the following error :
*Caused by: java.io.IOException: Failed to deserialize JSON '{"monitorId":
789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14
15:15:19.60"}'.*
at
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
at
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

*Caused by: java.lang.NoSuchFieldError: SQL_TIMESTAMP_FORMAT*

I believe I am using the correct TIMESTAMP format in the JSON message
according to the documentation so can't figure out what could be the error.

Any help would be appreciated!

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html#defining-in-create-table-ddl-1
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#timestamp

Thanks,
Manas


Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Thank you for the quick reply Xingbo!
Is there a documented example page that I can refer to in the future
for the latest pyFlink 1.11 API? I couldn't find anything related to
awaiting asynchronous results.

Thanks,
Manas

On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang  wrote:

> Hi Manas,
>
>
> I tested your code, but there are no errors. Because execute_sql is an
> asynchronous method, you need to await through TableResult, you can try the
> following code:
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>
>
> def test():
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(exec_env,
>
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>   )
>
> INPUT_TABLE = "test"
> INPUT_TOPIC = "test"
> LOCAL_KAFKA = "localhost:2181"
> OUTPUT_TABLE = "test_output"
> OUTPUT_TOPIC = "test_output"
> ddl_source = f"""
> CREATE TABLE {INPUT_TABLE} (
> `monitorId` VARCHAR,
> `time_st` TIMESTAMP(3),
> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
> `data` DOUBLE
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{INPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'properties.group.id' = 'myGroup',
> 'format' = 'json'
> )
> """
>
> ddl_sink = f"""
> CREATE TABLE {OUTPUT_TABLE} (
> `monitorId` VARCHAR,
> `max` DOUBLE
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{OUTPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """
> t_env.execute_sql(ddl_source)
> t_env.execute_sql(ddl_sink)
>
> result = t_env.execute_sql(f"""
> INSERT INTO {OUTPUT_TABLE}
> SELECT monitorId, data
> FROM {INPUT_TABLE}
> """)
> result.get_job_client().get_job_execution_result().result()
>
>
> if __name__ == '__main__':
> test()
>
>
> Best,
> Xingbo
>
> Manas Kale  于2020年7月14日周二 下午3:31写道:
>
>> Hi,
>> I am trying to get a simple streaming job running in pyFlink and
>> understand the new 1.11 API. I just want to read from and write to kafka
>> topics.
>> Previously I was using t_env.execute("jobname"), register_table_source()
>> and register_table_sink() but in 1.11 all 3 were deprecated and replaced by
>> execute_sql() in the deprecation warning.
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(exec_env,
>>   
>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>   )
>>
>> ddl_source = f"""
>> CREATE TABLE {INPUT_TABLE} (
>> `monitorId` VARCHAR,
>> `time_st` TIMESTAMP(3),
>> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>> `data` DOUBLE
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = '{INPUT_TOPIC}',
>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>> 'properties.group.id' = 'myGroup',
>> 'format' = 'json'
>> )
>> """
>>
>> ddl_sink = f"""
>> CREATE TABLE {OUTPUT_TABLE} (
>> `monitorId` VARCHAR,
>> `max` DOUBLE
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = '{OUTPUT_TOPIC}',
>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>> 'format' = 'json'
>> )
>> """
>> t_env.execute_sql(ddl_source)
>> t_env.execute_sql(ddl_sink)
>>
>> t_env.execute_sql(f"""
>> INSERT INTO {OUTPUT_TABLE}
>> SELECT monitorId, data
>> FROM {INPUT_TABLE}
>> """)
>>
>>
>> This gives me the error :
>> : java.lang.IllegalStateException: No operators defined in streaming
>> topology. Cannot generate StreamGraph.
>>
>> I am aware this is lazily evaluated, so is there some equivalent SQL
>> statement for t_env.execute() that I should be calling?
>>
>> Thanks,
>> Manas
>>
>


pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Hi,
I am trying to get a simple streaming job running in pyFlink and understand
the new 1.11 API. I just want to read from and write to kafka topics.
Previously I was using t_env.execute("jobname"), register_table_source()
and register_table_sink() but in 1.11 all 3 were deprecated and replaced by
execute_sql() in the deprecation warning.

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env,

environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
  )

ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` VARCHAR,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
`data` DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'properties.group.id' = 'myGroup',
'format' = 'json'
)
"""

ddl_sink = f"""
CREATE TABLE {OUTPUT_TABLE} (
`monitorId` VARCHAR,
`max` DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = '{OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""
t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT monitorId, data
FROM {INPUT_TABLE}
""")


This gives me the error :
: java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot generate StreamGraph.

I am aware this is lazily evaluated, so is there some equivalent SQL
statement for t_env.execute() that I should be calling?
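
For reference, a minimal sketch of what finally makes the job run (assuming pyFlink 1.11's TableResult API and the variables defined above): execute_sql() submits the INSERT asynchronously, so the returned TableResult has to be awaited instead of calling t_env.execute().

# sketch: await the asynchronous INSERT job instead of calling t_env.execute()
result = t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, data
    FROM {INPUT_TABLE}
""")
result.get_job_client().get_job_execution_result().result()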

Thanks,
Manas


PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

2020-07-13 Thread Manas Kale
Hi,
I have the following piece of code (for pyFlink v1.11) :

t_env.from_path(INPUT_TABLE) \
.select("monitorId, data, rowtime") \
.window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
.group_by("five_sec_window, monitorId") \
.select("monitorId, data.avg, data.min, data.max,
five_sec_window.rowtime") \
.execute_insert(OUTPUT_TABLE)

Which is generating the exception :

Traceback (most recent call last):


* File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in
.select("monitorId, data.avg, data.min, data.max,
five_sec_window.rowtime") \*  File
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py",
line 907, in select
return Table(self._j_table.select(fields), self._t_env)
  File
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
  File
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.

*: org.apache.flink.table.api.ValidationException: A group window expects a
time attribute for grouping in a stream environment.*

The "rowtime" attribute in INPUT_TABLE is created as :

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env,

environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
  )

...

 .field("rowtime", DataTypes.TIMESTAMP(3))
.rowtime(
Rowtime()
.timestamps_from_field("time_st")
.watermarks_periodic_ascending())

).create_temporary_table(INPUT_TABLE)


What is wrong with the code? I believe that I have already indicated which
attribute has to be treated as the time attribute.
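
For comparison, a DDL-based sketch of declaring the same event-time attribute (using the CREATE TABLE form that appears elsewhere in this archive; the column list and connector properties are assumptions and would need to match the actual source):

# sketch: declare the event-time attribute and its watermark directly in DDL,
# so that time_st can be used as the rowtime attribute in the window definition
ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` STRING,
        `data` DOUBLE,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""
t_env.execute_sql(ddl_source)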

Thank you,
Manas


Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
I also tried doing this by using a User Defined Function.

class DataConverter(ScalarFunction):
def eval(self, str_data):
data = json.loads(str_data)
return ?? # I want to return data['0001'] in field
'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter", udf(DataConverter(),
input_types = [DataTypes.STRING()],
  result_type =
  DataTypes.ROW([

DataTypes.FIELD("feature1", DataTypes.STRING())
  ])))


t_env.from_path(INPUT_TABLE) \
.select("data_converter(data)") \ # <--- here "data" is the field
"data" from the previous mail
.insert_into(OUTPUT_TABLE)


I used a ROW to hold multiple values but I can't figure out how I can
return a populated ROW object from the eval() method. Where is the method
to construct a row/field object and return it?
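
A minimal sketch of what this could look like (an assumption on my part that pyFlink 1.11 accepts a Row instance, importable from pyflink.table.types and built positionally in the order of the declared ROW fields, as the UDF result; t_env is assumed to exist as above):

import json

from pyflink.table import DataTypes
from pyflink.table.types import Row
from pyflink.table.udf import udf, ScalarFunction


class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # build the row positionally, in the same order as the declared ROW fields
        return Row(data['0001'], data['0002'])


t_env.register_function(
    "data_converter",
    udf(DataConverter(),
        input_types=[DataTypes.STRING()],
        result_type=DataTypes.ROW([
            DataTypes.FIELD("feature1", DataTypes.DOUBLE()),
            DataTypes.FIELD("feature2", DataTypes.DOUBLE())])))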


Thanks!


On Fri, Jul 3, 2020 at 12:40 PM Manas Kale  wrote:

> Hi Xingbo,
> Thanks for the reply, I didn't know that a table schema also needs to be
> declared after the connect or but I understand now.
> I have another question: how do I write the parsing schemas for a field
> that itself is a valid JSON string? For example:
> {
> "monitorId": 865,
> "deviceId": "94:54:93:49:96:13",
> "data":
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
> "state": 2,
> "time": 1593687809180
> }
> The field "data" is a string of valid JSON with string:number objects. I'm
> currently trying using JSON schema object and DataTypes.ROW, but am getting
> deserialization errors.
>
> .with_format(
> Json()
> .json_schema(
> """
> {
> "type": "object",
> "properties": {
> "monitorId": {
> "type": "string"
> },
> "deviceId": {
> "type": "string"
> },
> "data": {
> "type": "object"
> },
> "state": {
> "type": "integer"
> },
> "time": {
> "type": "string"
> }
> }
> }
> """
> )
> ) \
> .with_schema(
> Schema()
> .field("monitorId", DataTypes.STRING())
> .field("deviceId", DataTypes.STRING())
> .field("data", DataTypes.ROW())
> )
>
> Regards,
>
> Manas
>
>
> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:
>
>> Hi, Manas
>> You need to define the schema. You can refer to the following example:
>>  t_env.connect(
>> Kafka()
>> .version('0.11')
>> .topic(INPUT_TOPIC)
>> .property("bootstrap.servers", PROD_KAFKA)
>> .property("zookeeper.connect", "localhost:2181")
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>> "{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )
>> ) \
>> .with_schema(  # declare the schema of the table
>> Schema()
>> .field("lon", DataTypes.DECIMAL(20, 10))
>> .field("rideTime", DataTypes.TIMESTAMP(6))
>> ).register_table_source(INPUT_TABLE)
>>
>> Best,
>> Xingbo
>>
>> Manas Kale  于2020年7月2日周四 下午7:59写道:
>>
>>> Hi,
>>> I'm trying to get a simple consumer/producer running using the following
>>> code referred from the provided links :
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>>> StreamTableEnvironment
>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>

Re: Flink Kafka connector in Python

2020-07-03 Thread Manas Kale
Hi Xingbo,
Thanks for the reply, I didn't know that a table schema also needs to be
declared after the connector, but I understand now.
I have another question: how do I write the parsing schemas for a field
that itself is a valid JSON string? For example:
{
"monitorId": 865,
"deviceId": "94:54:93:49:96:13",
"data":
"{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
"state": 2,
"time": 1593687809180
}
The field "data" is a string of valid JSON with string:number objects. I'm
currently trying using JSON schema object and DataTypes.ROW, but am getting
deserialization errors.

.with_format(
Json()
.json_schema(
"""
{
"type": "object",
"properties": {
"monitorId": {
"type": "string"
},
"deviceId": {
"type": "string"
},
"data": {
"type": "object"
},
"state": {
"type": "integer"
},
"time": {
"type": "string"
}
}
}
"""
)
) \
.with_schema(
Schema()
.field("monitorId", DataTypes.STRING())
.field("deviceId", DataTypes.STRING())
.field("data", DataTypes.ROW())
)
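
One possible workaround (a sketch, not verified against this Flink version): since the value of "data" is itself a JSON-encoded string, it can be ingested as a plain string and parsed downstream, for example in a UDF. Continuing from the t_env.connect(...) call above:

.with_format(
    Json()
    .json_schema(
        """
        {
            "type": "object",
            "properties": {
                "monitorId": {"type": "string"},
                "deviceId": {"type": "string"},
                "data": {"type": "string"},
                "state": {"type": "integer"},
                "time": {"type": "string"}
            }
        }
        """
    )
) \
.with_schema(
    Schema()
    .field("monitorId", DataTypes.STRING())
    .field("deviceId", DataTypes.STRING())
    .field("data", DataTypes.STRING())
)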

Regards,

Manas


On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:

> Hi, Manas
> You need to define the schema. You can refer to the following example:
>  t_env.connect(
> Kafka()
> .version('0.11')
> .topic(INPUT_TOPIC)
> .property("bootstrap.servers", PROD_KAFKA)
> .property("zookeeper.connect", "localhost:2181")
> .start_from_latest()
> ) \
> .with_format(
> Json()
> .json_schema(
> "{"
> "  type: 'object',"
> "  properties: {"
> "lon: {"
> "  type: 'number'"
> "},"
> "rideTime: {"
> "  type: 'string',"
> "  format: 'date-time'"
> "}"
> "  }"
> "}"
> )
> ) \
> .with_schema(  # declare the schema of the table
> Schema()
> .field("lon", DataTypes.DECIMAL(20, 10))
> .field("rideTime", DataTypes.TIMESTAMP(6))
> ).register_table_source(INPUT_TABLE)
>
> Best,
> Xingbo
>
> Manas Kale  于2020年7月2日周四 下午7:59写道:
>
>> Hi,
>> I'm trying to get a simple consumer/producer running using the following
>> code referred from the provided links :
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>> StreamTableEnvironment
>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> INPUT_TOPIC = 'xyz'
>> INPUT_TABLE = 'raw_message'
>> PROD_ZOOKEEPER = '...'
>> PROD_KAFKA = '...'
>>
>> OUTPUT_TOPIC = 'summary_output'
>> OUTPUT_TABLE = 'feature_summary'
>> LOCAL_ZOOKEEPER = 'localhost:2181'
>> LOCAL_KAFKA = 'localhost:9092'
>>
>>
>> t_env.connect(
>> Kafka()
>> .version('universal')
>> .topic(INPUT_TOPIC)
>> .property("bootstrap.servers", PROD_KAFKA)
>>
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>> "{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )
>> ).register_table_source(INPUT_TABLE)
>>
>> t_env.connect(Kafka()
>> .version('universal')
>> .topic(OUTPUT_TOPIC)
>> .property("bootstrap.servers", LOCAL_KAFKA)
>>

Re: Flink Kafka connector in Python

2020-07-02 Thread Manas Kale
ce/Flink_POC/pyflink/main.py", line 46,
in 
).register_table_source(INPUT_TABLE)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
line 1295, in register_table_source
self._j_connect_table_descriptor.registerTableSource(name)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'


The relevant part seems to be *Caused by:
org.apache.flink.table.api.ValidationException: Could not find the
required schema in property 'schema'.*

This is probably a basic error, but I can't figure out how I can know
what's wrong with the schema. Is the schema not properly declared? Is
some field missing?

FWIW I have included the JSON and kafka connector JARs in the required location.


Regards,
Manas


On Tue, Jun 30, 2020 at 11:58 AM Manas Kale  wrote:

> Hi Xingbo,
> Thank you for the information, it certainly helps!
>
> Regards,
> Manas
>
> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang  wrote:
>
>> Hi Manas,
>>
>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
>> So the method described in the link won't work.
>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>> data. Besides, You can refer to the common questions about PyFlink[3]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>
>> Best,
>> Xingbo
>>
>> Manas Kale  于2020年6月29日周一 下午8:10写道:
>>
>>> Hi,
>>> I want to consume and write to Kafak from Flink's python API.
>>>
>>> The only way I found to do this was through this
>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>  question
>>> on SO where the user essentially copies FlinkKafka connector JARs into the
>>> Flink runtime's lib/ directory.
>>>
>>>- Is this the recommended method to do this? If not, what is?
>>>- Is there any official documentation for using Kafka with pyFlink?
>>>Is this officially supported?
>>>- How does the method described in the link work? Does the Flink
>>>runtime load and expose all JARs in /lib to the python script? Can I 
>>> write
>>>custom operators in Java and use those through python?
>>>
>>> Thanks,
>>> Manas
>>>
>>


Re: Flink Kafka connector in Python

2020-06-30 Thread Manas Kale
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas

On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang  wrote:

> Hi Manas,
>
> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
> So the method described in the link won't work.
> But you can use more convenient DDL[1] or descriptor[2] to read kafka
> data. Besides, You can refer to the common questions about PyFlink[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>
> Best,
> Xingbo
>
> Manas Kale  于2020年6月29日周一 下午8:10写道:
>
>> Hi,
>> I want to consume and write to Kafak from Flink's python API.
>>
>> The only way I found to do this was through this
>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>  question
>> on SO where the user essentially copies FlinkKafka connector JARs into the
>> Flink runtime's lib/ directory.
>>
>>- Is this the recommended method to do this? If not, what is?
>>- Is there any official documentation for using Kafka with pyFlink?
>>Is this officially supported?
>>- How does the method described in the link work? Does the Flink
>>runtime load and expose all JARs in /lib to the python script? Can I write
>>custom operators in Java and use those through python?
>>
>> Thanks,
>> Manas
>>
>


Flink Kafka connector in Python

2020-06-29 Thread Manas Kale
Hi,
I want to consume and write to Kafka from Flink's Python API.

The only way I found to do this was through this

question
on SO where the user essentially copies FlinkKafka connector JARs into the
Flink runtime's lib/ directory.

   - Is this the recommended method to do this? If not, what is?
   - Is there any official documentation for using Kafka with pyFlink? Is
   this officially supported?
   - How does the method described in the link work? Does the Flink runtime
   load and expose all JARs in /lib to the python script? Can I write custom
   operators in Java and use those through python?
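
Regarding the first two questions, a sketch of the configuration-based alternative to copying jars into lib/ (an assumption that Flink 1.10+'s pipeline.jars option is available to the Table API config; the jar names and paths are illustrative):

# sketch: make connector/format jars available to the PyFlink job without
# touching the distribution's lib/ directory (paths are illustrative)
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.10.0.jar;"
    "file:///path/to/flink-json-1.10.0.jar")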

Thanks,
Manas


Re: Testing process functions

2020-05-26 Thread Manas Kale
Thank you for the example, Alexander.

On Wed, May 20, 2020 at 6:48 PM Alexander Fedulov 
wrote:

> Hi Manas,
>
> I would recommend using TestHarnesses for testing. You could also use them
> prior to 1.10. Here is an example of setting the dependencies:
>
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/build.gradle#L113
>
> You can see some examples of tests for a demo application here:
>
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/test/java/com/ververica/field/dynamicrules/RulesEvaluatorTest.java
> Hope this helps.
>
> Best regards,
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> +49 1514 6265796
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>
>
>
> On Mon, May 18, 2020 at 1:18 PM Manas Kale  wrote:
>
>> I see, I had not considered the serialization; that was the issue.
>> Thank you.
>>
>> On Mon, May 18, 2020 at 12:29 PM Chesnay Schepler 
>> wrote:
>>
>>> We don't publish sources for test classes.
>>>
>>> Have you considered that the sink will be serialized on job submission,
>>> meaning that your myTestSink instance is not the one actually used by
>>> the job? This typically means that you have to store stuff in a static field
>>> instead.
>>> Alternatively, depending on the number of elements
>>> org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might
>>> be worth a try.
>>>
>>> On 15/05/2020 12:49, Manas Kale wrote:
>>> > Hi,
>>> > How do I test process functions? I tried by implementing a sink
>>> > function that stores myProcessFunction's output in a list. After
>>> > env.execute(), I use assertions.
>>> > If I set a breakpoint in the myTestSink's invoke() method, I see that
>>> > that method is being called correctly. However, after env.execute()
>>> > returns, all data in sink functions is wiped clean.
>>> >
>>> > TestSink myTestSink = new myTestSink();
>>> > testStream.process(new myProcessFunction()).addSink(myTestSink);
>>> > env.execute("test");
>>> > assertEquals(expectedOutput, myTestSink.actual);
>>> >
>>> > What am I doing wrong?
>>> >  Also, I see that a ProcessFunctionTestHarnesses has been added in
>>> > 1.10. I wasn't able to download its sources to understand how I could
>>> > use that. Have the sources not been added to maven or is it a problem
>>> > at my end?
>>> >
>>> > Regards,
>>> > Manas
>>>
>>>
>>>


Re: "Fill in" notification messages based on event time watermark

2020-05-19 Thread Manas Kale
Hi Aljoscha,
Thank you, that clarification helps. I am generating a new watermark in the
getCurrentWatermark() method of my assigner, which causes the watermark to
be actually updated every autoWatermark interval. I assumed that actual
watermark updates were caused by only setAutoWatermark() method, which was
incorrect. Your explanation makes it clear.
Note that I have canned this problem for now, and I'll send out a reply to
this chain if I need help to solve it properly again. I don't want to waste
anyone's time.

Thanks!


On Mon, May 18, 2020 at 7:59 PM Aljoscha Krettek 
wrote:

> I think there is some confusion in this thread between the auto
> watermark interval and the interval (length) of an event-time window.
> Maybe clearing that up for everyone helps.
>
> The auto watermark interval is the periodicity (in processing time) at
> which Flink asks the source (or a watermark generator) what the current
> watermark is. The source will keep track of the timestamps that it can
> "respond" to Flink when it asks. For example, if the auto watermark
> interval is set to 1 sec, Flink will update the watermark information
> every second. This doesn't mean, though, that the watermark advances 1
> sec in that time. If you're reading through some historic data the
> watermark could jump by hours in between those 1 second intervals. You
> can also think of this as the sampling interval for updating the current
> watermark.
>
> The window size size independent of the auto watermark interval, you can
> have an arbitrary size here. The auto watermark interval only controls
> how frequent Flink will check and emit the contents of windows, if their
> end timestamp is below the watermark.
>
> I hope that helps. If we're all clear we can look at the concrete
> problem again.
>
> Best,
> Aljoscha
>
> On 30.04.20 12:46, Manas Kale wrote:
> > Hi Timo and Piotrek,
> > Thank you for the suggestions.
> > I have been trying to set up unit tests at the operator granularity, and
> > the blog post's testHarness examples certainly help a lot in this regard.
> >
> > I understood my problem - an upstream session window operator can only
> > report the end of the session window when the watermark has passed
> > {lastObserverEvent + sessionTimeout}. However, my watermark was being
> > updated periodically without taking this into account. It seems I will
> have
> > to delay this notification operator's watermark by sessionTimeout.
> > Another complication is that this sessionTimeout is per-key, so I guess I
> > will have to implement a watermark assigner that extracts the delay
> period
> > from data (similar to DynamicEventTimeWindows).
> >
> > Also, if I do implement such an assigner, would it be helpful to add it
> to
> > Flink? I am happy to contribute if so. Any other comments/observations
> are
> > also welcome!
> >
> > Thank you all for the help,
> > Manas
> >
> >
> > On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski 
> wrote:
> >
> >> Hi Manas,
> >>
> >> Adding to the response from Timo, if you don’t have unit
> tests/integration
> >> tests, I would strongly recommend setting them up, as it makes debugging
> >> and testing easier. You can read how to do it for your functions and
> >> operators here [1] and here [2].
> >>
> >> Piotrek
> >>
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> >> [2]
> >>
> https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
> >>
> >> On 28 Apr 2020, at 18:45, Timo Walther  wrote:
> >>
> >> Hi Manas,
> >>
> >> Reg. 1: I would recommend to use a debugger in your IDE and check which
> >> watermarks are travelling through your operators.
> >>
> >> Reg. 2: All event-time operations are only performed once the watermark
> >> arrived from all parallel instances. So roughly speaking, in machine
> time
> >> you can assume that the window is computed in watermark update
> intervals.
> >> However, "what is computed" depends on the timestamps of your events and
> >> how those are categorized in windows.
> >>
> >> I hope this helps a bit.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 28.04.20 14:38, Manas Kale wrote:
> >>
> >> Hi David and Piotrek,
> >> Thank you both for your inputs.
> >> I tried an implementation with the algorithm Piotrek suggested and
> David's
> >> example. Although notifications are being generated 

Re: Testing process functions

2020-05-18 Thread Manas Kale
I see, I had not considered the serialization; that was the issue.
Thank you.

On Mon, May 18, 2020 at 12:29 PM Chesnay Schepler 
wrote:

> We don't publish sources for test classes.
>
> Have you considered that the sink will be serialized on job submission,
> meaning that your myTestSink instance is not the one actually used by
> the job? This typically means that you have to store stuff in a static field
> instead.
> Alternatively, depending on the number of elements
> org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might
> be worth a try.
>
> On 15/05/2020 12:49, Manas Kale wrote:
> > Hi,
> > How do I test process functions? I tried by implementing a sink
> > function that stores myProcessFunction's output in a list. After
> > env.execute(), I use assertions.
> > If I set a breakpoint in the myTestSink's invoke() method, I see that
> > that method is being called correctly. However, after env.execute()
> > returns, all data in sink functions is wiped clean.
> >
> > TestSink myTestSink = new myTestSink();
> > testStream.process(new myProcessFunction()).addSink(myTestSink);
> > env.execute("test");
> > assertEquals(expectedOutput, myTestSink.actual);
> >
> > What am I doing wrong?
> >  Also, I see that a ProcessFunctionTestHarnesses has been added in
> > 1.10. I wasn't able to download its sources to understand how I could
> > use that. Have the sources not been added to maven or is it a problem
> > at my end?
> >
> > Regards,
> > Manas
>
>
>


Testing process functions

2020-05-15 Thread Manas Kale
Hi,
How do I test process functions? I tried by implementing a sink function
that stores myProcessFunction's output in a list. After env.execute(), I
use assertions.
If I set a breakpoint in the myTestSink's invoke() method, I see that that
method is being called correctly. However, after env.execute() returns, all
data in sink functions is wiped clean.

TestSink myTestSink = new myTestSink();
testStream.process(new myProcessFunction()).addSink(myTestSink);
env.execute("test");
assertEquals(expectedOutput, myTestSink.actual);

What am I doing wrong?
 Also, I see that a ProcessFunctionTestHarnesses has been added in 1.10. I
wasn't able to download its sources to understand how I could use that.
Have the sources not been added to maven or is it a problem at my end?
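
For reference, a bare-bones sketch of the 1.10 test-harness approach (assumptions: a ProcessFunction<String, String> named MyProcessFunction, and the flink-streaming-java test-jar plus flink-test-utils on the test classpath; modeled on the pattern shown in the Flink testing docs):

import java.util.Collections;

import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class MyProcessFunctionTest {

    @Test
    public void emitsExpectedOutput() throws Exception {
        // wraps the function in a test harness; no cluster, sink, or env.execute() needed
        OneInputStreamOperatorTestHarness<String, String> harness =
            ProcessFunctionTestHarnesses.forProcessFunction(new MyProcessFunction());

        // second argument is the element's event timestamp
        harness.processElement("input", 10L);

        assertEquals(Collections.singletonList("expected-output"),
            harness.extractOutputValues());
    }
}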

Regards,
Manas


Re: Broadcast state vs data enrichment

2020-05-14 Thread Manas Kale
I see, thank you Roman!

On Tue, May 12, 2020 at 4:59 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Thanks for the clarification.
>
> Apparently, the second option (with enricher) creates more load by adding
> configuration to every event. Unless events are much bigger than the
> configuration, this will significantly increase network, memory, and CPU
> usage.
> Btw, I think you don't need a broadcast in the 2nd option, since the
> interested subtask will receive the configuration anyways.
>
> Regards,
> Roman
>
>
> On Tue, May 12, 2020 at 5:57 AM Manas Kale  wrote:
>
>> Sure. Apologies for not making this clear enough.
>>
>> > each operator only stores what it needs.
>> Lets imagine this setup :
>>
>> BROADCAST STREAM
>> config-stream 
>> 
>> |   |
>>   |
>> event-stream--> operator1--> operator2-> 
>> operator3
>>
>>
>> In this scenario, all 3 operators will be BroadcastProcessFunctions. Each
>> of them will receive the whole config message in their
>> processBroadcastElement method, but each one will only store what it
>> needs in their state store. So even though operator1 will receive
>>  config = {
>> "config1" : 1,
>> "config2" : 2,
>> "config3" : 3
>> }
>> it will only store config1.
>>
>> > each downstream operator will "strip off" the config parameter that it
>> needs.
>>
>> BROADCAST STREAM
>> config-stream -
>>   |
>> event-stream-->  enricher --> 
>> operator1--> operator2-> operator3
>>
>> In this case, the enricher operator will store the whole config message.
>> When an event message arrives, this operator will append config1, config2
>> and config3 to it. Operator 1 will extract and use config1, and output a
>> message that has config1 stripped off.
>>
>> I hope that helps!
>>
>> Perhaps I am being too pedantic but I would like to know if these two
>> methods have comparable performance differences and if so which one would
>> be preferred.
>>
>>
>>
>>
>> On Mon, May 11, 2020 at 11:46 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>> The approaches you described looks the same:
>>> > each operator only stores what it needs.
>>> > each downstream operator will "strip off" the config parameter that it
>>> needs.
>>>
>>> Can you please explain the difference?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, May 11, 2020 at 8:07 AM Manas Kale 
>>> wrote:
>>>
>>>> Hi,
>>>> I have a single broadcast message that contains configuration data
>>>> consumed by different operators. For eg:
>>>> config = {
>>>> "config1" : 1,
>>>> "config2" : 2,
>>>> "config3" : 3
>>>> }
>>>>
>>>> Operator 1 will consume config1 only, operator 2 will consume config2
>>>> only etc.
>>>>
>>>>
>>>>- Right now in my implementation the config message gets broadcast
>>>>over operators 1,2,3 and each operator only stores what it needs.
>>>>
>>>>
>>>>- A different approach would be to broadcast the config message to
>>>>a single root operator. This will then enrich event data flowing 
>>>> through it
>>>>with config1,config2 and config3 and each downstream operator will 
>>>> "strip
>>>>off" the config parameter that it needs.
>>>>
>>>>
>>>> *I was wondering which approach would be the best to go with
>>>> performance wise. *I don't really have the time to implement both and
>>>> compare, so perhaps someone here already knows if one approach is better or
>>>> both provide similar performance.
>>>>
>>>> FWIW, the config stream is very sporadic compared to the event stream.
>>>>
>>>> Thank you,
>>>> Manas Kale
>>>>
>>>>
>>>>
>>>>


Re: Broadcast state vs data enrichment

2020-05-11 Thread Manas Kale
Sure. Apologies for not making this clear enough.

> each operator only stores what it needs.
Lets imagine this setup :

                    BROADCAST STREAM
config-stream ------+------------+------------+
                    |            |            |
event-stream--> operator1--> operator2--> operator3


In this scenario, all 3 operators will be BroadcastProcessFunctions. Each
of them will receive the whole config message in their
processBroadcastElement method, but each one will only store what it needs
in their state store. So even though operator1 will receive
 config = {
"config1" : 1,
"config2" : 2,
"config3" : 3
}
it will only store config1.

> each downstream operator will "strip off" the config parameter that it
needs.

                    BROADCAST STREAM
config-stream ------+
                    |
event-stream--> enricher --> operator1--> operator2--> operator3

In this case, the enricher operator will store the whole config message.
When an event message arrives, this operator will append config1, config2
and config3 to it. Operator 1 will extract and use config1, and output a
message that has config1 stripped off.

I hope that helps!

Perhaps I am being too pedantic, but I would like to know whether these two
methods actually differ in performance and, if so, which one would be
preferred.




On Mon, May 11, 2020 at 11:46 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Manas,
>
> The approaches you described looks the same:
> > each operator only stores what it needs.
> > each downstream operator will "strip off" the config parameter that it
> needs.
>
> Can you please explain the difference?
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 8:07 AM Manas Kale  wrote:
>
>> Hi,
>> I have a single broadcast message that contains configuration data
>> consumed by different operators. For eg:
>> config = {
>> "config1" : 1,
>> "config2" : 2,
>> "config3" : 3
>> }
>>
>> Operator 1 will consume config1 only, operator 2 will consume config2
>> only etc.
>>
>>
>>- Right now in my implementation the config message gets broadcast
>>over operators 1,2,3 and each operator only stores what it needs.
>>
>>
>>- A different approach would be to broadcast the config message to a
>>single root operator. This will then enrich event data flowing through it
>>with config1,config2 and config3 and each downstream operator will "strip
>>off" the config parameter that it needs.
>>
>>
>> *I was wondering which approach would be the best to go with performance
>> wise. *I don't really have the time to implement both and compare, so
>> perhaps someone here already knows if one approach is better or both
>> provide similar performance.
>>
>> FWIW, the config stream is very sporadic compared to the event stream.
>>
>> Thank you,
>> Manas Kale
>>
>>
>>
>>


Broadcast state vs data enrichment

2020-05-11 Thread Manas Kale
Hi,
I have a single broadcast message that contains configuration data consumed
by different operators. For eg:
config = {
"config1" : 1,
"config2" : 2,
"config3" : 3
}

Operator 1 will consume config1 only, operator 2 will consume config2 only
etc.


   - Right now in my implementation the config message gets broadcast over
   operators 1,2,3 and each operator only stores what it needs.


   - A different approach would be to broadcast the config message to a
   single root operator. This will then enrich event data flowing through it
   with config1,config2 and config3 and each downstream operator will "strip
   off" the config parameter that it needs.


*I was wondering which approach would be the best to go with performance
wise. *I don't really have the time to implement both and compare, so
perhaps someone here already knows if one approach is better or both
provide similar performance.

FWIW, the config stream is very sporadic compared to the event stream.
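
For concreteness, a rough sketch of the first approach for operator 1 (Event and Config are hypothetical types; the operator receives the whole broadcast config but keeps only the entry it needs):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class Operator1
        extends KeyedBroadcastProcessFunction<String, Event, Config, Event> {

    private static final MapStateDescriptor<String, Integer> CONFIG_DESCRIPTOR =
        new MapStateDescriptor<>("config", Types.STRING, Types.INT);

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
            throws Exception {
        // read the single parameter this operator cares about
        Integer config1 = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("config1");
        // ... apply config1 to the event ...
        out.collect(event);
    }

    @Override
    public void processBroadcastElement(Config config, Context ctx, Collector<Event> out)
            throws Exception {
        // store only what this operator needs, not the whole config message
        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("config1", config.getConfig1());
    }
}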

Thank you,
Manas Kale


Re: "Fill in" notification messages based on event time watermark

2020-04-30 Thread Manas Kale
Hi Timo and Piotrek,
Thank you for the suggestions.
I have been trying to set up unit tests at the operator granularity, and
the blog post's testHarness examples certainly help a lot in this regard.

I understood my problem - an upstream session window operator can only
report the end of the session window when the watermark has passed
{lastObservedEvent + sessionTimeout}. However, my watermark was being
updated periodically without taking this into account. It seems I will have
to delay this notification operator's watermark by sessionTimeout.
Another complication is that this sessionTimeout is per-key, so I guess I
will have to implement a watermark assigner that extracts the delay period
from data (similar to DynamicEventTimeWindows).
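
For illustration, a fixed-delay sketch (essentially what BoundedOutOfOrdernessTimestampExtractor already does; Event is a hypothetical type). Since watermarks are not keyed, a truly per-key sessionTimeout cannot be expressed this way, so the delay would have to be the maximum timeout across keys:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DelayedWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private final long delayMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public DelayedWatermarkAssigner(long delayMs) {
        this.delayMs = delayMs;
    }

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // hold the watermark back by the configured delay
        return maxTimestamp == Long.MIN_VALUE
            ? new Watermark(Long.MIN_VALUE)
            : new Watermark(maxTimestamp - delayMs);
    }
}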

Also, if I do implement such an assigner, would it be helpful to add it to
Flink? I am happy to contribute if so. Any other comments/observations are
also welcome!

Thank you all for the help,
Manas


On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski  wrote:

> Hi Manas,
>
> Adding to the response from Timo, if you don’t have unit tests/integration
> tests, I would strongly recommend setting them up, as it makes debugging
> and testing easier. You can read how to do it for your functions and
> operators here [1] and here [2].
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> [2]
> https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
>
> On 28 Apr 2020, at 18:45, Timo Walther  wrote:
>
> Hi Manas,
>
> Reg. 1: I would recommend to use a debugger in your IDE and check which
> watermarks are travelling through your operators.
>
> Reg. 2: All event-time operations are only performed once the watermark
> arrived from all parallel instances. So roughly speaking, in machine time
> you can assume that the window is computed in watermark update intervals.
> However, "what is computed" depends on the timestamps of your events and
> how those are categorized in windows.
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> On 28.04.20 14:38, Manas Kale wrote:
>
> Hi David and Piotrek,
> Thank you both for your inputs.
> I tried an implementation with the algorithm Piotrek suggested and David's
> example. Although notifications are being generated with the watermark,
> subsequent transition events are being received after the watermark has
> crossed their timestamps. For example:
> state1 @ 100
> notification state1@ 110
> notification state1@ 120
> notification state1@ 130<- shouldn't have emitted this
> state2 @ 125 <- watermark is > 125 at this stage
> I think something might be subtly(?) wrong with how I have structured
> upstream operators. The allowed lateness is 0 in the watermarkassigner
> upstream, and I generate watermarks every x seconds.
> The operator that emits state transitions is constructed using the
> TumblingWindow approach I described in the first e-mail (so that I can
> compute at every watermark update). Note that I can use this approach for
> state-transition-operator because it only wants to emit transitions, and
> nothing in between.
> So, two questions:
> 1. Any idea on what might be causing this incorrect watermark behaviour?
> 2. If I want to perform some computation only when the watermark updates,
> is using a watermark-aligned EventTimeTumblingWindow (meaning
> windowDuration = watermarkUpdateInterval) the correct way to do this?
> Regards,
> Manas
> On Tue, Apr 28, 2020 at 2:16 AM David Anderson  mailto:da...@ververica.com >> wrote:
>Following up on Piotr's outline, there's an example in the
>documentation of how to use a KeyedProcessFunction to implement an
>event-time tumbling window [1]. Perhaps that can help you get started.
>Regards,
>David
>[1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
>On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <mailto:pi...@ververica.com >> wrote:
>Hi,
>I’m not sure, but I don’t think there is an existing window that
>would do exactly what you want. I would suggest to go back to
>the `keyedProcessFunction` (or a custom operator?), and have a
>MapState currentStates field.
>Your key would be for example a timestamp of the beginning of
>your window. Value would be the latest state in this time
>window, annotated with a timestamp when this state was record.
>On each element:
>1. you determine the window’s begin ts (key of the map)
>2. If it’s first element, register an event time timer to
>publish results for that window’s end TS
>3.

Re: "Fill in" notification messages based on event time watermark

2020-04-28 Thread Manas Kale
Hi David and Piotrek,
Thank you both for your inputs.
I tried an implementation with the algorithm Piotrek suggested and David's
example. Although notifications are being generated with the watermark,
subsequent transition events are being received after the watermark has
crossed their timestamps. For example:
state1 @ 100
notification state1 @ 110
notification state1 @ 120
notification state1 @ 130   <- shouldn't have emitted this
state2 @ 125                <- watermark is > 125 at this stage

I think something might be subtly(?) wrong with how I have structured
upstream operators. The allowed lateness is 0 in the watermark assigner
upstream, and I generate watermarks every x seconds.
The operator that emits state transitions is constructed using the
TumblingWindow approach I described in the first e-mail (so that I can
compute at every watermark update). Note that I can use this approach for
state-transition-operator because it only wants to emit transitions, and
nothing in between.
So, two questions:
1. Any idea on what might be causing this incorrect watermark behaviour?
2. If I want to perform some computation only when the watermark updates,
is using a watermark-aligned EventTimeTumblingWindow (meaning
windowDuration = watermarkUpdateInterval) the correct way to do this?


Regards,
Manas


On Tue, Apr 28, 2020 at 2:16 AM David Anderson  wrote:

> Following up on Piotr's outline, there's an example in the documentation
> of how to use a KeyedProcessFunction to implement an event-time tumbling
> window [1]. Perhaps that can help you get started.
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
>
>
> On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure, but I don’t think there is an existing window that would do
>> exactly what you want. I would suggest to go back to the
>> `keyedProcessFunction` (or a custom operator?), and have a
>> MapState currentStates field. Your key would
>> be for example a timestamp of the beginning of your window. Value would be
>> the latest state in this time window, annotated with a timestamp when this
>> state was record.
>>
>> On each element:
>>
>> 1. you determine the window’s begin ts (key of the map)
>> 2. If it’s first element, register an event time timer to publish results
>> for that window’s end TS
>> 3. look into the `currentStates` if it should be modified (if your new
>> element is newer or first value for the given key)
>>
>> On even time timer firing
>> 1. output the state matching to this timer
>> 2. Check if there is a (more recent) value for next window, and if not:
>>
>> 3. copy the value to next window
>> 4. Register a timer for this window to fire
>>
>> 5. Cleanup currentState and remove value for the no longed needed key.
>>
>> I hope this helps
>>
>> Piotrek
>>
>> On 27 Apr 2020, at 12:01, Manas Kale  wrote:
>>
>> Hi,
>> I have an upstream operator that outputs device state transition messages
>> with event timestamps. Meaning it only emits output when a transition takes
>> place.
>> For example,
>> state1 @ 1 PM
>> state2 @ 2 PM
>> and so on.
>>
>> *Using a downstream operator, I want to emit notification messages as per
>> some configured periodicity.* For example, if periodicity = 20 min, in
>> the above scenario this operator will output :
>> state1 notification @ 1PM
>> state1 notification @ 1.20PM
>> state1 notification @ 1.40PM
>>  ...
>>
>> *Now the main issue is that I want this to be driven by the watermark and
>> not by transition events received from upstream. *Meaning I would like
>> to see notification events as soon as the watermark crosses their
>> timestamps; *not* when the next transition event arrives at the operator
>> (which could be hours later, as above).
>>
>> My first solution, using a keyedProcessFunction and timers did not work
>> as expected because the order in which transition events arrived at this
>> operator was non-deterministic. To elaborate, assume a
>> setAutoWatermarkInterval of 10 second.
>> If we get transition events :
>> state1 @ 1sec
>> state2 @ 3 sec
>> state3 @ 5 sec
>> state1 @ 8 sec
>> the order in which these events arrived at my keyedProcessFunction was
>> not fixed. To solve this, these messages need to be sorted on event time,
>> which led me to my second solution.
>>
>> My second solution, using a EventTimeTumblingWindow with size =
>> setAutoWatermarkInterval, also does not work. I sorted accumulated events
>> in the window and applied notification-generation logic on them in order.
>> However, I assumed that windows are created even if there are no elements.
>> Since this is not the case, this solution generates notifications only when
>> the next state tranisition message arrives, which could be hours later.
>>
>> Does anyone have any suggestions on how I can implement this?
>> Thanks!
>>
>>
>>
>>
>>


"Fill in" notification messages based on event time watermark

2020-04-27 Thread Manas Kale
Hi,
I have an upstream operator that outputs device state transition messages
with event timestamps. Meaning it only emits output when a transition takes
place.
For example,
state1 @ 1 PM
state2 @ 2 PM
and so on.

*Using a downstream operator, I want to emit notification messages as per
some configured periodicity.* For example, if periodicity = 20 min, in the
above scenario this operator will output :
state1 notification @ 1PM
state1 notification @ 1.20PM
state1 notification @ 1.40PM
 ...

*Now the main issue is that I want this to be driven by the watermark and
not by transition events received from upstream. *Meaning I would like to
see notification events as soon as the watermark crosses their timestamps;
*not* when the next transition event arrives at the operator (which could
be hours later, as above).

My first solution, using a keyedProcessFunction and timers did not work as
expected because the order in which transition events arrived at this
operator was non-deterministic. To elaborate, assume a
setAutoWatermarkInterval of 10 second.
If we get transition events :
state1 @ 1sec
state2 @ 3 sec
state3 @ 5 sec
state1 @ 8 sec
the order in which these events arrived at my keyedProcessFunction was not
fixed. To solve this, these messages need to be sorted on event time, which
led me to my second solution.

My second solution, using a EventTimeTumblingWindow with size =
setAutoWatermarkInterval, also does not work. I sorted accumulated events
in the window and applied notification-generation logic on them in order.
However, I assumed that windows are created even if there are no elements.
Since this is not the case, this solution generates notifications only when
the next state transition message arrives, which could be hours later.

Does anyone have any suggestions on how I can implement this?
Thanks!
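
For reference, a bare-bones sketch of the keyed-process-function approach suggested in the replies (StateTransition is a hypothetical POJO with getState() and getTimestamp(); transitions are assumed to arrive in event-time order per key, and the session-timeout complication discussed elsewhere in this thread is ignored):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// For every key, re-emit the last seen state as a notification every periodMs of
// event time, driven purely by event-time timers (i.e. by the watermark).
public class NotificationFiller
        extends KeyedProcessFunction<String, StateTransition, String> {

    private final long periodMs;

    private transient ValueState<StateTransition> lastTransition;
    private transient ValueState<Long> nextTimer;

    public NotificationFiller(long periodMs) {
        this.periodMs = periodMs;
    }

    @Override
    public void open(Configuration parameters) {
        lastTransition = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastTransition", StateTransition.class));
        nextTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("nextTimer", Long.class));
    }

    @Override
    public void processElement(StateTransition t, Context ctx, Collector<String> out)
            throws Exception {
        // cancel the notification chain of the previous state and start a new one
        Long pending = nextTimer.value();
        if (pending != null) {
            ctx.timerService().deleteEventTimeTimer(pending);
        }
        lastTransition.update(t);
        long firstNotification = t.getTimestamp() + periodMs;
        ctx.timerService().registerEventTimeTimer(firstNotification);
        nextTimer.update(firstNotification);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        StateTransition last = lastTransition.value();
        if (last == null) {
            return;
        }
        out.collect(last.getState() + " notification @ " + timestamp);
        // schedule the next notification; it fires once the watermark passes it
        long next = timestamp + periodMs;
        ctx.timerService().registerEventTimeTimer(next);
        nextTimer.update(next);
    }
}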


Re: Perform processing only when watermark updates, buffer data otherwise

2020-04-05 Thread Manas Kale
Hi Timo,
Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther  wrote:

> Hi Manas,
>
> first of all, after assigning watermarks at the source level, usually
> Flink operators make sure to handle the watermarks.
>
> In case of a `union()`, the subsequent operator will increment its
> internal event-time clock and emit a new watermark only if all input
> streams (and their parallel instances) have reached a common event-time.
>
> Your sorting use case can be easily done with a KeyedProcessFunction
> [1]. You can buffer your events in a list state, and process them when a
> timer fires. The documentation also explains how to set a timer.
>
> If you want to fire when the next watermark arrives, you can set a timer
> like:
>
> ctx.timerService().currentWatermark() + 1
>
> The `union()` is meant for combining streams of the same data into one
> where the order of the event does not matter. However, watermarks are
> still arriving in order so a sorting by event-time should not be a problem.
>
> connect() is broader than a join (see also the answer here [2]).
>
> I hope I could answer most of your questions. Feel free to ask further
> questions.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
> [2]
>
> https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect
>
>
>
> On 02.04.20 12:11, Manas Kale wrote:
> > Also
> >
> >   *   What happens to watermarks after a union operation? Do I have to
> > assignTimestampsAndWatermarks() again? I guess I will have to since
> > multiple streams are being combined and Flink needs to know how to
> > resolve individual watermarks.
> >   * What is the difference between union() and connect()?
> >
> >
> > On Thu, Apr 2, 2020 at 10:33 AM Manas Kale  > <mailto:manaskal...@gmail.com>> wrote:
> >
> > Hi,
> > I want to perform some processing on events only when the watermark
> > is updated. Otherwise, for all other events, I want to keep
> > buffering them till the watermark arrives.
> > The main motivation behind doing this is that I have several
> > operators that emit events/messages to a downstream operator. Since
> > the order in which events arrive at the downstream operator is not
> > guaranteed to be in chronological event time, I want to manually
> > sort events when the watermark arrives and only then proceed.
> >
> > Specifically, I want to first combine multiple streams and then do
> > the above. Something like :
> > stream1.union(stream2, steream3)...
> >
> > One solution I am exploring is using a global window with a trigger
> > that will fire only when the watermark updates.
> > stream1.union(stream2, steream3).
> > keyBy(...).
> > window(GlobalWindows.create()).
> > trigger(new OnWatermarkUpdateTrigger()).
> > process(...)
> >
> > I will store the latest watermark in the trigger's state store. In
> > the onElement() method, I will FIRE if the current watermark is
> > different than the stored one.
> >
> > Is this the best way to implement the functionality described above?
> >
>
>


Re: Perform processing only when watermark updates, buffer data otherwise

2020-04-02 Thread Manas Kale
Also

   -  What happens to watermarks after a union operation? Do I have to
   assignTimestampsAndWatermarks() again? I guess I will have to since
   multiple streams are being combined and Flink needs to know how to resolve
   individual watermarks.
   - What is the difference between union() and connect()?


On Thu, Apr 2, 2020 at 10:33 AM Manas Kale  wrote:

> Hi,
> I want to perform some processing on events only when the watermark is
> updated. Otherwise, for all other events, I want to keep buffering them
> till the watermark arrives.
> The main motivation behind doing this is that I have several operators
> that emit events/messages to a downstream operator. Since the order in
> which events arrive at the downstream operator is not guaranteed to be in
> chronological event time, I want to manually sort events when the watermark
> arrives and only then proceed.
>
> Specifically, I want to first combine multiple streams and then do the
> above. Something like :
> stream1.union(stream2, steream3)...
>
> One solution I am exploring is using a global window with a trigger that
> will fire only when the watermark updates.
> stream1.union(stream2, steream3).
> keyBy(...).
> window(GlobalWindows.create()).
> trigger(new OnWatermarkUpdateTrigger()).
> process(...)
>
> I will store the latest watermark in the trigger's state store. In the
> onElement() method, I will FIRE if the current watermark is different than
> the stored one.
>
> Is this the best way to implement the functionality described above?
>
>


Perform processing only when watermark updates, buffer data otherwise

2020-04-01 Thread Manas Kale
Hi,
I want to perform some processing on events only when the watermark is
updated. Otherwise, for all other events, I want to keep buffering them
till the watermark arrives.
The main motivation behind doing this is that I have several operators that
emit events/messages to a downstream operator. Since the order in which
events arrive at the downstream operator is not guaranteed to be in
chronological event time, I want to manually sort events when the watermark
arrives and only then proceed.

Specifically, I want to first combine multiple streams and then do the
above. Something like :
stream1.union(stream2, stream3)...

One solution I am exploring is using a global window with a trigger that
will fire only when the watermark updates.
stream1.union(stream2, stream3).
keyBy(...).
window(GlobalWindows.create()).
trigger(new OnWatermarkUpdateTrigger()).
process(...)

I will store the latest watermark in the trigger's state store. In the
onElement() method, I will FIRE if the current watermark is different than
the stored one.

Is this the best way to implement the functionality described above?
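
A sketch of what such a trigger could look like (note the caveat that it only fires when an element arrives and sees a newer watermark, not at the instant the watermark itself advances):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class OnWatermarkUpdateTrigger<T> extends Trigger<T, GlobalWindow> {

    private final ValueStateDescriptor<Long> lastWatermarkDesc =
        new ValueStateDescriptor<>("lastWatermark", Long.class);

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Long> lastWatermark = ctx.getPartitionedState(lastWatermarkDesc);
        long current = ctx.getCurrentWatermark();
        Long previous = lastWatermark.value();
        if (previous == null || current > previous) {
            // the watermark has advanced since the last firing: evaluate the window
            lastWatermark.update(current);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(lastWatermarkDesc).clear();
    }
}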


Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
Hi Till,
Thank you for the explanation, I understand the behaviour now.


On Thu, Mar 26, 2020 at 9:23 PM Till Rohrmann  wrote:

> A quick update concerning your observations. The reason why you are seeing
> the unordered output is because in the gist we used
> a AssignerWithPeriodicWatermarks which generates watermarks periodically.
> Due to this aspect, it can happen that Flink already process all elements
> up to "20" before it sees the next watermark which triggers the processing.
> If there are multiple windows being processed, Flink does not give a
> guarantee in which order this happens.
>
> You can avoid this behaviour if you used
> an AssignerWithPunctuatedWatermarks instead. This watermark assigner is
> called for every record. The updated gist [1] shows how it is used.
>
> [1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1
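
(For reference, a bare-bones sketch of such a punctuated assigner; Element is a hypothetical type with a getTimestamp() accessor, and the linked gist shows the complete setup.)

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PerRecordWatermarks implements AssignerWithPunctuatedWatermarks<Element> {

    @Override
    public long extractTimestamp(Element element, long previousElementTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
        // emit a watermark after every single element
        return new Watermark(extractedTimestamp);
    }
}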
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann 
> wrote:
>
>> Hmm, I might have given you a bad advice. I think the problem becomes
>> harder because with Flink's window and trigger API we need to keep state
>> consistent between the Trigger and the Window function. Maybe it would be
>> easier to not rely on the windowing mechanism and instead to use Flink's
>> process function [1] to implement the logic yourself.
>>
>> With the process function you have basically a low level API with which
>> you can implement an operator which groups incoming events according to
>> sessions and outputs the required information.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale 
>> wrote:
>>
>>> Hi Till,
>>> I see, thanks for the clarification.
>>> Assuming all other settings are the same, if I generate events as follows
>>> :
>>> Element.from("1", 1000L),
>>> Element.from("2", 2000L),
>>> Element.from("3", 3000L),
>>> Element.from("10", 1L)
>>> ,Element.from("11", 11000L),
>>> Element.from("12", 12000L),
>>> Element.from("20", 2L)
>>> we will expect 2 session windows to be created {1,2,3} and {10,11,12}
>>> with appropriate messages. However, when I run this, there seems to be a
>>> problem in the valueState of MyWindowFunction. Apparently that state is
>>> being shared by both the session windows, which leads to incorrect results.
>>> To solve this, I replaced it with a MapState. The Long is
>>> the start timestamp of a window, something that can uniquely identify
>>> different windows. This works but with one caveat : if we have two
>>> subsequent windows, the ordering of messages is :
>>>
>>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @
>>> 8000 -> window2 ended @ 17000
>>>
>>> whereas I expect it to be :
>>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @
>>> 10000 -> window2 ended @ 17000
>>>
>>> I thought Flink would execute event time timers and process events in
>>> chronological event time order. However, it seems that the onEventTime()
>>> invocation of window1 is called *after* elements from window2 have been
>>> processed even though window1's onEventTime() is earlier in event time.
>>>
>>> Is my approach and reasoning correct? Also, is it possible to get the
>>> messages in the expected order?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> the problem is that the print() statement is being executed with a
>>>> different parallelism than 1. Due to this fact, the messages coming from
>>>> the window function will be sent in round-robin fashion to the print
>>>> operators. If you remove the setParallelism(1) from the window function,
>>>> then the window function will be executed with the same parallelism as the
>>>> print operator. Due to this fact, there is no round-robin distribution of
>>>> the events but every window function task will simply forward its
>>>> elements to its print operator task. You should be able to see these
>>>> topology differences in the web ui.
>>>>

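For reference, the punctuated assigner Till mentions above boils down to
something like this sketch (the "Element" type and its timestamp accessor are
assumptions; AssignerWithPunctuatedWatermarks is the legacy interface used at
the time of this thread, and it emits a watermark per record instead of
periodically):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PerElementWatermarkAssigner
        implements AssignerWithPunctuatedWatermarks<Element> {

    @Override
    public long extractTimestamp(Element element, long previousElementTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
        // One watermark per record; subtract an out-of-orderness bound here if needed.
        return new Watermark(extractedTimestamp);
    }
}
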
Re: How to move event time forward using externally generated watermark message

2020-03-26 Thread Manas Kale
Thanks for the help, Arvid!

On Tue, Mar 24, 2020 at 1:30 AM Arvid Heise  wrote:

> Hi Manas,
>
> both are valid options.
>
> I'd probably add a processing time timeout event in a process function,
> which will only trigger after no event has been received for 1 minute. In
> this way, you don't need to know which devices there are and just enqueue
> one timer per key (=device id).
>
> After the process function, you'd need to reapply your watermark assigner
> as processing time and event time usually don't mix well and need to be
> explicitly resolved.
>
> After the assigner, you can then simply filter out the timeout event and
> don't need to care in downstream operations.
>
> On Mon, Mar 23, 2020 at 11:42 AM Manas Kale  wrote:
>
>> Hi,
>> I have a scenario where I have an input event stream from various IoT
>> devices. Every message on this stream can be of some eventType and has an
>> eventTimestamp. Downstream, some business logic is implemented on this
>> based on event time.
>> In case a device goes offline, what's the best way to indicate to this
>> system that event time has progressed? Should I :
>>
>>- Send a special message that contains only event time information,
>>and write code to handle this message in all downstream operators?
>>
>>
>>- Implement some processing time timer in the system that will tick
>>the watermark forward if we don't see any message for some duration? I 
>> will
>>still need to write code in downstream operators that handles this timer's
>>trigger message.
>>
>> I would prefer not writing code to handle special watermark messages. So
>> does Flink provide any API level call that I can use to tick the watermark
>> forward for all downstream operators when this special message is received
>> / timer is fired?
>>
>
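
A rough sketch of the timeout idea Arvid describes above, i.e. a per-key
processing-time timer that emits a synthetic marker event after one minute of
silence (the Event type, its getTimestamp() accessor and the timeoutMarker()
factory are assumptions; downstream you would re-assign watermarks and filter
the marker out again, as noted above):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DeviceTimeoutFunction extends KeyedProcessFunction<String, Event, Event> {

    private static final long TIMEOUT_MS = 60_000L; // 1 minute of silence

    private transient ValueState<Long> pendingTimer; // currently registered timer
    private transient ValueState<Long> lastEventTs;  // last seen event timestamp

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingTimer", Long.class));
        lastEventTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastEventTs", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        out.collect(event); // forward the real event unchanged
        lastEventTs.update(event.getTimestamp());

        // Replace any pending timeout timer with a fresh one.
        Long oldTimer = pendingTimer.value();
        if (oldTimer != null) {
            ctx.timerService().deleteProcessingTimeTimer(oldTimer);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        pendingTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // The device has been silent: emit a marker so downstream event time can advance.
        out.collect(Event.timeoutMarker(ctx.getCurrentKey(), lastEventTs.value()));
        pendingTimer.clear();
    }
}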


Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
Hi Till,
I see, thanks for the clarification.
Assuming all other settings are the same, if I generate events as follows :
Element.from("1", 1000L),
Element.from("2", 2000L),
Element.from("3", 3000L),
Element.from("10", 1L)
,Element.from("11", 11000L),
Element.from("12", 12000L),
Element.from("20", 2L)
we will expect 2 session windows to be created {1,2,3} and {10,11,12} with
appropriate messages. However, when I run this, there seems to be a problem
in the valueState of MyWindowFunction. Apparently that state is being
shared by both the session windows, which leads to incorrect results.
To solve this, I replaced it with a MapState. The Long is
the start timestamp of a window, something that can uniquely identify
different windows. This works but with one caveat : if we have two
subsequent windows, the ordering of messages is :

window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
-> window2 ended @ 17000

whereas I expect it to be :
window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
-> window2 ended @ 17000

I thought Flink would execute event time timers and process events in
chronological event time order. However, it seems that the onEventTime()
invocation of window1 is called *after* elements from window2 have been
processed even though window1's onEventTime() is earlier in event time.

Is my approach and reasoning correct? Also, is it possible to get the
messages in the expected order?

Thanks!
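
The MapState workaround described above might look roughly like this (a sketch
of the general shape only, not the code from the gist; "Event", the key type,
the message strings and the first-firing/second-firing logic are assumptions):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MyWindowFunction
        extends ProcessWindowFunction<Event, String, String, TimeWindow> {

    // Keyed by window start timestamp so that state from different session
    // windows of the same device key does not collide.
    private final MapStateDescriptor<Long, Boolean> startedDesc =
            new MapStateDescriptor<>("windowStarted", Long.class, Boolean.class);

    @Override
    public void process(String key, Context ctx, Iterable<Event> elements,
                        Collector<String> out) throws Exception {
        MapState<Long, Boolean> started = ctx.globalState().getMapState(startedDesc);
        long windowStart = ctx.window().getStart();

        if (started.get(windowStart) == null) {
            // First firing for this window (the trigger fires once MIN_WINDOW_SIZE has passed).
            started.put(windowStart, true);
            out.collect("window started @ " + windowStart);
        } else {
            // Subsequent firing at the end of the session window.
            out.collect("window ended @ " + ctx.window().getEnd());
            started.remove(windowStart); // clean up the per-window entry
        }
    }
}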





On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann  wrote:

> Hi Manas,
>
> the problem is that the print() statement is being executed with a
> different parallelism than 1. Due to this fact, the messages coming from
> the window function will be sent in round-robin fashion to the print
> operators. If you remove the setParallelism(1) from the window function,
> then the window function will be executed with the same parallelism as the
> print operator. Due to this fact, there is no round-robin distribution of
> the events but every window function task will simply forward its
> elements to its print operator task. You should be able to see these
> topology differences in the web ui.
>
> You could configure the print() operator to run with a parallelism of 1 as
> well by adding a setParallelism(1) statement to it.
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale  wrote:
>
>> Hi Till,
>> When I run the example code that you posted, the order of the three
>> messages (window started, contents of window and window ended) is
>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>> used in the pipeline - I assumed this should eliminate any form of race
>> conditions for printing. What's more is that if I *remove*
>> setParallelism(1) from the code, the output is deterministic and correct
>> (i.e. windowStarted -> windowContents -> windowEnded).
>>
>> Clearly, something is wrong with my understanding. What is it?
>>
>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann 
>> wrote:
>>
>>> Great to hear that you solved the problem. Let us know if you run into
>>> any other issues.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale 
>>> wrote:
>>>
>>>> Hi,
>>>> This problem is solved[1]. The issue was that the BroadcastStream did
>>>> not contain any watermark, which prevented watermarks for any downstream
>>>> operators from advancing.
>>>> I appreciate all the help.
>>>> [1]
>>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>
>>>> Thanks,
>>>> Manas
>>>>
>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale 
>>>> wrote:
>>>>
>>>>> Hi Rafi and Till,
>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>
>>>>> Till, I am trying to get this example working with the BroadcastState
>>>>> pattern upstream to the window operator[1]. The problem is that 
>>>>> introducing
>>>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>>>> watermark again in the KeyedBroadcastProcessFunction?
>>>>>
>>>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>

Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
Hi Till,
When I run the example code that you posted, the order of the three
messages (window started, contents of window and window ended) is
non-deterministic. This is surprising to me, as setParallelism(1) has been
used in the pipeline - I assumed this should eliminate any form of race
conditions for printing. What's more is that if I *remove*
setParallelism(1) from the code, the output is deterministic and correct
(i.e. windowStarted -> windowContents -> windowEnded).

Clearly, something is wrong with my understanding. What is it?
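
The resolution, per Till's reply quoted in the other messages of this thread,
is that print() was running with a different parallelism than the window
function, so output was distributed round-robin across print tasks. A sketch
of the fix (the surrounding pipeline names are placeholders):

// Force the sink to a single task so the three messages are printed in order.
windowedStream
        .process(new MyWindowFunction())
        .print()
        .setParallelism(1);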

On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann  wrote:

> Great to hear that you solved the problem. Let us know if you run into any
> other issues.
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale  wrote:
>
>> Hi,
>> This problem is solved[1]. The issue was that the BroadcastStream did not
>> contain any watermark, which prevented watermarks for any downstream
>> operators from advancing.
>> I appreciate all the help.
>> [1]
>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>
>> Thanks,
>> Manas
>>
>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale  wrote:
>>
>>> Hi Rafi and Till,
>>> Thank you for pointing out that edge case, Rafi.
>>>
>>> Till, I am trying to get this example working with the BroadcastState
>>> pattern upstream to the window operator[1]. The problem is that introducing
>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>> watermark again in the KeyedBroadcastProcessFunction?
>>>
>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Manas and Rafi,
>>>>
>>>> you are right that when using merging windows as event time session
>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>> type MergingState. This constraint allows that the state can be merged
>>>> whenever two windows get merged.
>>>>
>>>> Rafi, you are right. With the current implementation it might happen
>>>> that you send a wrong started window message. I think it depends on the
>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>> your watermark. If you want to be on the safe side, then I would recommend
>>>> to use the ProcessFunction to implement the required logic. The
>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>> them yourself, though. However, it would give you access to the
>>>> watermark which in turn would allow you to properly handle your described
>>>> edge case.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch 
>>>> wrote:
>>>>
>>>>> I think one "edge" case which is not handled would be that the first
>>>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>>>> reported.
>>>>>
>>>>> Rafi
>>>>>
>>>>>
>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale 
>>>>> wrote:
>>>>>
>>>>>> Is the reason ValueState cannot be used because session windows are
>>>>>> always formed by merging proto-windows of single elements, therefore a
>>>>>> state store is needed that can handle merging. ValueState does not 
>>>>>> provide
>>>>>> this functionality, but a ReducingState does?
>>>>>>
>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>>> was about to ask "Can we share state between a Trigger and a Window?"
>>>>>>> Currently the only (convoluted) way to share state between two 
>>

How to move event time forward using externally generated watermark message

2020-03-23 Thread Manas Kale
Hi,
I have a scenario where I have an input event stream from various IoT
devices. Every message on this stream can be of some eventType and has an
eventTimestamp. Downstream, some business logic is implemented on this
based on event time.
In case a device goes offline, what's the best way to indicate to this
system that event time has progressed? Should I :

   - Send a special message that contains only event time information, and
   write code to handle this message in all downstream operators?


   - Implement some processing time timer in the system that will tick the
   watermark forward if we don't see any message for some duration? I will
   still need to write code in downstream operators that handles this timer's
   trigger message.

I would prefer not writing code to handle special watermark messages. So
does Flink provide any API level call that I can use to tick the watermark
forward for all downstream operators when this special message is received
/ timer is fired?


Re: Emit message at start and end of event time session window

2020-02-27 Thread Manas Kale
Hi,
This problem is solved[1]. The issue was that the BroadcastStream did not
contain any watermark, which prevented watermarks for any downstream
operators from advancing.
I appreciate all the help.
[1]
https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern

Thanks,
Manas
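
One common way to keep a broadcast/control stream from holding back the
watermark (a sketch of the general idea, not necessarily the exact fix from
[1]; ConfigMessage, configSource and configStateDescriptor are placeholders)
is to give it a Long.MAX_VALUE watermark, so the connected operator's combined
watermark is driven by the data stream alone:

// Uses org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
// and org.apache.flink.streaming.api.watermark.Watermark.
DataStream<ConfigMessage> control = configSource
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<ConfigMessage>() {
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(Long.MAX_VALUE); // never the bottleneck
            }

            @Override
            public long extractTimestamp(ConfigMessage element, long previousElementTimestamp) {
                return previousElementTimestamp; // keep the existing timestamp; not used downstream
            }
        });

BroadcastStream<ConfigMessage> broadcast = control.broadcast(configStateDescriptor);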

On Thu, Feb 27, 2020 at 4:28 PM Manas Kale  wrote:

> Hi Rafi and Till,
> Thank you for pointing out that edge case, Rafi.
>
> Till, I am trying to get this example working with the BroadcastState
> pattern upstream to the window operator[1]. The problem is that introducing
> the BroadcastState makes the onEventTime() *never* fire. Is the
> BroadcastState somehow eating up the watermark? Do I need to generate the
> watermark again in the KeyedBroadcastProcessFunction?
>
> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>
> Thanks,
> Manas
>
> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann 
> wrote:
>
>> Hi Manas and Rafi,
>>
>> you are right that when using merging windows as event time session
>> windows are, then Flink requires that any state the Trigger keeps is of
>> type MergingState. This constraint allows that the state can be merged
>> whenever two windows get merged.
>>
>> Rafi, you are right. With the current implementation it might happen that
>> you send a wrong started window message. I think it depends on the
>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>> your watermark. If you want to be on the safe side, then I would recommend
>> to use the ProcessFunction to implement the required logic. The
>> ProcessFunction [1] is Flink's low level API and gives you access to state
>> and timers. In it, you would need to buffer the elements and to sessionize
>> them yourself, though. However, it would give you access to the
>> watermark which in turn would allow you to properly handle your described
>> edge case.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> Cheers,
>> Till
>>
>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch  wrote:
>>
>>> I think one "edge" case which is not handled would be that the first
>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>> reported.
>>>
>>> Rafi
>>>
>>>
>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale 
>>> wrote:
>>>
>>>> Is the reason ValueState cannot be used because session windows are
>>>> always formed by merging proto-windows of single elements, therefore a
>>>> state store is needed that can handle merging. ValueState does not provide
>>>> this functionality, but a ReducingState does?
>>>>
>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale 
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> Thanks for your answer! You also answered the next question that I was
>>>>> about to ask "Can we share state between a Trigger and a Window?" 
>>>>> Currently
>>>>> the only (convoluted) way to share state between two operators is through
>>>>> the broadcast state pattern, right?
>>>>> Also, in your example, why can't we use a
>>>>> ValueStateDescriptor in the Trigger? I tried using it in my own
>>>>> example but I am not able to call the mergePartitionedState() method
>>>>> on a ValueStateDescriptor.
>>>>>
>>>>> Regards,
>>>>> Manas
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann 
>>>>> wrote:
>>>>>
>>>>>> Hi Manas,
>>>>>>
>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>> What you need to do is to define your own trigger implementation which
>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>> message or not. In the stateful window function you would need to do
>>>>>> something similar. The first call could trigger the output of "window
>>>>>> started" and any subsequent call will trigger the evaluation of the 
>>>>>> window.
>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>> function could share its internal state. Unfortunately, this is not possible at the moment.

Re: Emit message at start and end of event time session window

2020-02-27 Thread Manas Kale
Hi Rafi and Till,
Thank you for pointing out that edge case, Rafi.

Till, I am trying to get this example working with the BroadcastState
pattern upstream to the window operator[1]. The problem is that introducing
the BroadcastState makes the onEventTime() *never* fire. Is the
BroadcastState somehow eating up the watermark? Do I need to generate the
watermark again in the KeyedBroadcastProcessFunction?

[1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49

Thanks,
Manas

On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann  wrote:

> Hi Manas and Rafi,
>
> you are right that when using merging windows as event time session
> windows are, then Flink requires that any state the Trigger keeps is of
> type MergingState. This constraint allows that the state can be merged
> whenever two windows get merged.
>
> Rafi, you are right. With the current implementation it might happen that
> you send a wrong started window message. I think it depends on the
> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
> your watermark. If you want to be on the safe side, then I would recommend
> to use the ProcessFunction to implement the required logic. The
> ProcessFunction [1] is Flink's low level API and gives you access to state
> and timers. In it, you would need to buffer the elements and to sessionize
> them yourself, though. However, it would give you access to the
> watermark which in turn would allow you to properly handle your described
> edge case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> Cheers,
> Till
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch  wrote:
>
>> I think one "edge" case which is not handled would be that the first
>> event (by event-time) arrives late, then a wrong "started-window" would be
>> reported.
>>
>> Rafi
>>
>>
>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale 
>> wrote:
>>
>>> Is the reason ValueState cannot be used because session windows are
>>> always formed by merging proto-windows of single elements, therefore a
>>> state store is needed that can handle merging. ValueState does not provide
>>> this functionality, but a ReducingState does?
>>>
>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale 
>>> wrote:
>>>
>>>> Hi Till,
>>>> Thanks for your answer! You also answered the next question that I was
>>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>>> the only (convoluted) way to share state between two operators is through
>>>> the broadcast state pattern, right?
>>>> Also, in your example, why can't we use a ValueStateDescriptor
>>>> in the Trigger? I tried using it in my own example but I am not able
>>>> to call the mergePartitionedState() method on a ValueStateDescriptor.
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>>
>>>>
>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> you can implement something like this with a bit of trigger magic.
>>>>> What you need to do is to define your own trigger implementation which
>>>>> keeps state to remember whether it has triggered the "started window"
>>>>> message or not. In the stateful window function you would need to do
>>>>> something similar. The first call could trigger the output of "window
>>>>> started" and any subsequent call will trigger the evaluation of the 
>>>>> window.
>>>>> It would have been a bit easier if the trigger and the window process
>>>>> function could share its internal state. Unfortunately, this is not
>>>>> possible at the moment.
>>>>>
>>>>> I've drafted a potential solution which you can find here [1].
>>>>>
>>>>> [1]
>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I want to achieve the following using event time session windows:
>>>>>>
>>>>>>1. When the window.getStart() and last event timestamp in the
>>>>>>window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>>>>>>message "Window started @ timestamp".
>>>>>>2. When the session window ends, i.e. the watermark passes
>>>>>>lasteventTimestamp + inactivityPeriod, I want to emit a message 
>>>>>> "Window
>>>>>>ended @ timestamp".
>>>>>>
>>>>>>  It is guaranteed that all events are on time and no lateness is
>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>> if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and 
>>>>>> triggers
>>>>>> a customProcessWindowFunction().
>>>>>> However, with this architecture I can't detect the end of the window.
>>>>>>
>>>>>> Is my approach correct or is there a completely different method to
>>>>>> achieve this?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas Kale
>>>>>>
>>>>>>
>>>>>>
>>>>>>

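For readers who want to follow the ProcessFunction route Till suggests in the
quoted reply above, here is a rough sketch of sessionizing by hand (not the
code from the gist; the Event type, constants and message format are
assumptions, and late or out-of-order corner cases are not handled):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionStartEndFunction extends KeyedProcessFunction<String, Event, String> {

    private static final long GAP = 5_000L;             // session inactivity gap
    private static final long MIN_WINDOW_SIZE = 2_000L; // span before "started" is emitted

    private transient ValueState<Long> sessionStart;
    private transient ValueState<Long> lastEventTs;
    private transient ValueState<Boolean> startedEmitted;

    @Override
    public void open(Configuration parameters) {
        sessionStart = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sessionStart", Long.class));
        lastEventTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastEventTs", Long.class));
        startedEmitted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("startedEmitted", Boolean.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<String> out) throws Exception {
        long ts = e.getTimestamp();

        Long start = sessionStart.value();
        if (start == null || ts < start) {
            start = ts;
            sessionStart.update(start);
        }
        Long last = lastEventTs.value();
        if (last == null || ts > last) {
            last = ts;
            lastEventTs.update(last);
        }

        // Emit "started" once the session spans at least MIN_WINDOW_SIZE.
        if (startedEmitted.value() == null && last - start >= MIN_WINDOW_SIZE) {
            out.collect("Window started @ " + start);
            startedEmitted.update(true);
        }

        // One event-time timer per element; only the timer belonging to the
        // latest element will mark the end of the session.
        ctx.timerService().registerEventTimeTimer(last + GAP);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastEventTs.value();
        if (last != null && timestamp == last + GAP) {
            out.collect("Window ended @ " + timestamp);
            sessionStart.clear();
            lastEventTs.clear();
            startedEmitted.clear();
        }
    }
}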

Re: Emit message at start and end of event time session window

2020-02-20 Thread Manas Kale
Is the reason ValueState cannot be used that session windows are always
formed by merging proto-windows of single elements, and therefore a state
store is needed that can handle merging? ValueState does not provide this
functionality, but a ReducingState does?

On Thu, Feb 20, 2020 at 4:01 PM Manas Kale  wrote:

> Hi Till,
> Thanks for your answer! You also answered the next question that I was
> about to ask "Can we share state between a Trigger and a Window?" Currently
> the only (convoluted) way to share state between two operators is through
> the broadcast state pattern, right?
> Also, in your example, why can't we use a ValueStateDescriptor in
> the Trigger? I tried using it in my own example but I am not able to
> call the mergePartitionedState() method on a ValueStateDescriptor.
>
> Regards,
> Manas
>
>
>
> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann 
> wrote:
>
>> Hi Manas,
>>
>> you can implement something like this with a bit of trigger magic. What
>> you need to do is to define your own trigger implementation which keeps
>> state to remember whether it has triggered the "started window" message or
>> not. In the stateful window function you would need to do something
>> similar. The first call could trigger the output of "window started" and
>> any subsequent call will trigger the evaluation of the window. It would
>> have been a bit easier if the trigger and the window process function could
>> share its internal state. Unfortunately, this is not possible at the moment.
>>
>> I've drafted a potential solution which you can find here [1].
>>
>> [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale  wrote:
>>
>>> Hi,
>>> I want to achieve the following using event time session windows:
>>>
>>>1. When the window.getStart() and last event timestamp in the window
>>>is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message
>>>"Window started @ timestamp".
>>>2. When the session window ends, i.e. the watermark passes
>>>lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>ended @ timestamp".
>>>
>>>  It is guaranteed that all events are on time and no lateness is
>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>> I am able to implement point 1 using a custom trigger, which checks if
>>> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
>>> customProcessWindowFunction().
>>> However, with this architecture I can't detect the end of the window.
>>>
>>> Is my approach correct or is there a completely different method to
>>> achieve this?
>>>
>>> Thanks,
>>> Manas Kale
>>>
>>>
>>>
>>>


Re: Emit message at start and end of event time session window

2020-02-20 Thread Manas Kale
Hi Till,
Thanks for your answer! You also answered the next question that I was
about to ask "Can we share state between a Trigger and a Window?" Currently
the only (convoluted) way to share state between two operators is through
the broadcast state pattern, right?
Also, in your example, why can't we use a ValueStateDescriptor in
the Trigger? I tried using it in my own example but I am not able to
call the mergePartitionedState() method on a ValueStateDescriptor.

Regards,
Manas



On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann  wrote:

> Hi Manas,
>
> you can implement something like this with a bit of trigger magic. What
> you need to do is to define your own trigger implementation which keeps
> state to remember whether it has triggered the "started window" message or
> not. In the stateful window function you would need to do something
> similar. The first call could trigger the output of "window started" and
> any subsequent call will trigger the evaluation of the window. It would
> have been a bit easier if the trigger and the window process function could
> share its internal state. Unfortunately, this is not possible at the moment.
>
> I've drafted a potential solution which you can find here [1].
>
> [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>
> Cheers,
> Till
>
> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale  wrote:
>
>> Hi,
>> I want to achieve the following using event time session windows:
>>
>>1. When the window.getStart() and last event timestamp in the window
>>is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message
>>"Window started @ timestamp".
>>2. When the session window ends, i.e. the watermark passes
>>lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>ended @ timestamp".
>>
>>  It is guaranteed that all events are on time and no lateness is allowed.
>> I am having difficulty implementing both 1 and 2 simultaneously.
>> I am able to implement point 1 using a custom trigger, which checks if
>> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
>> customProcessWindowFunction().
>> However, with this architecture I can't detect the end of the window.
>>
>> Is my approach correct or is there a completely different method to
>> achieve this?
>>
>> Thanks,
>> Manas Kale
>>
>>
>>
>>


Emit message at start and end of event time session window

2020-02-16 Thread Manas Kale
Hi,
I want to achieve the following using event time session windows:

   1. When the difference between the last event timestamp in the window and
   window.getStart() is greater than MIN_WINDOW_SIZE milliseconds, I want to
   emit a message "Window started @ timestamp".
   2. When the session window ends, i.e. the watermark passes
   lastEventTimestamp + inactivityPeriod, I want to emit a message "Window
   ended @ timestamp".

 It is guaranteed that all events are on time and no lateness is allowed. I
am having difficulty implementing both 1 and 2 simultaneously.
I am able to implement point 1 using a custom trigger, which checks if
(lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
customProcessWindowFunction().
However, with this architecture I can't detect the end of the window.

Is my approach correct or is there a completely different method to achieve
this?

Thanks,
Manas Kale
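
Putting the pieces of this thread together, the wiring ends up looking roughly
like the sketch below (WindowStartEndTrigger and WindowStartEndFunction stand
in for the trigger and stateful window function from Till's gist and are not
reproduced here; the Event type, key selector and 5-second gap are likewise
assumptions):

// Uses org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
// and org.apache.flink.streaming.api.windowing.time.Time.
// Timestamps and watermarks are assumed to be assigned upstream of this point.
events
        .keyBy(Event::getDeviceId)
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(new WindowStartEndTrigger())   // fires once early ("window started") and again at the window end
        .process(new WindowStartEndFunction())  // keeps state to decide whether to emit "started" or "ended"
        .print()
        .setParallelism(1);                     // pin the sink parallelism so the output order stays deterministic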