Re: Error while connecting with MSSQL server

2020-12-09 Thread aj
Sure, thanks Flavio, will check it out.

On Wed, Dec 9, 2020, 16:20 Flavio Pompermaier  wrote:

> I issued a PR some time ago at https://github.com/apache/flink/pull/12038, but
> the Flink committers were busy refactoring that part. I don't know whether that
> code still needs to live in the Flink JDBC connector itself, or if, with the new
> factories (which use Java services), you could register your own dialect by
> putting your code in a separate jar.
>
> On Tue, Dec 8, 2020 at 7:02 AM Jark Wu  wrote:
>
>> Hi,
>>
>> Currently, flink-connector-jdbc doesn't support an MS SQL Server dialect. Only
>> MySQL and PostgreSQL are supported.
>>
>> Best,
>> Jark
>>
>> On Tue, 8 Dec 2020 at 01:20, aj  wrote:
>>
>>> Hello ,
>>>
>>> I am trying to create a table with Microsoft SQL Server using Flink SQL:
>>>
>>> CREATE TABLE sampleSQLSink (
>>> id INTEGER,
>>> message STRING,
>>> ts TIMESTAMP(3),
>>> proctime AS PROCTIME()
>>> ) WITH (
>>> 'connector' = 'jdbc',
>>> 'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
>>> 'url' = 'jdbc:sqlserver://samplecustsql.database.windows.net:1433
>>> ;database=customerdb',
>>> 'username'=
>>> 'password'=
>>> 'table-name' =
>>> );
>>>
>>>
>>> select * from sampleSQLSink
>>>
>>>
>>> I am getting this error:
>>>
>>> [ERROR] Could not execute SQL statement. Reason:
>>> java.lang.IllegalStateException: Cannot handle such jdbc url:
>>> jdbc:sqlserver://samplecustsql.database.windows.net:1433
>>> ;database=customerdb
>>>
>>>
>>> Can somebody help me figure out what is wrong?
>>>
>>> I am using the Microsoft JDBC driver.
>>>
>>


Stream job getting Failed

2020-12-09 Thread aj
I have a Flink stream job that reads data from Kafka and writes it to S3.
This job keeps failing after running for 2-3 days.
I am not able to find anything in the logs about why it's failing. Can somebody
help me find the cause of the failure?

I can only see this in the logs:

 org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 7 received completion notification for checkpoint with id=608.
2020-12-09 16:41:56,110 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
[] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as
requested.
2020-12-09 16:41:56,111 INFO
 org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting
down BLOB cache
2020-12-09 16:41:56,111 INFO
 org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting
down BLOB cache
2020-12-09 16:41:56,111 INFO
 org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] -
Shutting down TaskExecutorLocalStateStoresManager.
2020-12-09 16:41:56,115 INFO  org.apache.flink.runtime.filecache.FileCache
[] - removed file cache directory
/mnt1/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-dist-cache-fd5d7eae-bff7-4d74-89d8-0a40f174b7b8
2020-12-09 16:41:56,115 INFO  org.apache.flink.runtime.filecache.FileCache
[] - removed file cache directory
/mnt2/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-dist-cache-c5833412-5944-4b41-a502-5d952f5156af
2020-12-09 16:41:56,115 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt1/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-io-e290b3fd-9110-47c4-9463-1bd08003afc9
2020-12-09 16:41:56,115 INFO  org.apache.flink.runtime.filecache.FileCache
[] - removed file cache directory
/mnt3/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-dist-cache-bf0d69fe-0f00-4483-8b20-0056a049f86b
2020-12-09 16:41:56,115 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt2/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-io-55b8467d-8c16-441a-83d6-393462a0b4ca
2020-12-09 16:41:56,115 INFO  org.apache.flink.runtime.filecache.FileCache
[] - removed file cache directory
/mnt/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-dist-cache-8bc77a7c-f62b-4f06-b963-41f174a0db8e
2020-12-09 16:41:56,115 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt3/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-io-bf57f8db-0152-4697-b743-d07b4e46c9d7
2020-12-09 16:41:56,115 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-io-d650ed68-9c44-45b9-9b41-d501152b3f0f
2020-12-09 16:41:56,120 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt1/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-netty-shuffle-9311e006-fee0-4317-9355-5d981c558a08
2020-12-09 16:41:56,120 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt2/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-netty-shuffle-c633cf9f-8220-433a-8f3e-04d45e81efde
2020-12-09 16:41:56,120 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt3/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-netty-shuffle-671eda78-9981-4f6d-bff4-25cca973d76d
2020-12-09 16:41:56,120 INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] -
FileChannelManager removed spill file directory
/mnt/yarn/usercache/hadoop/appcache/application_1603267081962_94843/flink-netty-shuffle-5efa7701-72da-4d91-b9f7-7e6963ffefdb

End of LogType:taskmanager.log



End of LogType:taskmanager.out



-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Error while connecting with MSSQL server

2020-12-07 Thread aj
Hello ,

I am trying to create a table with Microsoft SQL Server using Flink SQL:

CREATE TABLE sampleSQLSink (
id INTEGER,
message STRING,
ts TIMESTAMP(3),
proctime AS PROCTIME()
) WITH (
'connector' = 'jdbc',
'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'url' = 'jdbc:sqlserver://samplecustsql.database.windows.net:1433
;database=customerdb',
'username'=
'password'=
'table-name' =
);


select * from sampleSQLSink


I am getting this error:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Cannot handle such jdbc url:
jdbc:sqlserver://samplecustsql.database.windows.net:1433;database=customerdb


Can somebody help me figure out what is wrong?

I am using the Microsoft JDBC driver.


Re: Broadcasting control messages to a sink

2020-12-04 Thread aj
Hi Jaffe,

Can you please help me out with sample code showing how you wrote the custom
sink and how you use this broadcast pattern to update the schema at runtime? It
would help me a lot.

On Sat, Oct 17, 2020 at 1:55 PM Piotr Nowojski  wrote:

> Hi Julian,
>
> Glad to hear it worked! And thanks for coming back to us :)
>
> Best,
> Piotrek
>
> sob., 17 paź 2020 o 04:22 Jaffe, Julian 
> napisał(a):
>
>> Hey Piotr,
>>
>>
>>
>> Thanks for your help! The main thing I was missing was the .broadcast
>> partition operation on a stream (searching for “broadcasting” obviously
>> brought up the broadcast state pattern). That, coupled with my misreading of
>> an error in my own code as an error in Flink code, led me to make this a much
>> harder problem than it needed to be.
>>
>>
>>
>> For anyone who may find this in the future, Piotr’s suggestion is pretty
>> spot-on. I wound up broadcasting (as in the partitioning strategy) my
>> schema stream and connecting it to my event stream. I then processed those
>> using a CoProcessFunction, using the schema messages to update the parsing
>> for the events. I also emitted a side output message when I processed a new
>> schema, using the same type as my main output messages. I once again
>> broadcast-as-in-partitioning the side output stream, unioned it with my
>> processed output from the CoProcessFunction and passed it to my sink,
>> making sure to handle control messages before attempting to do any
>> bucketing.
>>
>>
>>
>> In poor ASCII art, it looks something like the below:
>>
>>
>>
>>
>>
>>  _______________           ______________
>> | Schema Source |         | Event Source |
>>  ---------------           --------------
>>         |                         |
>>     broadcast                     |
>>         |_________   _____________|
>>                   | |
>>              _____________
>>             |  Processor  |-----------------> control message side output
>>              -------------                            |
>>                   |                               broadcast
>>                   |                                   |
>>                 union <-------------------------------
>>                   |
>>               __________
>>              |   Sink   |
>>               ----------
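>>
>> In code, a minimal sketch of that wiring might look like the following (the
>> types Event, SchemaUpdate and ParsedRecord, and the parse()/control() helpers,
>> are placeholders I am assuming, not the actual classes):
>>
>> final OutputTag<ParsedRecord> controlOutput = new OutputTag<ParsedRecord>("control") {};
>>
>> SingleOutputStreamOperator<ParsedRecord> parsed = eventStream
>>     .connect(schemaStream.broadcast())   // broadcast-as-in-partitioning
>>     .process(new CoProcessFunction<Event, SchemaUpdate, ParsedRecord>() {
>>         private transient Schema currentSchema;  // updated from the schema stream
>>
>>         @Override
>>         public void processElement1(Event event, Context ctx, Collector<ParsedRecord> out) {
>>             out.collect(parse(event, currentSchema));   // parse with the latest schema
>>         }
>>
>>         @Override
>>         public void processElement2(SchemaUpdate update, Context ctx, Collector<ParsedRecord> out) {
>>             currentSchema = update.getSchema();
>>             // emit a control message on the side output so the sink can react
>>             ctx.output(controlOutput, ParsedRecord.control(update));
>>         }
>>     });
>>
>> // broadcast the control messages so every sink subtask sees the new schema,
>> // then union with the main output and hand everything to the (custom bucketing) sink
>> parsed.union(parsed.getSideOutput(controlOutput).broadcast())
>>       .addSink(bucketingSink);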
>>
>>
>>
>> I hope this is helpful to someone.
>>
>>
>>
>> Julian
>>
>>
>>
>> *From: *Piotr Nowojski 
>> *Date: *Wednesday, October 14, 2020 at 11:22 PM
>> *To: *"Jaffe, Julian" 
>> *Cc: *"user@flink.apache.org" 
>> *Subject: *Re: Broadcasting control messages to a sink
>>
>>
>>
>> Hi Julian,
>>
>>
>>
>> I think the problem is that BroadcastProcessFunction and SinkFunction
>> will be executed by separate operators, so they won't be able to share
>> state. If you can not split your logic into two, I think you will have to
>> workaround this problem differently.
>>
>>
>>
>> 1. Rely on operator chaining and wire both of them together.
>>
>>
>>
>> If you set up your BroadcastProcessFunction and SinkFunction one after
>> another, with the same parallelism, with the default chaining, and without any
>> rebalance/keyBy in between, you can be sure they will be chained together. So
>> the output type of the records between the BroadcastProcessFunction and the
>> SinkFunction can be a union type of a) your actual payload and b) the
>> broadcasted message. Upon initialization/before processing the first record,
>> if you have any broadcast state, you would need to forward its content to the
>> downstream SinkFunction as well.
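>>
>> As a rough illustration (the class and field names here are assumptions, not
>> part of the original proposal), such a union record could look like:
>>
>> // Union of payload and broadcasted control message, so that a chained
>> // SinkFunction can receive both through a single input type.
>> public class RecordOrControl<T, C> implements java.io.Serializable {
>>     public final T payload;   // null for control messages
>>     public final C control;   // null for payload records
>>
>>     private RecordOrControl(T payload, C control) {
>>         this.payload = payload;
>>         this.control = control;
>>     }
>>
>>     public static <T, C> RecordOrControl<T, C> record(T payload) {
>>         return new RecordOrControl<>(payload, null);
>>     }
>>
>>     public static <T, C> RecordOrControl<T, C> control(C control) {
>>         return new RecordOrControl<>(null, control);
>>     }
>>
>>     public boolean isControl() {
>>         return control != null;
>>     }
>> }
>>
>> The sink (same parallelism, default chaining, no keyBy in between) would then
>> check isControl() and apply control messages before writing payload records.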
>>
>>
>>
>> 2. Another solution is that maybe you can try to embed SinkFunction
>> inside the BroadcastProcessFunction? This will require some
>> careful proxying and wrapping calls.
>>
>> 3. As always, you can also write a custom operator that will be doing the
>> same thing.
>>
>>
>>
>> For the 2. and 3. I'm not entirely sure if there are some gotchas that I
>> haven't thought through (state handling?), so if you can make 1. work for
>> you, it will probably be a safer route.
>>
>>
>>
>> Best,
>>
>> Piotrek
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> śr., 14 paź 2020 o 19:42 Jaffe, Julian 
>> napisał(a):
>>
>> Thanks for the suggestion Piotr!
>>
>>
>>
>> The problem is that the sink needs to have access to the schema (so that
>> it can write the schema only once per file instead of record) and thus
>> needs to know when the schema has been updated. In this proposed
>> architecture, I think the sink would still need to check each record to see
>> if the current schema matches the new record or not? The main problem I
>> encountered when playing around with broadcast state was that I couldn’t
>> figure out how to access the broadcast state within the sink, but perhaps I
>> just haven’t thought about it the right way. I’ll meditate 

Re: Broadcasting control messages to a sink

2020-10-15 Thread aj
Hi Jaffe,

I am also working on a similar type of problem.

I am receiving a set of events in Avro format on different topics. I want to
consume these and write them to S3 in Parquet format.
I have written a job that creates a different stream for each event and
fetches its schema from the Confluent schema registry to create a
Parquet sink for that event.
This is working fine, but the only problem I am facing is that whenever a new
event starts coming in, or the schema changes, I have to update the YAML config
and restart the job. Is there any way to avoid restarting the job so that it
starts consuming the new set of events automatically?

Since you are handling schema evolution, can you help me with this, and also
with how to handle new events? In the config, I keep a mapping of events to
schema subjects. Please share how you are solving this.


So currently this is the way I am doing it, but I would like to know a better
way to handle it:

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);

DataStream<GenericRecord> dataStream =
streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
for (EventConfig eventConfig : eventTypesList) {

LOG.info("creating a stream for {}", eventConfig.getEvent_name());

final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
(path, 
ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(),
registryClient)))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();

DataStream<GenericRecord> outStream =
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if (genericRecord != null &&
genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
{
return true;
}
return false;
});

outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);

}
} catch (Exception e) {
e.printStackTrace();
}


On Wed, Oct 14, 2020, 23:12 Jaffe, Julian 
wrote:

> Thanks for the suggestion Piotr!
>
>
>
> The problem is that the sink needs to have access to the schema (so that
> it can write the schema only once per file instead of record) and thus
> needs to know when the schema has been updated. In this proposed
> architecture, I think the sink would still need to check each record to see
> if the current schema matches the new record or not? The main problem I
> encountered when playing around with broadcast state was that I couldn’t
> figure out how to access the broadcast state within the sink, but perhaps I
> just haven’t thought about it the right way. I’ll meditate on the docs
> further  
>
>
>
> Julian
>
>
>
> *From: *Piotr Nowojski 
> *Date: *Wednesday, October 14, 2020 at 6:35 AM
> *To: *"Jaffe, Julian" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Broadcasting control messages to a sink
>
>
>
> Hi Julian,
>
>
>
> Have you seen Broadcast State [1]? I have never used it personally, but it
> sounds like something you want. Maybe your job should look like:
>
>
>
> 1. read raw messages from Kafka, without using the schema
>
> 2. read schema changes and broadcast them to 3. and 5.
>
> 3. deserialize kafka records in BroadcastProcessFunction by using combined
> 1. and 2.
>
> 4. do your logic o
>
> 5. serialize records using schema in another BroadcastProcessFunction by
> using combined 4. and 2.
>
> 6. write raw records using BucketingSink
>
> ?
>
>
>
> Best,
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> 
>
>
>
> śr., 14 paź 2020 o 11:01 Jaffe, Julian 
> napisał(a):
>
> Hey all,
>
>
>
> I’m building a Flink app that pulls in messages from a Kafka topic and
> writes them out to disk using a custom bucketed sink. Each message needs to
> be parsed using a schema that is also needed when writing in the sink. This
> schema is read from a remote file on a distributed file system (it could
> also be fetched from a service). The schema will be updated very
> infrequently.
>
>
>
> In order to support schema evolution, I have created a custom source that
> occasionally polls for updates and if it finds one parses the new schema
> and sends a message containing the serialized schema. I’ve connected these
> two streams and then use a RichCoFlatMapFunction to flatten them back into
> a single output stream (schema events get used to update the parser,
> messages get parsed using the parser and emitted).
>
>
>
> However, I need some way to communicate the updated schema to every task

Flink Collector issue when Collection Object

2020-09-28 Thread aj
Hello All,

Can somebody help me resolve this and understand what I am doing wrong?

https://stackoverflow.com/questions/64063833/flink-collector-issue-when-collection-object-with-map-of-object-class


-- 
Thanks & Regards,
Anuj Jain






Re: Flink 1.11 TaskManagerLogFileHandler -Exception

2020-09-04 Thread aj
I have also checked the ports, and all ports from 0-65535 are open. I also do
not see any taskmanager.log being generated under the container logs on the
task machine.

On Fri, Sep 4, 2020 at 2:58 PM aj  wrote:

>
> I am trying to switch to Flink 1.11 with the new EMR release 6.1. I have
> created a 3-node EMR cluster with Flink 1.11. When I run my job it works fine;
> the only issue is that I am not able to see any logs in the job manager and
> task manager. I am seeing the below exception in the stdout of the job manager:
>
> 09:21:36.761 [flink-akka.actor.default-dispatcher-17] ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor
> container_1599202660950_0009_01_03.
> java.util.concurrent.CompletionException: org.apache.flink.util.
> FlinkException: The file LOG does not exist on the TaskExecutor.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor
> .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
> CompletableFuture.java:1604) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> Caused by: org.apache.flink.util.FlinkException: The file LOG does not
> exist on the TaskExecutor.
> ... 5 more
> 09:21:36.773 [flink-akka.actor.default-dispatcher-17] ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerLogFileHandler - Unhandled exception.
> org.apache.flink.util.FlinkException: The file LOG does not exist on the
> TaskExecutor.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor
> .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
> CompletableFuture.java:1604) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> 09:21:57.399 [flink-akka.actor.default-dispatcher-15] ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor
> container_1599202660950_0009_01_03.
> TaskManagerLogFileHandler - Unhandled exception.
> org.apache.flink.util.FlinkException
>
> Is there any new setting I have to do in Flink 1.11? The same job is working
> fine on the previous version of EMR with Flink 1.10.
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Flink 1.11 TaskManagerLogFileHandler -Exception

2020-09-04 Thread aj
I am trying to switch to Flink 1.11 with the new EMR release 6.1. I have
created a 3-node EMR cluster with Flink 1.11. When I run my job it works fine;
the only issue is that I am not able to see any logs in the job manager and
task manager. I am seeing the below exception in the stdout of the job manager:

09:21:36.761 [flink-akka.actor.default-dispatcher-17] ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
- Failed to transfer file from TaskExecutor
container_1599202660950_0009_01_03.
java.util.concurrent.CompletionException: org.apache.flink.util.
FlinkException: The file LOG does not exist on the TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor
.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(
CompletableFuture.java:1604) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:624) ~[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
Caused by: org.apache.flink.util.FlinkException: The file LOG does not
exist on the TaskExecutor.
... 5 more
09:21:36.773 [flink-akka.actor.default-dispatcher-17] ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
- Unhandled exception.
org.apache.flink.util.FlinkException: The file LOG does not exist on the
TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor
.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(
CompletableFuture.java:1604) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:624) ~[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
09:21:57.399 [flink-akka.actor.default-dispatcher-15] ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
- Failed to transfer file from TaskExecutor
container_1599202660950_0009_01_03.
TaskManagerLogFileHandler - Unhandled exception.
org.apache.flink.util.FlinkException

Is there any new setting I have to do in Flink 1.11? The same job is working
fine on the previous version of EMR with Flink 1.10.

-- 
Thanks & Regards,
Anuj Jain





Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-09-03 Thread aj
Hello Vijay,
I have the same use case where I am reading from Kafka and want to report a
count for each event every 5 mins. In Prometheus, I want to set an alert if,
for any event, we do not receive any records, i.e. the count is zero.

So can you please help me with how you implemented this in the end?

On Fri, Jul 31, 2020 at 2:14 AM Chesnay Schepler  wrote:

> If you do the aggregation in Prometheus I would think that you do not need
> to reset the counter; but it's been a while since I've used it.
> Flink will not automatically reset counters.
> If this is necessary then you will have to manually reset the counter
> every 5 seconds.
>
> The name under which it will be exposed to Prometheus depends on the
> configured scope format; see the metric documentation for details.
> By default it will contain information about the task executors, job, task
> etc. .
>
> On 30/07/2020 22:02, Vijay Balakrishnan wrote:
>
> Hi David,
> Thx for your reply.
>
> To summarize:
> Use a Counter:
>
> counter = getRuntimeContext()
>   .getMetricGroup()
>   .addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for each 
> custom event_name here- I might not know all custom event_names in advance
>   .counter("myCounter");
>
> This MyMetricsValue will show up in Prometheus as for eg: 
> 0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter 
> and so on for 1.
>
> Window(TumblingWindow...).. for each parallel Operator.
>
> This will then have to be aggregated in Prometheus for 5 secs for all the 
> .
>
> Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter   
> // no task executors here - this is at Operator level ???
>
> This is independent of task Executors right ?? How does your statement - 
> Flink does not support aggregating operator-level metrics across task 
> executors. This job is left to proper time-series databases. relate to the 
> Summary above from me
>
> Also, I am assuming that the Counter will get reset after every Window 
> interval of 5 secs or do I need to do counter.dec(counter.getCount()) in the 
> close() method as you showed above.
>
> TIA,
>
>
>
> On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler 
> wrote:
>
>> I'd recommend to do the aggregation over 5 seconds in graphite/prometheus
>> etc., and expose a counter in Flink for each attribute/event_name.
>>
>> User variables are a good choice for encoding the attribute/event_name
>> values.
>>
>> As for your remaining questions:
>>
>> Flink does not support aggregating operator-level metrics across task
>> executors. This job is left to proper time-series databases.
>>
>> A counter can be reset like this: counter.dec(counter.getCount())
>> You can also create a custom implementation with whatever behavior you
>> desire.
>>
>> The default meter implementation (MeterView) calculates the rate of events
>> per second based on counts that are periodically gathered over some
>> time period (usually 1 minute). If you want to calculate the
>> rate-per-second over the last 5 seconds, then new MeterView(5) should do
>> the trick.
>> If you want to have a rate-per-5-seconds, then you will need to implement
>> a custom meter. Note that I would generally discourage this as it will not
>> work properly with some metric systems which assume rates to be per-second.
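>>
>> As a small sketch of the user-variable + MeterView combination (the group and
>> metric names below are assumptions), inside a rich function you could do:
>>
>> // uses org.apache.flink.metrics.Meter and org.apache.flink.metrics.MeterView
>> private transient Meter eventRate;
>>
>> @Override
>> public void open(Configuration parameters) {
>>     eventRate = getRuntimeContext()
>>         .getMetricGroup()
>>         .addGroup("event_name", "search_list_keyless") // user variable per event
>>         .meter("eventsPerSecond", new MeterView(5));    // rate over the last 5 seconds
>> }
>>
>> // and in the processing method, for each matching record:
>> // eventRate.markEvent();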
>>
>> On 27/07/2020 19:59, Vijay Balakrishnan wrote:
>>
>> Hi Al,
>> I am looking at the custom user metrics to count incoming records by an
>> incoming attribute, event_name, and aggregate it over 5 secs.
>> I looked at
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>> .
>> I am trying to figure out which one to use, Counter or Meter.
>> If using Counter, how do I reset it after 5 secs?
>> If using Meter, which measures avg throughput, how do I specify a
>> duration like 5 secs? markEvent(long n)?
>>
>> I am also trying to collect the total count of events across all TaskManagers.
>> Do I collect at
>> flink_taskmanager_job_task__numrecordsIn  or
>> flink_taskmanager_job_task_operator__numrecordsIn  ??
>> (so at the task or operator level)?
>>
>> Or should I use User variables like below:
>>
>> counter = getRuntimeContext()
>>   .getMetricGroup()
>>   .addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for each 
>> custom event_name here- I might not know all custom event_names in advance
>>   .counter("myCounter");
>>
>>
>> Pardon my confusion here.
>> TIA,
>>
>> On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi David,
>>> Thanks for your reply.
>>> I am already using the PrometheusReporter. I am trying to figure out how
>>> to dig into the application data and count grouped by an attribute called
>>> event_name in the incoming application data and report to Grafana via
>>> Prometheus.
>>>
>>> I see the following at a high level
>>> task_numRecordsIn
>>> task_numRecordsOut
>>> ..operator_numLateRecordsDropped
>>>
>>> Trying to dig in deeper than this numRecordsIn to get grouped by
>>> 

Not able to Assign Watermark in Flink 1.11

2020-08-27 Thread aj
I am getting this error when trying to assign watermarks in Flink 1.11:

*"Cannot resolve method 'withTimestampAssigner(anonymous
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)'"*

FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new
FlinkKafkaConsumer<>(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
.withTimestampAssigner(new SerializableTimestampAssigner<GenericRecord>() {
  @Override
  public long extractTimestamp(GenericRecord genericRecord, long l) {
return (long)genericRecord.get("event_ts");
  }
}));


What is wrong with this?
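
For reference, a minimal sketch that compiles against the 1.11 API, assuming the
consumed type is GenericRecord with a long "event_ts" field (the explicit type
parameter on forBoundedOutOfOrderness is what lets withTimestampAssigner resolve):

bookingFlowConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15))
        // a two-argument lambda matches SerializableTimestampAssigner<GenericRecord>
        .withTimestampAssigner((record, previousTimestamp) -> (long) record.get("event_ts")));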

In Flink 1.9 I was using this function and it was working fine:

public static class SessionAssigner implements
AssignerWithPunctuatedWatermarks<GenericRecord> {

  @Override
  public long extractTimestamp(GenericRecord record, long
previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
// LOGGER.info("timestamp", timestamp);
return timestamp;
  }

  @Override
  public Watermark checkAndGetNextWatermark(GenericRecord record, long
extractedTimestamp) {
// simply emit a watermark with every event
// LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
  }
}


-- 
Thanks & Regards,
Anuj Jain






Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread aj
I have a streaming job where I am doing a window operation on *"user_id"* and
then doing some summarization based on some time-based logic like:
1.  end the session based on 30 mins inactivity of the user.
2.  The End_trip event or cancellation event has arrived for the user.

I am trying to rerun the job from an old offset for backfilling, but then I am
getting wrong results. Some of the sessions are ending with the same start and
end time. How do I control the streaming job when a lot of data has accumulated
in Kafka and I have to replay the job? Please help me figure out what is going
wrong.

My assumption is that it may be due to:
1. Out-of-order events.
2. I am reading data from multiple topics, so an end_trip event that happened
at a later time can be read earlier and end the session.
I am using a KeyedProcessFunction like this:

public class DemandFunnelProcessFunction extends
    KeyedProcessFunction<String, Tuple2<String, GenericRecord>,
    DemandFunnelSummaryTuple> {

  private static final Logger LOGGER =
LoggerFactory.getLogger(DemandFunnelProcessFunction.class);

  private transient ValueState<DemandFunnelSummaryTuple> sessionSummary;
  private transient ValueState<ArrayList<Integer>> distanceListState;

  @SuppressWarnings("checkstyle:LocalVariableName")
  @Override
  public void processElement(Tuple2<String, GenericRecord> recordTuple2,
      Context context,
      Collector<DemandFunnelSummaryTuple> collector) throws Exception {

GenericRecord record = recordTuple2.f1;


String event_name = record.get("event_name").toString();
long event_ts = (Long) record.get("event_ts");

DemandFunnelSummaryTuple currentTuple = sessionSummary.value();
ArrayList<Integer> distanceList =
    distanceListState.value() != null ? distanceListState.value()
        : new ArrayList<>();

try {

  if (currentTuple == null) {
currentTuple = new DemandFunnelSummaryTuple();
String demandSessionId = UUID.randomUUID().toString();
currentTuple.setDemandSessionId(demandSessionId);
currentTuple.setStartTime(event_ts);
currentTuple.setUserId(recordTuple2.f0);
currentTuple.setEventName("demand_funnel_summary");
int geo_id = record.get("geo_id") != null ? (int)
record.get("geo_id") : 0;
currentTuple.setGeoId(geo_id);
  }
  long endTime = currentTuple.getEndTime();

  if (event_name.equals("search_list_keyless")) {
//System.out.println("inside search_list_keyless " + recordTuple2.f0);
currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
SearchEventUtil.searchSummaryCalculation(record, currentTuple,
distanceList);
  }


  currentTuple.setEndTime(event_ts);
  sessionSummary.update(currentTuple);
  distanceListState.update(distanceList);

  if (event_name.equals("keyless_booking_cancellation") || event_name
  .equals("keyless_end_trip")) {
try {

  DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();

  if (sessionSummaryTuple != null) {
sessionSummaryTuple.setAvgResultCount(
(double) distanceList.size() /
sessionSummaryTuple.getTotalSearch());
if (distanceList.size() > 0) {
  int distanceSum = distanceList.stream()
  .collect(Collectors.summingInt(Integer::intValue));
  sessionSummaryTuple.setAvgBikeDistance((double)
distanceSum / distanceList.size());
  sessionSummaryTuple

.setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
  sessionSummaryTuple

.setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
}
sessionSummaryTuple.setEndTime(event_ts);
collector.collect(sessionSummaryTuple);
  }
} catch (Exception e) {
  DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
  LOGGER.info("Error in collecting event for user_id " +
sessionSummaryTuple.getUserId());
  e.printStackTrace();
}
sessionSummary.clear();
distanceListState.clear();
  }
} catch (Exception e) {
  LOGGER.info("error in processing event --" + recordTuple2.f1.toString());
  LOGGER.info(e.toString());
  e.printStackTrace();

}
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
      Collector<DemandFunnelSummaryTuple> out)
  throws Exception {

try {
  DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
  if (sessionSummaryTuple != null) {
System.out.println(
"calling on timer" + sessionSummaryTuple.getUserId() + " "
+ sessionSummaryTuple
.getEndTime() + "  " + timestamp);
ArrayList<Integer> distanceList = distanceListState.value();
if (distanceList != null && distanceList.size() > 0) {
  sessionSummaryTuple
  .setAvgResultCount(
  (double) distanceList.size() /
sessionSummaryTuple.getTotalSearch());
  int distanceSum =
distanceList.stream().collect(Collectors.summingInt(Integer::intValue));
  sessionSummaryTuple.setAvgBikeDistance((double) 

Re: Flink Stream job to parquet sink

2020-06-30 Thread aj
Thanks, Arvid. I will try to implement it using the broadcast approach.

On Fri, Jun 26, 2020 at 1:10 AM Arvid Heise  wrote:

> Hi Anuj,
>
> Yes, broadcast sounds really good. Now you just need to hide the
> structural invariance (variable number of sinks) by delegating to inner
> sinks.
>
> public class SplittingStreamingFileSink<IN>
>   extends RichSinkFunction<IN>
>   implements CheckpointedFunction, CheckpointListener {
>
>   Map<String, StreamingFileSink<IN>> sinks = ...; // event name to sink
>
>   // delegate each method of RichSinkFunction, CheckpointedFunction, 
> CheckpointListener to sinks
>
>   public void invoke(IN value, SinkFunction.Context context) throws 
> Exception {
> StreamingFileSink<IN> sink = sinks.get(value.getEventName());
>
> if (sink == null) {
>
>   // create new StreamingFileSink add to sinks and call 
> initializeState
>
> }
>
> sink.invoke(value, context);
>
>   }
>
> }
>
> Use the SplittingStreamingFileSink as the only sink in your workflow. You
> definitely need to wrap the SinkFunction.Context such that state gets
> prefixes (eventName), but it should be rather straightforward.
>
> On Thu, Jun 25, 2020 at 3:47 PM aj  wrote:
>
>> Thanks, Arvid, for the detailed answers.
>> - Have some kind submitter that restarts flink automatically on config
>> change (assumes that restart time is not the issue).
>>   Yes, that can be written but that not solve the problem completely
>> because I want to avoid job restart itself. Every time I restart I also
>> have to restart from the last checkpoint in S3.
>>
>> - Your job could periodically check for a change and then fail. However,
>> you need to fail in a way that the topology is rebuilt. I guess it's close
>> to a fatal error and then the driver handles that error and restart.
>>Again same reason as first also not sure how would it be implemented.
>>
>> Rewrite the job in such a way that you have only one sink in your job
>> graph which demultiplexes the event to several internal sinks. Then you
>> could simply add a new sink whenever a new event occurs.
>> -Please provide me some idea I want to implement this, how it can be
>> possible.
>>
>> @Arvid Heise
>> I am just thinking can I use a broadcast state where this config rule
>> which I keeping in YAML can be a push in Kafka itself.  Because I just need
>> event name and  Avro schema subject mapping mainly.
>> Please correct me if I am thinking in the wrong direction.
>>
>>
>>
>> On Thu, Jun 25, 2020 at 2:18 PM Rafi Aroch  wrote:
>>
>>> Hi Arvid,
>>>
>>> Would it be possible to implement a BucketAssigner that for example
>>> loads the configuration periodically from an external source and according
>>> to the event type decides on a different sub-folder?
>>>
>>> Thanks,
>>> Rafi
>>>
>>>
>>> On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise 
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> There is currently no way to dynamically change the topology. It would
>>>> be good to know why your current approach is not working (restart taking
>>>> too long? Too frequent changes?)
>>>>
>>>> So some ideas:
>>>> - Have some kind submitter that restarts flink automatically on config
>>>> change (assumes that restart time is not the issue).
>>>> - Your job could periodically check for a change and then fail.
>>>> However, you need to fail in a way that the topology is rebuilt. I guess
>>>> it's close to a fatal error and then the driver handles that error and
>>>> restart.
>>>> - Rewrite the job in such a way that you have only one sink in your job
>>>> graph which demultiplexes the event to several internal sinks. Then you
>>>> could simply add a new sink whenever a new event occurs.
>>>>
>>>> The first option is the easiest and the last option the most versatile
>>>> (could even have different sink types mixed).
>>>>
>>>> On Tue, Jun 23, 2020 at 5:34 AM aj  wrote:
>>>>
>>>>> I am stuck on this . Please give some suggestions.
>>>>>
>>>>> On Tue, Jun 9, 2020, 21:40 aj  wrote:
>>>>>
>>>>>> please help with this. Any suggestions.
>>>>>>
>>>>>> On Sat, Jun 6, 2020 at 12:20 PM aj  wrote:
>>>>>>
>>>>>>> Hello All,
>>>>>>>
&g

Re: Flink Stream job to parquet sink

2020-06-25 Thread aj
Thanks, Arvid, for the detailed answers.
- Have some kind of submitter that restarts Flink automatically on config
change (assumes that restart time is not the issue).
  Yes, that can be written, but it does not solve the problem completely,
because I want to avoid the job restart itself. Every time I restart, I also
have to restart from the last checkpoint in S3.

- Your job could periodically check for a change and then fail. However,
you need to fail in a way that the topology is rebuilt. I guess it's close
to a fatal error, and then the driver handles that error and restarts.
   Again, same reason as the first; I am also not sure how it would be implemented.

Rewrite the job in such a way that you have only one sink in your job graph,
which demultiplexes the events to several internal sinks. Then you could
simply add a new sink whenever a new event occurs.
- Please give me some idea of how this can be implemented; I want to try this
approach.

@Arvid Heise
I am just thinking: can I use broadcast state, where this config rule that I
keep in YAML is pushed through Kafka itself? Because I mainly just need the
event name to Avro schema subject mapping.
Please correct me if I am thinking in the wrong direction.



On Thu, Jun 25, 2020 at 2:18 PM Rafi Aroch  wrote:

> Hi Arvid,
>
> Would it be possible to implement a BucketAssigner that for example loads
> the configuration periodically from an external source and according to the
> event type decides on a different sub-folder?
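>
> A rough sketch of such an assigner (the event field name and class names below
> are assumptions): it routes each record to a sub-folder per event type, and the
> periodically refreshed configuration could be consulted in the same place.
>
> // implements org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
> public class EventTypeBucketAssigner implements BucketAssigner<GenericRecord, String> {
>
>     @Override
>     public String getBucketId(GenericRecord element, Context context) {
>         // assumed field name "event_name"; a reloaded config could map it further
>         return element.get("event_name").toString();
>     }
>
>     @Override
>     public SimpleVersionedSerializer<String> getSerializer() {
>         return SimpleVersionedStringSerializer.INSTANCE;
>     }
> }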
>
> Thanks,
> Rafi
>
>
> On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise  wrote:
>
>> Hi Anuj,
>>
>> There is currently no way to dynamically change the topology. It would be
>> good to know why your current approach is not working (restart taking too
>> long? Too frequent changes?)
>>
>> So some ideas:
>> - Have some kind submitter that restarts flink automatically on config
>> change (assumes that restart time is not the issue).
>> - Your job could periodically check for a change and then fail. However,
>> you need to fail in a way that the topology is rebuilt. I guess it's close
>> to a fatal error and then the driver handles that error and restart.
>> - Rewrite the job in such a way that you have only one sink in your job
>> graph which demultiplexes the event to several internal sinks. Then you
>> could simply add a new sink whenever a new event occurs.
>>
>> The first option is the easiest and the last option the most versatile
>> (could even have different sink types mixed).
>>
>> On Tue, Jun 23, 2020 at 5:34 AM aj  wrote:
>>
>>> I am stuck on this . Please give some suggestions.
>>>
>>> On Tue, Jun 9, 2020, 21:40 aj  wrote:
>>>
>>>> please help with this. Any suggestions.
>>>>
>>>> On Sat, Jun 6, 2020 at 12:20 PM aj  wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am receiving a set of events in Avro format on different topics. I
>>>>> want to consume these and write to s3 in parquet format.
>>>>> I have written a below job that creates a different stream for each
>>>>> event and fetches it schema from the confluent schema registry to create a
>>>>> parquet sink for an event.
>>>>> This is working fine but the only problem I am facing is whenever a
>>>>> new event start coming I have to change in the YAML config and restart the
>>>>> job every time. Is there any way I do not have to restart the job and it
>>>>> start consuming a new set of events.
>>>>>
>>>>>
>>>>> YAML config :
>>>>>
>>>>> !com.bounce.config.EventTopologyConfig
>>>>> eventsType:
>>>>>   - !com.bounce.config.EventConfig
>>>>> event_name: "search_list_keyless"
>>>>> schema_subject: 
>>>>> "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>>>> topic: "search_list_keyless"
>>>>>
>>>>>   - !com.bounce.config.EventConfig
>>>>> event_name: "bike_search_details"
>>>>> schema_subject: 
>>>>> "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>>>> topic: "bike_search_details"
>>>>>
>>>>>   - !com.bounce.config.EventConfig
>>>>> event_name: "keyless_bike_lock"
>>>>> schema_subject: 
>>>>> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>>>> topic: "ana

Re: Flink Stream job to parquet sink

2020-06-22 Thread aj
I am stuck on this. Please give me some suggestions.

On Tue, Jun 9, 2020, 21:40 aj  wrote:

> please help with this. Any suggestions.
>
> On Sat, Jun 6, 2020 at 12:20 PM aj  wrote:
>
>> Hello All,
>>
>> I am receiving a set of events in Avro format on different topics. I want
>> to consume these and write to s3 in parquet format.
>> I have written a below job that creates a different stream for each event
>> and fetches it schema from the confluent schema registry to create a
>> parquet sink for an event.
>> This is working fine but the only problem I am facing is whenever a new
>> event start coming I have to change in the YAML config and restart the job
>> every time. Is there any way I do not have to restart the job and it start
>> consuming a new set of events.
>>
>>
>> YAML config :
>>
>> !com.bounce.config.EventTopologyConfig
>> eventsType:
>>   - !com.bounce.config.EventConfig
>> event_name: "search_list_keyless"
>> schema_subject: 
>> "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>> topic: "search_list_keyless"
>>
>>   - !com.bounce.config.EventConfig
>> event_name: "bike_search_details"
>> schema_subject: 
>> "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>> topic: "bike_search_details"
>>
>>   - !com.bounce.config.EventConfig
>> event_name: "keyless_bike_lock"
>> schema_subject: 
>> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>> topic: "analytics-keyless"
>>
>>   - !com.bounce.config.EventConfig
>>   event_name: "keyless_bike_unlock"
>>   schema_subject: 
>> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>   topic: "analytics-keyless"
>>
>>
>> checkPointInterval: 120
>>
>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>>
>>
>>
>>
>>
>> *Sink code :*
>>
>>   YamlReader reader = new YamlReader(topologyConfig);
>> EventTopologyConfig eventTopologyConfig = 
>> reader.read(EventTopologyConfig.class);
>>
>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>> topics = eventTopologyConfig.getTopics();
>>
>> List eventTypesList = 
>> eventTopologyConfig.getEventsType();
>>
>> CachedSchemaRegistryClient registryClient = new 
>> CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>
>>
>> FlinkKafkaConsumer flinkKafkaConsumer = new 
>> FlinkKafkaConsumer(topics,
>> new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>> properties);
>>
>> DataStream dataStream = 
>> streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>
>> try {
>> for (EventConfig eventConfig : eventTypesList) {
>>
>> LOG.info("creating a stream for ", eventConfig.getEvent_name());
>>
>> final StreamingFileSink sink = StreamingFileSink.forBulkFormat
>> (path, 
>> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(),
>>  registryClient)))
>> .withBucketAssigner(new EventTimeBucketAssigner())
>> .build();
>>
>> DataStream outStream = 
>> dataStream.filter((FilterFunction) genericRecord -> {
>> if (genericRecord != null && 
>> genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
>>  {
>> return true;
>> }
>> return false;
>> });
>> 
>> outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>>
>> }
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>>
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


Flink Count of Events using metric

2020-06-16 Thread aj
Please help me with this:

https://stackoverflow.com/questions/62297467/flink-count-of-events-using-metric

I have a topic in Kafka where I am getting multiple types of events in JSON
format. I have created a file stream sink to write these events to S3 with
bucketing.

Now I want to publish an hourly count of each event as metrics to
Prometheus and build a Grafana dashboard over that.

So please help me understand how I can achieve an hourly count for each event
using Flink metrics and publish it to Prometheus.


-- 
Thanks & Regards,
Anuj Jain






Re: Flink on yarn : yarn-session understanding

2020-06-16 Thread aj
Ok, thanks for the clarification on yarn session.

I am trying to connect to the job manager on port 8081, but it's not connecting.

[image: image.png]


The screenshot above shows the address from my Flink job UI; I am trying to
connect to the REST address on 8081, but it refuses the connection.

On Tue, Jun 9, 2020 at 1:03 PM Andrey Zagrebin  wrote:

> Hi Anuj,
>
> Afaik, the REST API should work for both modes. What is the issue? Maybe,
> some network problem to connect to YARN application master?
>
> Best,
> Andrey
>
> On Mon, Jun 8, 2020 at 4:39 PM aj  wrote:
>
>> I am running some stream jobs that are long-running always. I am
>> currently submitting each job as a standalone job on yarn.
>>
>> 1. I need to understand what is the advantage of using yarn-session and
>> when should I use that.
>> 2. Also, I am not able to access rest API services is it because I am
>> running as standalone job over yarn. Is REST API works only in yarn-session?
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Re: Flink Stream job to parquet sink

2020-06-09 Thread aj
Please help with this. Any suggestions?

On Sat, Jun 6, 2020 at 12:20 PM aj  wrote:

> Hello All,
>
> I am receiving a set of events in Avro format on different topics. I want
> to consume these and write to s3 in parquet format.
> I have written a below job that creates a different stream for each event
> and fetches it schema from the confluent schema registry to create a
> parquet sink for an event.
> This is working fine but the only problem I am facing is whenever a new
> event start coming I have to change in the YAML config and restart the job
> every time. Is there any way I do not have to restart the job and it start
> consuming a new set of events.
>
>
> YAML config :
>
> !com.bounce.config.EventTopologyConfig
> eventsType:
>   - !com.bounce.config.EventConfig
> event_name: "search_list_keyless"
> schema_subject: 
> "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
> topic: "search_list_keyless"
>
>   - !com.bounce.config.EventConfig
> event_name: "bike_search_details"
> schema_subject: 
> "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
> topic: "bike_search_details"
>
>   - !com.bounce.config.EventConfig
> event_name: "keyless_bike_lock"
> schema_subject: 
> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
> topic: "analytics-keyless"
>
>   - !com.bounce.config.EventConfig
>   event_name: "keyless_bike_unlock"
>   schema_subject: 
> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>   topic: "analytics-keyless"
>
>
> checkPointInterval: 120
>
> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>
>
>
>
>
> *Sink code :*
>
>   YamlReader reader = new YamlReader(topologyConfig);
> EventTopologyConfig eventTopologyConfig = 
> reader.read(EventTopologyConfig.class);
>
> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
> topics = eventTopologyConfig.getTopics();
>
> List eventTypesList = 
> eventTopologyConfig.getEventsType();
>
> CachedSchemaRegistryClient registryClient = new 
> CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>
>
> FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
> new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
> properties);
>
> DataStream dataStream = 
> streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>
> try {
> for (EventConfig eventConfig : eventTypesList) {
>
> LOG.info("creating a stream for ", eventConfig.getEvent_name());
>
> final StreamingFileSink sink = StreamingFileSink.forBulkFormat
> (path, 
> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(),
>  registryClient)))
> .withBucketAssigner(new EventTimeBucketAssigner())
> .build();
>
> DataStream outStream = 
> dataStream.filter((FilterFunction) genericRecord -> {
> if (genericRecord != null && 
> genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) 
> {
> return true;
> }
> return false;
> });
> 
> outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>
> }
> } catch (Exception e) {
> e.printStackTrace();
> }
>
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Re: Data Quality Library in Flink

2020-06-09 Thread aj
Thanks, Andrey, I will check it out.

On Mon, Jun 8, 2020 at 8:10 PM Andrey Zagrebin  wrote:

> Hi Anuj,
>
> I am not familiar with data quality measurement methods and deequ
> <https://github.com/awslabs/deequ> in depth.
> What you describe looks like monitoring some data metrics.
> Maybe there are other community users aware of a better solution.
> Meanwhile, I would recommend implementing the checks and failures as
> separate operators and side outputs (for streaming) [1], if you have not yet.
> Then you could also use Flink metrics to aggregate and monitor the data
> [2].
> The metrics systems usually allow to define alerts on metrics, like in
> prometheus [3], [4].
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/side_output.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
> [4] https://prometheus.io/docs/alerting/overview/
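>
> As a rough illustration of that approach (the column name, metric names and
> class names below are assumptions), a ProcessFunction could route invalid
> records to a side output and expose fill-rate counters for Prometheus:
>
> public class FillRateCheck extends ProcessFunction<GenericRecord, GenericRecord> {
>
>     public static final OutputTag<GenericRecord> INVALID =
>         new OutputTag<GenericRecord>("invalid-records") {};
>
>     private transient Counter totalRecords;
>     private transient Counter missingUserId;
>
>     @Override
>     public void open(Configuration parameters) {
>         totalRecords = getRuntimeContext().getMetricGroup().counter("totalRecords");
>         missingUserId = getRuntimeContext().getMetricGroup().counter("missingUserId");
>     }
>
>     @Override
>     public void processElement(GenericRecord record, Context ctx,
>             Collector<GenericRecord> out) {
>         totalRecords.inc();
>         if (record.get("user_id") == null) {   // fill-rate check on an assumed column
>             missingUserId.inc();
>             ctx.output(INVALID, record);       // error/side-output queue
>         } else {
>             out.collect(record);
>         }
>     }
> }
>
> The fill rate (1 - missingUserId/totalRecords) and the alerting threshold would
> then be computed and evaluated in Prometheus.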
>
> On Sat, Jun 6, 2020 at 9:23 AM aj  wrote:
>
>> Hello All,
>>
>> I  want to do some data quality analysis on stream data example.
>>
>> 1. Fill rate in a particular column
>> 2. How many events are going to error queue due to favor schema
>> validation failed?
>> 3. Different statistics measure of a column.
>> 3. Alert if a particular threshold is breached (like if fill rate is less
>> than 90% for a column)
>>
>> Is there any library that exists on top of Flink for data quality. As I
>> am looking there is a library on top of the spark
>> https://github.com/awslabs/deequ
>>
>> This proved all that I am looking for.
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Flink on yarn : yarn-session understanding

2020-06-08 Thread aj
I am running some stream jobs that are always long-running. I am currently
submitting each job as a standalone job on YARN.

1. I need to understand the advantage of using a yarn-session and when I should
use it.
2. Also, I am not able to access the REST API services. Is that because I am
running standalone jobs over YARN? Does the REST API work only in yarn-session
mode?


-- 
Thanks & Regards,
Anuj Jain





Flink Stream job to parquet sink

2020-06-06 Thread aj
Hello All,

I am receiving a set of events in Avro format on different topics. I want
to consume these and write them to S3 in Parquet format.
I have written the below job, which creates a different stream for each event
and fetches its schema from the Confluent schema registry to create a
Parquet sink for that event.
This is working fine, but the only problem I am facing is that whenever a new
event starts coming in, I have to change the YAML config and restart the job
every time. Is there any way to avoid restarting the job so that it starts
consuming the new set of events automatically?


YAML config :

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
event_name: "search_list_keyless"
schema_subject:
"search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
event_name: "bike_search_details"
schema_subject:
"bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
topic: "bike_search_details"

  - !com.bounce.config.EventConfig
event_name: "keyless_bike_lock"
schema_subject:
"analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
  event_name: "keyless_bike_unlock"
  schema_subject:
"analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
  topic: "analytics-keyless"


checkPointInterval: 120

topics: ["search_list_keyless","bike_search_details","analytics-keyless"]





*Sink code :*

  YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig =
reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();

List<EventConfig> eventTypesList =
eventTopologyConfig.getEventsType();

CachedSchemaRegistryClient registryClient = new
CachedSchemaRegistryClient(schemaRegistryUrl, 1000);


FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);

DataStream<GenericRecord> dataStream =
streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
for (EventConfig eventConfig : eventTypesList) {

LOG.info("creating a stream for {}", eventConfig.getEvent_name());

final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
(path, 
ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(),
registryClient)))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();

DataStream<GenericRecord> outStream =
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if (genericRecord != null &&
genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
{
return true;
}
return false;
});

outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);

}
} catch (Exception e) {
e.printStackTrace();
}




-- 
Thanks & Regards,
Anuj Jain






Data Quality Library in Flink

2020-06-06 Thread aj
Hello All,

I want to do some data quality analysis on stream data, for example:

1. Fill rate in a particular column.
2. How many events are going to the error queue because Avro schema
validation failed?
3. Different statistical measures of a column.
4. Alert if a particular threshold is breached (for example, if the fill rate
is less than 90% for a column).

Is there any library that exists on top of Flink for data quality? As far as I
can see, there is a library on top of Spark:
https://github.com/awslabs/deequ

This provides all that I am looking for.

-- 
Thanks & Regards,
Anuj Jain






Re: Re: Re: Flink Window with multiple trigger condition

2020-05-30 Thread aj
Thanks Yun.

I have converted the code to use a KeyedProcessFunction rather than a flatMap,
and with a registered timer it worked.
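
For anyone looking for the shape of that solution, a minimal sketch of the
pattern (the state, field and output type names are assumptions): an event-time
timer is registered 30 minutes after the latest element per key, and onTimer
fires even if no further events arrive for that user.

public class SessionEndFunction
    extends KeyedProcessFunction<String, GenericRecord, SessionSummary> {

  private static final long GAP_MS = 30 * 60 * 1000L;
  private transient ValueState<Long> lastSeen;

  @Override
  public void open(Configuration parameters) {
    lastSeen = getRuntimeContext().getState(
        new ValueStateDescriptor<>("lastSeen", Long.class));
  }

  @Override
  public void processElement(GenericRecord record, Context ctx,
      Collector<SessionSummary> out) throws Exception {
    long eventTs = (long) record.get("event_ts");  // assumed timestamp field
    lastSeen.update(eventTs);
    ctx.timerService().registerEventTimeTimer(eventTs + GAP_MS);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
      Collector<SessionSummary> out) throws Exception {
    Long last = lastSeen.value();
    if (last != null && timestamp >= last + GAP_MS) {
      // 30 minutes of inactivity: emit the session summary and clear state here
      lastSeen.clear();
    }
  }
}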

On Fri, May 29, 2020 at 11:13 AM Yun Gao  wrote:

> Hi,
>
>  I think you could use *timer* to achieve that. In *processFunction*
> you could register a timer at specific time (event time or processing time)
> and get callbacked at that point. It could be registered like
>
>
> ctx.timerService().registerEventTimeTimer(current.lastModified + 6);
>
>
> More details on timer could be found in [1] and an example is in [2].
> In this example, a timer is registered in the last line of the
> *processElement* method, and the callback is implemented by override the
> *onTimer* method.
>
>[1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
>[2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
>
>
> --Original Mail --
> *Sender:*aj 
> *Send Date:*Fri May 29 02:07:33 2020
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Re: Flink Window with multiple trigger condition
>
>> Hi,
>>
>> I have implemented the below solution and its working fine but the
>> biggest problem with this is if no event coming for the user after 30 min
>> then I am not able to trigger because I am checking
>> time diff from upcoming events. So when the next event comes than only it
>> triggers but I want it to trigger just after 30 mins.
>>
>> So please help me to improve this and how to solve the above problem.
>>
>>
>>
>> public class DemandSessionFlatMap
>>         extends RichFlatMapFunction<Tuple2<String, GenericRecord>, DemandSessionSummaryTuple> {
>>
>>     private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);
>>
>>     private transient ValueState<Tuple3<String, Long, Long>> timeState; // maintain session_id, starttime and endtime
>>     private transient MapState<String, DemandSessionSummaryTuple> sessionSummary; // map for hex9 and summary tuple
>>
>>     @Override
>>     public void open(Configuration config) {
>>
>>         ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
>>                 new ValueStateDescriptor<>(
>>                         "time_state", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {
>>                         }), // type information
>>                         Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
>>         timeState = getRuntimeContext().getState(timeDescriptor);
>>
>>         MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
>>                 new MapStateDescriptor<>("demand_session",
>>                         TypeInformation.of(new TypeHint<String>() {
>>                         }), TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() {
>>                         }));
>>         sessionSummary = getRuntimeContext().getMapState(descriptor);
>>
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<String, GenericRecord> recordTuple2,
>>                         Collector<DemandSessionSummaryTuple> collector) throws Exception {
>>         GenericRecord record = recordTuple2.f1;
>>         String event_name = record.get("event_name").toString();
>>         long event_ts = (Long) record.get("event_ts");
>>         Tuple3<String, Long, Long> currentTimeState = timeState.value();
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>
>>         long timeDiff = event_ts - currentTimeState.f1;
>>
>>         if (event_name.equals("keyless_start_trip") || timeDiff >= 180) {
>>             Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
>>             sessionSummary.entries().forEach(tuple -> {
>>                 String key = tuple.getKey();
>>                 DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
>>                 try {
>>                     sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
>>                     collector.collect(sessionSummaryTuple);
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>
>>             });
>>             timeState.clear();
>>             sessionSummary.clear();
>>   

Re: Flink Elastic Sink

2020-05-30 Thread aj
Thanks, It worked.

I was confused before because I thought the sink builder is called only
once, but it gets called for every batch request; correct me if my
understanding is wrong.
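
For reference, a rough sketch of the per-record daily index that Leonard
describes below; the host, index prefix, bulk settings, and helper class name
are placeholders, and only the IndexRequest construction is the relevant part:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DailyIndexSinkFactory {

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    public static ElasticsearchSink<GenericRecord> build() {
        ElasticsearchSinkFunction<GenericRecord> sinkFunction =
                new ElasticsearchSinkFunction<GenericRecord>() {
                    @Override
                    public void process(GenericRecord record, RuntimeContext ctx,
                                        RequestIndexer indexer) {
                        // the index name is derived per record, so a new daily index is
                        // written as soon as the first event of that day arrives
                        long eventTs = (Long) record.get("event_ts"); // field name from this thread
                        String index = "events-" + DAY.format(Instant.ofEpochMilli(eventTs));

                        Map<String, Object> source = new HashMap<>();
                        source.put("event_name", record.get("event_name").toString());
                        source.put("event_ts", eventTs);

                        indexer.add(Requests.indexRequest().index(index).source(source));
                    }
                };

        ElasticsearchSink.Builder<GenericRecord> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("localhost", 9200, "http")), // assumed host
                sinkFunction);
        builder.setBulkFlushMaxActions(1000);
        return builder.build();
    }
}
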

On Fri, May 29, 2020 at 9:08 AM Leonard Xu  wrote:

> Hi,aj
>
> In the implementation of ElasticsearchSink, ElasticsearchSink won't
> create the index and only starts an Elastic client for sending requests to
> the Elastic cluster. You can simply extract the index (date value in your
> case) from your timestamp field and then put it into an IndexRequest[1];
> ElasticsearchSink will send the IndexRequests to the Elastic cluster, and the
> Elastic cluster will create the corresponding index and flush the records.
>
> BTW, if you’re using Flink SQL you can use a dynamic index in the Elasticsearch
> SQL connector [2]; you can simply config 'connector.index' =
> 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goals.
>
> Best,
> Leonard Xu
> [1]
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>
>
>
>
> 在 2020年5月29日,02:43,aj  写道:
>
> Hello All,
>
> I am getting many events in Kafka and I have written a Flink job that sinks
> the Avro records from Kafka to S3 in Parquet format.
>
> Now, I want to sink these records into Elasticsearch, but the only
> challenge is that I want to sink records to time-based indices. Basically, in
> Elastic, I want to create a per-day index with the date as the suffix.
> So in the Flink stream job, if I create an ES sink, how will I change the sink
> to start writing to a new index when the first event of the day arrives?
>
> Thanks,
> Anuj.
>
>
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Flink Elastic Sink

2020-05-28 Thread aj
Hello All,

I am getting many events in Kafka and I have written a Flink job that sinks
the Avro records from Kafka to S3 in Parquet format.

Now, I want to sink these records into Elasticsearch, but the only
challenge is that I want to sink records to time-based indices. Basically, in
Elastic, I want to create a per-day index with the date as the suffix.
So in the Flink stream job, if I create an ES sink, how will I change the sink to
start writing to a new index when the first event of the day arrives?

Thanks,
Anuj.








Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread aj
a timer 30
> mins later when received the "*search*" event and write the time of
> search event into the state. Then for the following events, they will be
> saved to the state since the flag is set. After received the "*start*"
> event or the timer is triggered, you could load all the events from the
> states, do the aggregation and cancel the timer if it is triggered by "
> *start*" event. A simpler case is [1] and it does not consider stop the
> aggreation when received special event, but it seems that the logic could
> be added to the case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>
> Best,
>  Yun
>
>
>
> --Original Mail --
> *Sender:*aj 
> *Send Date:*Sun May 24 01:10:55 2020
> *Recipients:*Tzu-Li (Gordon) Tai 
> *CC:*user 
> *Subject:*Re: Flink Window with multiple trigger condition
>
>>
>> I am still not able to get much after reading the stuff. Please help with
>> some basic code to start to build this window and trigger.
>>
>> Another option I am thinking is I just use a Richflatmap function and use
>> the keyed state to build this logic. Is that the correct approach?
>>
>>
>>
>> On Fri, May 22, 2020 at 4:52 PM aj  wrote:
>>
>>>
>>>
>>> I was also thinking to have a processing time window but that will not
>>> work for me. I want to start the window when the user  "*search*" event
>>> arrives. So for each user window will start from the *search* event.
>>>  The Tumbling window has fixed start end time so that will not be
>>> suitable in my case.
>>>
>>>
>>>
>>>
>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> To achieve what you have in mind, I think what you have to do is to use
>>>> a
>>>> processing time window of 30 mins, and have a custom trigger that
>>>> matches
>>>> the "start" event in the `onElement` method and return
>>>> TriggerResult.FIRE_AND_PURGE.
>>>>
>>>> That way, the window fires either when the processing time has passed,
>>>> or
>>>> the start event was recieved.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Re: Flink Window with multiple trigger condition

2020-05-23 Thread aj
I am still not able to get much after reading the stuff. Please help with
some basic code to start to build this window and trigger.

Another option I am thinking is I just use a Richflatmap function and use
the keyed state to build this logic. Is that the correct approach?



On Fri, May 22, 2020 at 4:52 PM aj  wrote:

>
>
> I was also thinking to have a processing time window but that will not
> work for me. I want to start the window when the user  "*search*" event
> arrives. So for each user window will start from the *search* event.
>  The Tumbling window has fixed start end time so that will not be suitable
> in my case.
>
>
>
>
> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> To achieve what you have in mind, I think what you have to do is to use a
>> processing time window of 30 mins, and have a custom trigger that matches
>> the "start" event in the `onElement` method and return
>> TriggerResult.FIRE_AND_PURGE.
>>
>> That way, the window fires either when the processing time has passed, or
>> the start event was recieved.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Re: Flink Window with multiple trigger condition

2020-05-22 Thread aj
I was also thinking to have a processing time window but that will not work
for me. I want to start the window when the user  "*search*" event arrives.
So for each user window will start from the *search* event.
 The Tumbling window has fixed start end time so that will not be suitable
in my case.




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> To achieve what you have in mind, I think what you have to do is to use a
> processing time window of 30 mins, and have a custom trigger that matches
> the "start" event in the `onElement` method and return
> TriggerResult.FIRE_AND_PURGE.
>
> That way, the window fires either when the processing time has passed, or
> the start event was recieved.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Flink Window with multiple trigger condition

2020-05-21 Thread aj
A session window is defined by a gap of inactivity; I do not have that
requirement.

Starting the window only on the *"search"* event is a part I will take up later.

Let's say in the first phase I want to start the window on any event that
appears for that user.

For example :

*Scenario -1*
t1 - user1   event1 ( window start)
t1 +5 mins - user1 - event2
t1 + 10 mins --- user1  event3
t1 + 15 mins - user1  event4===start type event (terminate window
as event type "*Start*" arrived and calculate aggregate on above collected
events)

t1+16 mins ---user-1   event 5 starts a new window


*Scenario -2*
t1 - user1   event1 ( window start)
t1 +5 mins - user1 - event2
t1 + 10 mins --- user1  event3
t1 + 30 mins - user1  event4 (terminates the window as 30 mins
elapsed and calculate aggregate on above collected events)

t1+31 mins ---user-1   event5  starts a new window

This is what I want to implement. I have tried reading about triggers but could not
understand how to fire when either the time has passed or an event with
event_type == *"start"* has arrived. Which methods of the Trigger class do I have
to implement, and how do I check these two conditions on each arriving event?

Please help me implement this. It would help if you could provide a basic starting
function that I need to implement; I am not clear on how to start.
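
Along the lines of what Gordon suggests elsewhere in this thread (a
processing-time window plus a custom trigger), a rough sketch could look like
this; the GenericRecord element type, the "event_name" field, and the
30-minute window are assumptions taken from this thread, and the class name is
made up:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StartEventOrTimeoutTrigger extends Trigger<GenericRecord, TimeWindow> {

    @Override
    public TriggerResult onElement(GenericRecord element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        // fall back to firing when the window's own processing time is up
        ctx.registerProcessingTimeTimer(window.maxTimestamp());

        // fire early if the terminating event arrives
        Object name = element.get("event_name");
        if (name != null && "start".equals(name.toString())) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // the window size (e.g. 30 minutes) elapsed without a "start" event
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

// usage sketch:
// stream.keyBy(...)
//       .window(TumblingProcessingTimeWindows.of(Time.minutes(30)))
//       .trigger(new StartEventOrTimeoutTrigger())
//       .process(new MyAggregationWindowFunction());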



















On Thu, May 21, 2020 at 4:59 PM Jiayi Liao  wrote:

>
> According to your description, it's more like a session window scenario
> rather than a tumbling window, you can find more details in [1]. But the
> difference between your window and Flink
> 's session window is, the session length is defined by the first element
> in your case. I'm afraid that Flink does't have implementations for such
> scenarios, you may need to create your own WindowAssigner.
>
> For the trigger, yes, you can always implement a trigger to determine the
> lifecyle of a window.
>
>
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
>
>
> Best,
> Jiayi Liao
>
> On Thu, May 21, 2020 at 5:51 PM aj  wrote:
>
>> Hi Liao,
>>
>> Thanks for the response. I am reading all this as I never implemented
>> this.
>>
>> > Start the window for the user when event_name: *"search"  *arrived for
>> the user.
>>
>> I'm not very sure this is right way to define the window in your business
>> if you use event time, because there may exist out-of-order events that
>> happened after "search" event, but arrive before "search" event, which will
>> be discarded because you haven't assigned a window. (If I understand
>> correctly)
>>
>> *Yes, you are right, and I raised this concern to the business team; we are
>> still in discussion.*
>>
>> But let's say I do not need the above condition, and I want to start the
>> window whenever the first event for a particular user appears and
>> then bucket the events with similar conditions (either 30 mins from the
>> start of the window is reached or event_type *"start"* appears). So,
>> in that case, can I use *TumblingProcessingTimeWindows* of 30 mins,
>> and on that put a custom trigger so that, before the 30 mins are up, the
>> window is processed if event_type *"start"* arrives?
>> Is my understanding correct that if, say, the *start* event arrives at
>> 20 mins from the window start, then that window will be closed and processed,
>> and events arriving after that will be assigned to a new window, or will the
>> window continue till 30 mins?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 21, 2020 at 2:55 PM Jiayi Liao 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> > Start the window for the user when event_name: *"search"  *arrived
>>> for the user.
>>>
>>> I'm not very sure this is right way to define the window in your
>>> business if you use event time, because there may exist out-of-order events
>>> that happened after "search" event, but arrive before "search" event, which
>>> will be discarded because you haven't assigned a window. (If I understand
>>> correctly)
>>>
>>> Back to the problem, I think you can implement your own #WindowAssigner,
>>> which will create a window based on the event's name. Take a look at our
>>> existing implementations like #TumblingEventWindows.
>>>
>>> > Trigger the window when either 30 mins from the start of
>>> the window reached or event_type : *"start" *is appeared
>&g

Flink Window with multiple trigger condition

2020-05-21 Thread aj
Hello All,

I am getting a lot of user events in a stream. There are different types
of events, and now I want to build some aggregation metrics for a user by
grouping events into buckets.

My conditions for windowing are:

1. Start the window for the user when event_name *"search"* arrives for
the user.
2. Trigger the window when
   either 30 mins from the start of the window have passed
   OR
   event_type *"start"* appears.

After that, I want to calculate some aggregations on those window events.
I know this can be done using a process function, but I am stuck on creating
the window with a multiple-condition trigger.

Please help me understand how to create this type of window with multiple
trigger conditions, either time passing or some event happening.


-- 
Thanks & Regards,
Anuj Jain





Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-14 Thread aj
Hi Yang,

I was able to resolve the issue by removing the Hadoop dependency as you
mentioned.

1. Removed the hadoop-common dependency and excluded flink-hadoop-fs from
flink-streaming-java:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2. After the above changes, I am able to submit the job on YARN, but I faced
issues with the S3 plugin, which I resolved by putting
*flink-s3-fs-hadoop-1.9.0.jar* in the plugins/s3-fs-hadoop directory.

Thanks for your support.

Any update on when Flink 1.10 will be officially supported in EMR? Even in the
new EMR version (EMR 6.0), Flink has been removed.



On Sat, May 9, 2020 at 1:36 PM aj  wrote:

> Hello Yang,
>
> I have attached my pom file and I did not see that I am using any Hadoop
> dependency. Can you please help me.
>
> On Wed, May 6, 2020 at 1:22 PM Yang Wang  wrote:
>
>> Hi aj,
>>
>> From the logs you have provided, the hadoop version is still 2.4.1.
>> Could you check the user jar(i.e. events-processor-1.0-SNAPSHOT.jar) have
>> some
>> hadoop classes? If it is, you need to exclude the hadoop dependency.
>>
>>
>> Best,
>> Yang
>>
>> aj  于2020年5月6日周三 下午3:38写道:
>>
>>> Hello,
>>>
>>> Please help me upgrade to 1.10 in AWS EMR.
>>>
>>> On Fri, May 1, 2020 at 4:05 PM aj  wrote:
>>>
>>>> Hi Yang,
>>>>
>>>> I am attaching the logs for your reference, please help me what i am
>>>> doing wrong.
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>> On Wed, Apr 29, 2020 at 9:06 AM Yang Wang 
>>>> wrote:
>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> I think the exception you come across still because the hadoop version
>>>>> is 2.4.1. I have checked the hadoop code, the code line are exactly
>>>>> same.
>>>>> For 2.8.1, i also have checked the ruleParse. It could work.
>>>>>
>>>>> /**
>>>>>  * A pattern for parsing a auth_to_local rule.
>>>>>  */
>>>>> private static final Pattern ruleParser =
>>>>>   
>>>>> Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
>>>>>   "(s/([^/]*)/([^/]*)/(g)?)?))/?(L)?");
>>>>>
>>>>>
>>>>> Could you share the jobmanager logs so that i could check the
>>>>> classpath and hadoop version?
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>> aj  于2020年4月28日周二 上午1:01写道:
>>>>>
>>>>>> Hello Yang,
>>>>>> My Hadoop version is Hadoop 3.2.1-amzn-0
>>>>>> and I have put in flink/lib.
>>>>>>  flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>>>>>>
>>>>>> then I am getting below error :
>>>>>>
>>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>>> SLF4J: Found binding in
>>>>>> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>> SLF4J: Found binding in
>>>>>> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>>> explanation.
>>>>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>>>>> Invalid rule: /L
>>>>>>   RULE:[2:$1@$0](.*@)s/@.*///L
>>>>>>   DEFAULT
>>>>>> at
>>>>>> org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
>>>>>> at
>>>>>> org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
>>>>>> at
>>>>>> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
>>>>>> at
>>>>>> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
>>>>>> at
>>>>>> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>>>>&

Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-09 Thread aj
Hello Yang,

I have attached my pom file and I did not see that I am using any Hadoop
dependency. Can you please help me.

On Wed, May 6, 2020 at 1:22 PM Yang Wang  wrote:

> Hi aj,
>
> From the logs you have provided, the hadoop version is still 2.4.1.
> Could you check the user jar(i.e. events-processor-1.0-SNAPSHOT.jar) have
> some
> hadoop classes? If it is, you need to exclude the hadoop dependency.
>
>
> Best,
> Yang
>
> aj  于2020年5月6日周三 下午3:38写道:
>
>> Hello,
>>
>> Please help me upgrade to 1.10 in AWS EMR.
>>
>> On Fri, May 1, 2020 at 4:05 PM aj  wrote:
>>
>>> Hi Yang,
>>>
>>> I am attaching the logs for your reference, please help me what i am
>>> doing wrong.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Wed, Apr 29, 2020 at 9:06 AM Yang Wang  wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> I think the exception you come across still because the hadoop version
>>>> is 2.4.1. I have checked the hadoop code, the code line are exactly
>>>> same.
>>>> For 2.8.1, i also have checked the ruleParse. It could work.
>>>>
>>>> /**
>>>>  * A pattern for parsing a auth_to_local rule.
>>>>  */
>>>> private static final Pattern ruleParser =
>>>>   
>>>> Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
>>>>   "(s/([^/]*)/([^/]*)/(g)?)?))/?(L)?");
>>>>
>>>>
>>>> Could you share the jobmanager logs so that i could check the classpath
>>>> and hadoop version?
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> aj  于2020年4月28日周二 上午1:01写道:
>>>>
>>>>> Hello Yang,
>>>>> My Hadoop version is Hadoop 3.2.1-amzn-0
>>>>> and I have put in flink/lib.
>>>>>  flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>>>>>
>>>>> then I am getting below error :
>>>>>
>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>> SLF4J: Found binding in
>>>>> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: Found binding in
>>>>> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>> explanation.
>>>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid
>>>>> rule: /L
>>>>>   RULE:[2:$1@$0](.*@)s/@.*///L
>>>>>   DEFAULT
>>>>> at
>>>>> org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
>>>>> at
>>>>> org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
>>>>> at
>>>>> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>>>>> at
>>>>> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
>>>>> at
>>>>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>>>>>
>>>>>
>>>>> if I remove the  flink-shaded-hadoop-2-uber-2.8.3-10.0.jar  from lib
>>>>> then i get below error:
>>>>>
>>>>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>>>>   -  Classpath:
>>>>>

Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-06 Thread aj
Hello,

Please help me upgrade to 1.10 in AWS EMR.

On Fri, May 1, 2020 at 4:05 PM aj  wrote:

> Hi Yang,
>
> I am attaching the logs for your reference, please help me what i am doing
> wrong.
>
> Thanks,
> Anuj
>
> On Wed, Apr 29, 2020 at 9:06 AM Yang Wang  wrote:
>
>> Hi Anuj,
>>
>> I think the exception you come across still because the hadoop version
>> is 2.4.1. I have checked the hadoop code, the code line are exactly same.
>> For 2.8.1, i also have checked the ruleParse. It could work.
>>
>> /**
>>  * A pattern for parsing a auth_to_local rule.
>>  */
>> private static final Pattern ruleParser =
>>   
>> Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
>>   "(s/([^/]*)/([^/]*)/(g)?)?))/?(L)?");
>>
>>
>> Could you share the jobmanager logs so that i could check the classpath
>> and hadoop version?
>>
>> Best,
>> Yang
>>
>> aj  于2020年4月28日周二 上午1:01写道:
>>
>>> Hello Yang,
>>> My Hadoop version is Hadoop 3.2.1-amzn-0
>>> and I have put in flink/lib.   flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>>>
>>> then I am getting below error :
>>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid
>>> rule: /L
>>>   RULE:[2:$1@$0](.*@)s/@.*///L
>>>   DEFAULT
>>> at
>>> org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
>>> at
>>> org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
>>> at
>>> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>>> at
>>> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
>>> at
>>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>>>
>>>
>>> if I remove the  flink-shaded-hadoop-2-uber-2.8.3-10.0.jar  from lib
>>> then i get below error:
>>>
>>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>> -  Classpath:
>>> /usr/lib/flink/lib/flink-table-blink_2.11-1.10.0.jar:/usr/lib/flink/lib/flink-table_2.11-1.10.0.jar:/usr/lib/flink/lib/log4j-1.2.17.jar:/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/lib/flink/lib/flink-dist_2.11-1.10.0.jar::/etc/hadoop/conf:/etc/hadoop/conf
>>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>> -
>>> 
>>> 2020-04-27 16:59:37,300 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.heap.size, 1024m
>>> 2020-04-27 16:59:37,300 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.memory.process.size, 1568m
>>> 2020-04-27 16:59:37,300 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.numberOfTaskSlots, 1
>>> 2020-04-27 16:59:37,300 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuratio

Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-01 Thread aj
Hi Yang,

I am attaching the logs for your reference, please help me what i am doing
wrong.

Thanks,
Anuj

On Wed, Apr 29, 2020 at 9:06 AM Yang Wang  wrote:

> Hi Anuj,
>
> I think the exception you come across still because the hadoop version
> is 2.4.1. I have checked the hadoop code, the code line are exactly same.
> For 2.8.1, i also have checked the ruleParse. It could work.
>
> /**
>  * A pattern for parsing a auth_to_local rule.
>  */
> private static final Pattern ruleParser =
>   Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
>   "(s/([^/]*)/([^/]*)/(g)?)?))/?(L)?");
>
>
> Could you share the jobmanager logs so that i could check the classpath
> and hadoop version?
>
> Best,
> Yang
>
> aj  于2020年4月28日周二 上午1:01写道:
>
>> Hello Yang,
>> My Hadoop version is Hadoop 3.2.1-amzn-0
>> and I have put in flink/lib.   flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>>
>> then I am getting below error :
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid
>> rule: /L
>>   RULE:[2:$1@$0](.*@)s/@.*///L
>>   DEFAULT
>> at
>> org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
>> at
>> org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
>> at
>> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
>> at
>> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
>> at
>> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>> at
>> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>> at
>> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>> at
>> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>> at
>> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
>> at
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>>
>>
>> if I remove the  flink-shaded-hadoop-2-uber-2.8.3-10.0.jar  from lib
>> then i get below error:
>>
>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>   -  Classpath:
>> /usr/lib/flink/lib/flink-table-blink_2.11-1.10.0.jar:/usr/lib/flink/lib/flink-table_2.11-1.10.0.jar:/usr/lib/flink/lib/log4j-1.2.17.jar:/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/lib/flink/lib/flink-dist_2.11-1.10.0.jar::/etc/hadoop/conf:/etc/hadoop/conf
>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>   -
>> 
>> 2020-04-27 16:59:37,300 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.heap.size, 1024m
>> 2020-04-27 16:59:37,300 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.memory.process.size, 1568m
>> 2020-04-27 16:59:37,300 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.numberOfTaskSlots, 1
>> 2020-04-27 16:59:37,300 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: parallelism.default, 1
>> 2020-04-27 16:59:37,300 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
>> 2020-04-27 16:59:37,300 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
>> 2020-04-27 16:59:37,301 INFO
>>  org.apache.fl

Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-04-27 Thread aj
.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 5 more
2020-04-27 16:59:37,406 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.
2020-04-27 16:59:37,458 INFO
 org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
create Hadoop Security Module because Hadoop cannot be found in the
Classpath.
2020-04-27 16:59:37,476 INFO
 org.apache.flink.runtime.security.modules.JaasModule  - Jaas file
will be created as /tmp/jaas-7054453135321774613.conf.
2020-04-27 16:59:37,480 INFO
 org.apache.flink.runtime.security.SecurityUtils   - Cannot
install HadoopSecurityContext because Hadoop cannot be found in the
Classpath.
2020-04-27 16:59:37,481 INFO  org.apache.flink.client.cli.CliFrontend
- Running 'run' command.
2020-04-27 16:59:37,488 INFO  org.apache.flink.client.cli.CliFrontend
- Building program from JAR file
2020-04-27 16:59:37,488 ERROR org.apache.flink.client.cli.CliFrontend
- Invalid command line arguments.
org.apache.flink.client.cli.CliArgsException: Could not build the program
from JAR file.
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:203)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.io.FileNotFoundException: JAR file does not exist: -ynm
at
org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:719)
at
org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:695)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:200)
... 4 more


Can you please help.

Thanks,
Anuj


On Mon, Apr 13, 2020 at 7:43 AM Yang Wang  wrote:

> Hi Anuj,
>
> It seems that you are using hadoop version 2.4.1. I think "L" could not be
> supported in
> this version. Could you upgrade your hadoop version to 2.8 and have a try?
> If your
> YARN cluster version is 2.8+, then you could directly remove the
> flink-shaded-hadoop
> in your lib directory. Otherwise, you need to download the
> flink-shaded-hadoop with
> version 2.8 here[1].
>
>
> [1]. https://flink.apache.org/downloads.html#additional-components
>
> Best,
> Yang
>
> aj  于2020年4月11日周六 上午4:21写道:
>
>> Hi Robert,
>> attached the full application log file.
>>
>> Thanks,
>> Anuj
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-04-10 Thread aj
Hi Robert,
attached the full application log file.

Thanks,
Anuj

Container: container_1585301409143_0044_01_01 on 
ip-172-25-2-209.ap-south-1.compute.internal_8041
===
LogType:jobmanager.err
Log Upload Time:Sun Apr 05 19:52:27 + 2020
LogLength:1634
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1585301409143_0044/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: Invalid rule: /L
  RULE:[2:$1@$0](.*@)s/@.*///L
  DEFAULT
at 
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at 
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at 
org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
End of LogType:jobmanager.err


LogType:jobmanager.log
Log Upload Time:Sun Apr 05 19:52:27 + 2020
LogLength:35952
Log Contents:
2020-04-05 19:52:26,459 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 

2020-04-05 19:52:26,461 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Starting 
YarnJobClusterEntrypoint (Version: , Rev:aa4eb8f, Date:07.02.2020 @ 
19:18:19 CET)
2020-04-05 19:52:26,461 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  OS current 
user: yarn
2020-04-05 19:52:26,953 WARN  
org.apache.flink.runtime.util.EnvironmentInformation  - Error while 
accessing user/group information via Hadoop utils.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:96)
at 
org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:293)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:96)
Caused by: java.lang.IllegalArgumentException: Invalid rule: /L
  RULE:[2:$1@$0](.*@)s/@.*///L
  DEFAULT
at 
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at 
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
... 7 more
2020-04-05 19:52:26,956 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Current 
Hadoop/Kerberos user: 
2020-04-05 19:52:26,956 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM: OpenJDK 
64-Bit Server VM - Oracle Corporation - 1.8/25.242-b08
2020-04-05 19:52:26,956 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Maximum heap 
size: 406 

Re: flink 1.9 conflict jackson version

2020-04-07 Thread aj
Hi Fanbin,

I am facing a similar kind of issue. If you are able to resolve this issue,
please let me know and help me as well:

https://stackoverflow.com/questions/61012350/flink-reading-a-s3-file-causing-jackson-dependency-issue



On Tue, Dec 17, 2019 at 7:50 AM ouywl  wrote:

> Hi Bu
>I think It can use mvn-shade-plugin to resolve your problem,  It seem
> flink-client conflict with your owner jar?
>
> ouywl
> ou...@139.com
>
> 
> 签名由 网易邮箱大师  定制
>
> On 12/17/2019 08:10,Fanbin Bu
>  wrote:
>
> Hi,
>
> After I upgrade flink 1.9, I got the following error message on EMR, it
> works locally on IntelliJ.
>
> I'm explicitly declaring the dependency as
> implementation
> 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
> and I have
> implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version:
> '1.11.595'
>
>
>
> java.lang.NoSuchMethodError: 
> com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper;
>   at 
> com.amazonaws.partitions.PartitionsLoader.(PartitionsLoader.java:54)
>   at 
> com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30)
>   at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65)
>   at 
> com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53)
>   at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107)
>   at 
> com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256)
>   at 
> com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460)
>   at 
> com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
>   at 
> com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80)
>   at 
> com.coinbase.util.KmsClient$.getSnowflakeUsernamePassword(KmsClient.scala:21)
>   at com.coinbase.ml.RunFlinkJob$.runBatch(RunFlinkJob.scala:94)
>   at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:38)
>   at 
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
>   at 
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-04-07 Thread aj
Hello All,

I am running Flink on AWS EMR. Currently the latest version available on
EMR is 1.9.1, but I want to upgrade to 1.10.0. I tried to manually replace the
lib jars by downloading the 1.10.0 version, but this is not working. I am
getting the following exception when trying to submit a job on YARN.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: Invalid
rule: /L
  RULE:[2:$1@$0](.*@)s/@.*///L
  DEFAULT
at
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at
org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
End of LogType:jobmanager.err


Please help to understand this error and how to resolve this.


-- 
Thanks & Regards,
Anuj Jain






Flink jackson conflict issue with aws-sdk dependency

2020-04-04 Thread aj
Hello,

Please help me resolve this issue
https://stackoverflow.com/questions/61012350/flink-reading-a-s3-file-causing-jackson-dependency-issue

-- 
Thanks & Regards,
Anuj Jain





Re: Help me understand this Exception

2020-03-18 Thread aj
Thanks, Zhijiang and Gordon.

I will see the logs to find out more.

On Wed, Mar 18, 2020 at 1:44 PM Zhijiang  wrote:

> Agree with Gordon's below explanation!
>
> Besides that, maybe you can also check the job master's log which might
> probably show the specific exception to cause this failure.
>
> I was thinking whether it is necessary to improve
> ExceptionInChainedOperatorException to also provide the message from the
> wrapped real exception,
> then users can easily get the root cause directly, not only for the
> current message "Could not forward element to next operator".
>
> Best,
> Zhijiang
>
> --
> From:Tzu-Li (Gordon) Tai 
> Send Time:2020 Mar. 18 (Wed.) 00:01
> To:aj 
> Cc:user 
> Subject:Re: Help me understand this Exception
>
> Hi,
>
> The exception stack you posted simply means that the next operator in the
> chain failed to process the output watermark.
> There should be another exception, which would explain why some operator
> was closed / failed and eventually leading to the above exception.
> That would provide more insight to exactly why your job is failing.
>
> Cheers,
> Gordon
>
> On Tue, Mar 17, 2020 at 11:27 PM aj  wrote:
> Hi,
> I am running a streaming job with generating watermark like this :
>
> public static class SessionAssigner implements
>         AssignerWithPunctuatedWatermarks<GenericRecord> {
>
>     @Override
>     public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         LOGGER.info("timestamp {}", timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>         // simply emit a watermark with every event
>         LOGGER.info("extractedTimestamp {}", extractedTimestamp);
>         return new Watermark(extractedTimestamp);
>     }
> }
>
> Please help me understand what this exception means:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:216)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:169)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:51)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 137)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 116)
> at org.apache.flink.streaming.runtime.operato

Help me understand this Exception

2020-03-17 Thread aj
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements
        AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        LOGGER.info("timestamp {}", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        LOGGER.info("extractedTimestamp {}", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}

Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at org.apache.flink.streaming.runtime.io.
StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
StreamOneInputProcessor.java:216)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve
.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processElement(StreamOneInputProcessor.java:169)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
727)
at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
705)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect
(TimestampedCollector.java:51)
at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
at org.apache.flink.streaming.runtime.operators.windowing.functions.
InternalIterableProcessWindowFunction.process(
InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.
InternalIterableProcessWindowFunction.process(
InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.emitWindowContents(WindowOperator.java:549)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.onEventTime(WindowOperator.java:457)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
.advanceWatermark(InternalTimerServiceImpl.java:276)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.processWatermark(AbstractStreamOperator.java:784)
at org.apache.flink.streaming.runtime.io.
StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
StreamOneInputProcessor.java:213)
... 10 more

-- 
Thanks & Regards,
Anuj Jain






Re: Flink Session Window to enrich Event with unique id

2020-03-07 Thread aj
Please help me to implement the above logic.

On Mon, Mar 2, 2020 at 4:47 PM aj  wrote:

> Hi,
> Is using the session window to implement the above logic a good idea, or
> should I use a process function?
>
> On Sun, Mar 1, 2020 at 11:39 AM aj  wrote:
>
>> Hi ,
>>
>> I am working on a use case where I have a stream of events. I want to
>> attach a unique id to all the events that happened in a session.
>> Below is the logic that I am trying to implement:
>>
>> 1. session_started
>> 2. Whenever an event_name=search arrives, generate a unique search_id and attach this
>> id to all the following events in the session until a new "search" event is
>> encountered in the session.
>>
>> Example :
>> *user-1.  session-1   event_name- search (generate searchid --1)*
>> user-1.  session-1   event_name- x  (attach above search id -1)
>> user-1.  session-1   event_name- y (attach above search id -1)
>> user-1.  session-1   event_name- y (attach above search id -1)
>> *user-1.  session-1   event_name- search (generate searchid --2)*
>> user-1.  session-1   event_name- x  (attach above search id -2)
>> user-1.  session-1   event_name- y (attach above search id -2)
>> user-1.  session-1   event_name- y (attach above search id -2)
>>
>> As events can come out of order, I want to do this after the session window
>> is over. So after the session window I am doing it like this:
>>
>> 1. Sort all the events by time.
>> 2. Iterate over each event and attach the search_id.
>> 3. Collect all the events and generate another stream enriched with the
>> search_id.
>>
>> I am trying the below code but it is not working as expected; I am not
>> able to understand what is happening.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
>>     StringBuilder builder = new StringBuilder();
>>     builder.append(record.get("session_id"));
>>     builder.append(record.get("user_id"));
>>     return builder.toString();
>> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>>   .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
>>       @Override
>>       public void process(String key, Context context,
>>                           Iterable<GenericRecord> iterable,
>>                           Collector<GenericRecord> collector) throws Exception {
>>           Stream<GenericRecord> result = IterableUtils.toStream(iterable);
>>           List<GenericRecord> s = result.collect(Collectors.toList());
>>
>>           Map<Long, GenericRecord> recordMap = new HashMap<>();
>>           for (GenericRecord record : s) {
>>               recordMap.put((long) record.get("event_ts"), record);
>>           }
>>
>>           Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
>>           recordMap.entrySet().stream()
>>                   .sorted(Map.Entry.comparingByKey())
>>                   .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
>>
>>           String search_id = null;
>>           for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
>>               GenericRecord record = element.getValue();
>>               if (record.get("event_name").equals("search")) {
>>                   search_id = UUID.randomUUID().toString();
>>               }
>>               record.put("search_id", search_id);
>>               collector.collect(record);
>>           }
>>       }
>> }).print();
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07


Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-03 Thread aj
Thanks, Robert, for mentioning this. I will take care of it in future posts.

I was able to figure out the issue: when I disable checkpointing, the
watermark gets updated and everything works. I need to understand two things:

1. Please help me understand what is happening when I enable checkpointing,
and how to make it work with checkpointing enabled, as I need to write the
data stream with checkpoints turned on.

2. Basically, I want to collect all the session data and process all the
events at the end of the session (using inactivity of x minutes).
I know this functionality is available in the session window, where I can
create a session window using an inactivity gap, but enrichment and
processing of the events there is not recommended. So how can I use the same
kind of inactivity-based trigger in a process function, process all the
buffered events, and then clear the queue?
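
For reference, a rough sketch of the timer-based variant described in point 2
could look like the following. It is only an illustration, not a verified fix
for the checkpointing question: it assumes GenericRecord events keyed by
session/user, an illustrative 10-minute gap, and ListState for buffering.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class InactivitySessionFunction
        extends KeyedProcessFunction<String, GenericRecord, GenericRecord> {

    private static final long GAP_MS = 10 * 60 * 1000L; // illustrative inactivity gap

    private transient ListState<GenericRecord> buffer;  // events of the current session
    private transient ValueState<Long> cleanupTimer;    // currently registered timer, if any

    @Override
    public void open(Configuration parameters) {
        // note: state over GenericRecord falls back to generic serialization
        // unless a proper Avro type info is configured
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", GenericRecord.class));
        cleanupTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-timer", Long.class));
    }

    @Override
    public void processElement(GenericRecord record, Context ctx,
                               Collector<GenericRecord> out) throws Exception {
        buffer.add(record);
        // push the inactivity timer forward with every new event of this key
        // (assumes event-time timestamps have been assigned upstream)
        Long previous = cleanupTimer.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous);
        }
        long newTimer = ctx.timestamp() + GAP_MS;
        ctx.timerService().registerEventTimeTimer(newTimer);
        cleanupTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<GenericRecord> out) throws Exception {
        // the gap expired without new events: process the buffered session and clear state
        for (GenericRecord record : buffer.get()) {
            out.collect(record); // sorting / search_id enrichment would go here
        }
        buffer.clear();
        cleanupTimer.clear();
    }
}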


Thanks,
Anuj


On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger  wrote:

> side note: this question has been asked on SO as well:
> https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
> (I'm mentioning this here so that we are not wasting support resources in
> our community on double-debugging issues)
>
> On Mon, Mar 2, 2020 at 5:36 PM aj  wrote:
>
>> Hi David,
>>
>> Currently, I am testing it with a single source and parallelism 1 only so
>> not able to understand this behavior.
>>
>> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Anuj,
>>>
>>> What parallelism has your source? Do all of your source tasks produce
>>> records? Watermark is always the minimum of timestamps seen from all the
>>> upstream operators. Therefore if some of them do not produce records the
>>> watermark will not progress. You can read more about Watermarks and how
>>> they work here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>>
>>> Hope that helps
>>>
>>> Best,
>>>
>>> Dawid
>>> On 02/03/2020 16:26, aj wrote:
>>>
>>> I am trying to use process function to some processing on a set of
>>> events. I am using event time and keystream. The issue I am facing is The
>>> watermark value is always coming as 9223372036854725808. I have put print
>>> statement to debug and it shows like this:
>>>
>>> timestamp--1583128014000 extractedTimestamp 1583128014000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp--1583128048000 extractedTimestamp 1583128048000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp--1583128089000 extractedTimestamp 1583128089000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp and extracted timestamp changing but watermark not getting
>>> updated. So no record is getting in the queue as context.timestamp is never
>>> less than the watermark.
>>>
>>>
>>> DataStream dataStream = 
>>> env.addSource(searchConsumer).name("search_list_keyless");
>>> DataStream dataStreamWithWaterMark =  
>>> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>>
>>>try {
>>> dataStreamWithWaterMark.keyBy((KeySelector>> String>) record -> {
>>> StringBuilder builder = new StringBuilder();
>>> builder.append(record.get("session_id"));
>>> builder.append(record.get("user_id"));
>>> return builder.toString();
>>> }).process(new MatchFunction()).print();
>>> }
>>> catch (Exception e){
>>> e.printStackTrace();
>>> }
>>> env.execute("start session process");
>>>
>>> }
>>>
>>> public static class SessionAssigner implements 
>>> AssignerWithPunctuatedWatermarks  {
>>> @Override
>>> public long extractTimestamp(GenericRecord record, long 
>>> previousElementTimestamp) {
>>> long timestamp = (long) record.get("event_ts");
>>> System.out.println("timestamp--"+ timestamp);
>>> return timestamp;
>>> }
>>>
>>> @Override
>>> public Watermark checkAndGetNextWatermark(GenericRecord record, 
>>> long extractedTimestamp) {
>>> // simply emit a watermark with every event

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
Hi Dawid,

Currently, I am testing it with a single source and parallelism 1 only, so I am
not able to understand this behavior.

On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
wrote:

> Hi Anuj,
>
> What parallelism has your source? Do all of your source tasks produce
> records? Watermark is always the minimum of timestamps seen from all the
> upstream operators. Therefore if some of them do not produce records the
> watermark will not progress. You can read more about Watermarks and how
> they work here:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>
> Hope that helps
>
> Best,
>
> Dawid
> On 02/03/2020 16:26, aj wrote:
>
> I am trying to use process function to some processing on a set of events.
> I am using event time and keystream. The issue I am facing is The watermark
> value is always coming as 9223372036854725808. I have put print statement
> to debug and it shows like this:
>
> timestamp--1583128014000 extractedTimestamp 1583128014000
> currentwatermark-9223372036854775808
>
> timestamp--1583128048000 extractedTimestamp 1583128048000
> currentwatermark-9223372036854775808
>
> timestamp--1583128089000 extractedTimestamp 1583128089000
> currentwatermark-9223372036854775808
>
> timestamp and extracted timestamp changing but watermark not getting
> updated. So no record is getting in the queue as context.timestamp is never
> less than the watermark.
>
>
> DataStream dataStream = 
> env.addSource(searchConsumer).name("search_list_keyless");
> DataStream dataStreamWithWaterMark =  
> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>
>try {
> dataStreamWithWaterMark.keyBy((KeySelector String>) record -> {
> StringBuilder builder = new StringBuilder();
> builder.append(record.get("session_id"));
> builder.append(record.get("user_id"));
> return builder.toString();
> }).process(new MatchFunction()).print();
> }
> catch (Exception e){
> e.printStackTrace();
> }
> env.execute("start session process");
>
> }
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks  {
> @Override
> public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
> long timestamp = (long) record.get("event_ts");
> System.out.println("timestamp--"+ timestamp);
> return timestamp;
> }
>
> @Override
> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
> // simply emit a watermark with every event
> System.out.println("extractedTimestamp "+extractedTimestamp);
> return new Watermark(extractedTimestamp - 3);
> }
>  }
>
>@Override
> public void processElement(GenericRecord record, Context context, 
> Collector collector) throws Exception {
>
> TimerService timerService = context.timerService();
> System.out.println("currentwatermark"+ 
> timerService.currentWatermark());
> if (context.timestamp() > timerService.currentWatermark()) {
>
> Tuple2> queueval = 
> queueState.value();
> PriorityQueue queue = queueval.f1;
> long startTime = queueval.f0;
> System.out.println("starttime"+ startTime);
>
> if (queue == null) {
> queue = new PriorityQueue<>(10, new TimeStampComprator());
> startTime = (long) record.get("event_ts");
> }
> queueState.update(new Tuple2<>(startTime, queue));
> timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
> }
> }
>
> }
>
> Please help me to underand what i am doing wrong.
>
>  --
> Thanks & Regards,
> Anuj Jain
>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07


Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
I am trying to use a process function to do some processing on a set of events.
I am using event time and a keyed stream. The issue I am facing is that the
watermark value is always coming as -9223372036854775808 (Long.MIN_VALUE). I
have put print statements in to debug and it shows this:

timestamp--1583128014000 extractedTimestamp 1583128014000
currentwatermark-9223372036854775808

timestamp--1583128048000 extractedTimestamp 1583128048000
currentwatermark-9223372036854775808

timestamp--1583128089000 extractedTimestamp 1583128089000
currentwatermark-9223372036854775808

The timestamp and extracted timestamp are changing, but the watermark is not
getting updated. So no record is getting into the queue, as context.timestamp()
is never less than the watermark.


DataStream<GenericRecord> dataStream =
        env.addSource(searchConsumer).name("search_list_keyless");
DataStream<GenericRecord> dataStreamWithWaterMark =
        dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

try {
    dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
        StringBuilder builder = new StringBuilder();
        builder.append(record.get("session_id"));
        builder.append(record.get("user_id"));
        return builder.toString();
    }).process(new MatchFunction()).print();
} catch (Exception e) {
    e.printStackTrace();
}
env.execute("start session process");

}

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        System.out.println("timestamp--" + timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        System.out.println("extractedTimestamp " + extractedTimestamp);
        return new Watermark(extractedTimestamp - 3);
    }
}

@Override
public void processElement(GenericRecord record, Context context,
                           Collector<GenericRecord> collector) throws Exception {

    TimerService timerService = context.timerService();
    System.out.println("currentwatermark" + timerService.currentWatermark());
    if (context.timestamp() > timerService.currentWatermark()) {

        Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
        PriorityQueue<GenericRecord> queue = queueval.f1;
        long startTime = queueval.f0;
        System.out.println("starttime" + startTime);

        if (queue == null) {
            queue = new PriorityQueue<>(10, new TimeStampComprator());
            startTime = (long) record.get("event_ts");
        }
        queueState.update(new Tuple2<>(startTime, queue));
        timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
    }
}

}

Please help me to understand what I am doing wrong.
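
As a side note, one sanity check worth trying (a sketch only, not a confirmed
fix for the issue above): replace the punctuated assigner with a periodic one
such as BoundedOutOfOrdernessTimestampExtractor, which is the more common
pattern and removes the per-event watermark logic. This assumes event_ts is
epoch milliseconds and uses an arbitrary 10-second out-of-orderness bound:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000); // emit a watermark every second

DataStream<GenericRecord> withTimestamps = dataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<GenericRecord>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(GenericRecord record) {
                return (long) record.get("event_ts"); // assumed to be epoch millis
            }
        });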


-- 
Thanks & Regards,
Anuj Jain






Re: Flink Session Window to enrich Event with unique id

2020-03-02 Thread aj
Hi,
Is using the session window to implement the above logic a good idea, or
should I use a process function?

On Sun, Mar 1, 2020 at 11:39 AM aj  wrote:

> Hi ,
>
> I am working on a use case where i have a stream of events. I want to
> attach a unique id to all the events happened in a session.
> Below is the logis that i am trying to implement. -
>
> 1. session_started
> 2 whenevr a event_name=search generate a unique search_id and attch this
> id to all the following events in session until a new "search" event
> encountered in session.
>
> Example :
> *user-1.  session-1   event_name- search (generate searchid --1)*
> user-1.  session-1   event_name- x  (attach above search id -1)
> user-1.  session-1   event_name- y (attach above search id -1)
> user-1.  session-1   event_name- y (attach above search id -1)
> *user-1.  session-1   event_name- search (generate searchid --2)*
> user-1.  session-1   event_name- x  (attach above search id -2)
> user-1.  session-1   event_name- y (attach above search id -2)
> user-1.  session-1   event_name- y (attach above search id -2)
>
> As events can come out of order so i want to do this after session window
> got over. So after session window i am doing like this :
>
> 1. sort all the events by time.
> 2. iterate ech event and attach the search_id
> 3. collect all th events and generate another stream with enrich search_id.
>
> I am trying with below code but its not working as expected . i am not
> able to understand what is happening.
>
> *dataStream.keyBy((KeySelector) record -> {
> StringBuilder builder = new StringBuilder();
> builder.append(record.get("session_id"));
> builder.append(record.get("user_id"));return
> builder.toString();
> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>   .process(new ProcessWindowFunction String, TimeWindow>() {@Override
> public void process(String key, Context context,
> Iterable iterable, Collector collector)
> throws Exception {Stream result
> = IterableUtils.toStream(iterable);
> List s = result.collect(Collectors.toList());
> Map recordMap = new HashMap<>();
> for(GenericRecord record : s) {
> recordMap.put((long)record.get("event_ts"),record);
> }Map
> sortedRecordMap = new LinkedHashMap<>();
> recordMap.entrySet().stream()
> .sorted(Map.Entry.comparingByKey())
> .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
> String search_id = null;
> for(Map.Entry element :sortedRecordMap.entrySet()) {
> GenericRecord record = element.getValue();
>   if(record.get("event_name").equals("search")) {
>   search_id =
> UUID.randomUUID().toString();}
>   record.put("search_id",search_id);
> collector.collect(record);}
> }}).print();*
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07


Flink Session Window to enrich Event with unique id

2020-02-29 Thread aj
Hi,

I am working on a use case where I have a stream of events. I want to
attach a unique id to all the events that happened in a session.
Below is the logic that I am trying to implement:

1. session_started
2. whenever an event with event_name=search arrives, generate a unique search_id
and attach this id to all the following events in the session until a new
"search" event is encountered in the session.

Example :
*user-1.  session-1   event_name- search (generate searchid --1)*
user-1.  session-1   event_name- x  (attach above search id -1)
user-1.  session-1   event_name- y (attach above search id -1)
user-1.  session-1   event_name- y (attach above search id -1)
*user-1.  session-1   event_name- search (generate searchid --2)*
user-1.  session-1   event_name- x  (attach above search id -2)
user-1.  session-1   event_name- y (attach above search id -2)
user-1.  session-1   event_name- y (attach above search id -2)

As events can come out of order, I want to do this after the session window
is over. So after the session window I am doing the following:

1. sort all the events by time
2. iterate over each event and attach the search_id
3. collect all the events and generate another stream with the enriched search_id

I am trying this with the code below, but it is not working as expected and I
am not able to understand what is happening.

dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
    StringBuilder builder = new StringBuilder();
    builder.append(record.get("session_id"));
    builder.append(record.get("user_id"));
    return builder.toString();
}).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
  .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
      @Override
      public void process(String key, Context context,
                          Iterable<GenericRecord> iterable,
                          Collector<GenericRecord> collector) throws Exception {

          Stream<GenericRecord> result = IterableUtils.toStream(iterable);
          List<GenericRecord> s = result.collect(Collectors.toList());

          Map<Long, GenericRecord> recordMap = new HashMap<>();
          for (GenericRecord record : s) {
              recordMap.put((long) record.get("event_ts"), record);
          }

          Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
          recordMap.entrySet().stream()
                  .sorted(Map.Entry.comparingByKey())
                  .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));

          String search_id = null;
          for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
              GenericRecord record = element.getValue();
              if (record.get("event_name").equals("search")) {
                  search_id = UUID.randomUUID().toString();
              }
              record.put("search_id", search_id);
              collector.collect(record);
          }
      }
  }).print();


-- 
Thanks & Regards,
Anuj Jain





Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

2020-02-29 Thread aj
Hi Till,

Thanks for the reply.
I doubt that the input has a problem, because:

1. If the input had a problem, it should not land in the topic at all, as the
schema validation would fail on the producer side.
2. I am using the same schema that was used to write the records to the topic,
and I am able to parse the records with that schema: when I try to print the
stream it gives no error; the problem only occurs when writing as Parquet.

This is the code that I am using to get the schema that I pass to the
ParquetWriter:

public static Schema getSchema(String subjectName) {
    try {
        List<Integer> versions = registryClient.getAllVersions(subjectName);
        SchemaMetadata schemaMeta = registryClient.getSchemaMetadata(subjectName,
                versions.get(versions.size() - 1));
        Schema schema = new Schema.Parser().parse(schemaMeta.getSchema());
        return schema;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}


How can the input get through and be inserted into the topic if it has a
problem? And even if that is happening, how can I find those records and skip
them, so that one bad record does not fail the whole job?
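
One option for the skipping part (a rough sketch, not a drop-in fix): do a
dry-run serialization of every record against the writer schema in a
RichFlatMapFunction placed before the sink, and drop (or side-output) the
records that do not conform. The class and field names below are illustrative:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class SchemaConformanceFilter extends RichFlatMapFunction<GenericRecord, GenericRecord> {

    private final String writerSchemaJson;                      // Schema itself is not Serializable
    private transient GenericDatumWriter<GenericRecord> writer;

    public SchemaConformanceFilter(Schema writerSchema) {
        this.writerSchemaJson = writerSchema.toString();
    }

    @Override
    public void open(Configuration parameters) {
        writer = new GenericDatumWriter<>(new Schema.Parser().parse(writerSchemaJson));
    }

    @Override
    public void flatMap(GenericRecord record, Collector<GenericRecord> out) {
        try {
            // dry-run serialization: records whose fields do not match the output
            // schema (e.g. an array where a string is expected) throw here
            writer.write(record, EncoderFactory.get()
                    .binaryEncoder(new ByteArrayOutputStream(), null));
            out.collect(record);
        } catch (Exception e) {
            // log and drop, or route to a side output / dead-letter topic
        }
    }
}

It could then be wired in right before the sink, for example as
sourceStream.flatMap(new SchemaConformanceFilter(SchemaUtils.getSchema(schemaSubject))).addSink(sink);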

Thanks,
Anuj





On Sat, Feb 29, 2020 at 9:12 PM Till Rohrmann  wrote:

> Hi Anuj,
>
> it looks to me that your input GenericRecords don't conform with your
> output schema schemaSubject. At least, the stack trace says that your
> output schema expects some String field but the field was actually some
> ArrayList. Consequently, I would suggest to verify that your input data has
> the right format and if not to filter those records out which are
> non-conformant.
>
> Cheers,
> Till
>
> On Sat, Feb 29, 2020 at 2:13 PM aj  wrote:
>
>> Hi All,
>>
>> i have Written a consumer that read from kafka topic and write the data
>> in parquet format using StreamSink . But i am getting following error. Its
>> runs for some hours than start failing with this excpetions. I tried to
>> restart it but failing with same exceptions.After i restart with latest
>> offset it started working fine for soem hours and than again fail. I am not
>> able to find root cause for this issue.
>>
>> java.lang.Exception: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)Caused by: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at 
>> org.apache.flink.streaming.api.operators

Flink Stream Sink ParquetWriter Failing with ClassCastException

2020-02-29 Thread aj
Hi All,

I have written a consumer that reads from a Kafka topic and writes the data in
Parquet format using StreamingFileSink. But I am getting the following error.
It runs for some hours and then starts failing with this exception. I tried to
restart it, but it fails with the same exception. After I restarted from the
latest offset it worked fine for some hours and then failed again. I am not
able to find the root cause of this issue.

java.lang.Exception:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)Caused
by: java.lang.ClassCastException: java.util.ArrayList cannot be cast
to java.lang.CharSequence
at 
org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
at 
org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
at 
org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


Code:

DataStream<GenericRecord> sourceStream = env.addSource(kafkaConsumer010);

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(path,
                ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

sourceStream.addSink(sink).setParallelism(parallelism);

I need to understand why it ran for a few hours and then started failing.
Please help me to understand this.



-- 
Thanks & Regards,
Anuj Jain



Re: Map Of DataStream getting NullPointer Exception

2020-02-27 Thread aj
Hi Khachatryan,

This is the use case for creating multiple streams:

I have a use case where multiple types of Avro records are coming in on a single
Kafka topic, as we are using TopicRecordNameStrategy for the subject in the
schema registry. I have written a consumer to read that topic and build a
DataStream of GenericRecord. I cannot sink this stream to HDFS/S3 in Parquet
format as it contains records with different schemas. So I am filtering the
records for each type by applying a filter, creating a separate stream per type,
and then sinking each stream separately.

So can you please help me create these multiple streams with the code that I
shared, and help me resolve this issue?
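
Not a definitive answer, but one way to sidestep the null lookup entirely is to
wire each filtered stream to its sink inside the same loop that builds them,
instead of looking entries up by hard-coded keys afterwards. The map is only a
builder-time structure, so if a key such as SEARCH_LIST_KEYLESS is missing or
spelled differently from the configured event_name, get() returns null. A
sketch reusing the identifiers from the earlier snippet (so every name here is
an assumption taken from that code):

for (EventConfig eventConfig : eventTypesList) {
    String eventName = eventConfig.getEvent_name();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                    SchemaUtils.getSchema(eventConfig.getSchema_subject())))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();

    DataStream<GenericRecord> filtered = dataStream.filter(
            (FilterFunction<GenericRecord>) record ->
                    eventName.equals(record.get(EVENT_NAME).toString()));

    // enrichment only where it is actually needed
    if (SEARCH_LIST_KEYLESS.equals(eventName)) {
        filtered = filtered.map(new Enricher());
    }

    filtered.addSink(sink).name(eventName);
}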

On Tue, Feb 25, 2020 at 10:46 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> As I understand from code, streamMap is a Java map, not Scala. So you can
> get NPE while unreferencing the value you got from it.
>
> Also, the approach looks a bit strange.
> Can you describe what are you trying to achieve?
>
> Regards,
> Roman
>
>
> On Mon, Feb 24, 2020 at 5:47 PM aj  wrote:
>
>>
>> I am trying below piece of code to create multiple datastreams object and
>> store in map.
>>
>> for (EventConfig eventConfig : eventTypesList) {
>> LOGGER.info("creating a stream for ",
>> eventConfig.getEvent_name());
>> String key = eventConfig.getEvent_name();
>> final StreamingFileSink sink =
>> StreamingFileSink.forBulkFormat
>> (path,
>> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(
>> .withBucketAssigner(new EventTimeBucketAssigner())
>> .build();
>>
>> DataStream stream =
>> dataStream.filter((FilterFunction) genericRecord -> {
>> if
>> (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()))
>> {
>> return true;
>> }
>> return false;
>> });
>>
>> Tuple2,
>> StreamingFileSink> tuple2 = new Tuple2<>(stream, sink);
>> streamMap.put(key, tuple2);
>> }
>>
>> DataStream searchStream =
>> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
>> searchStream.map(new
>> Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
>>
>>
>> I am getting Nullpointer Exception when trying to get the stream from map
>> value at :
>>
>>
>> *DataStream searchStream =
>> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*
>>
>> As per my understanding, this is due to the map is local to main and not
>> broadcast to tasks.
>> If I want to do this how should I do, please help me to resolve this?
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07


Map Of DataStream getting NullPointer Exception

2020-02-24 Thread aj
I am trying the below piece of code to create multiple DataStream objects and
store them in a map.

for (EventConfig eventConfig : eventTypesList) {
    LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
    String key = eventConfig.getEvent_name();

    final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                    SchemaUtils.getSchema(eventConfig.getSchema_subject())))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();

    DataStream<GenericRecord> stream = dataStream.filter(
            (FilterFunction<GenericRecord>) genericRecord ->
                    genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

    Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 =
            new Tuple2<>(stream, sink);
    streamMap.put(key, tuple2);
}

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));


I am getting a NullPointerException when trying to get the stream from the map
value at:

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);

As per my understanding, this is because the map is local to main and not
broadcast to the tasks.
If I want to do this, how should I do it? Please help me resolve this.



-- 
Thanks & Regards,
Anuj Jain






Re: BucketingSink capabilities for DataSet API

2020-02-19 Thread aj
Thanks, Timo. I have not used or explored the Table API until now; I have only
used the DataSet and DataStream APIs. I will read about the Table API.

On Wed, Feb 19, 2020 at 4:33 PM Timo Walther  wrote:

> Hi Anuj,
>
> another option would be to use the new Hive connectors. Have you looked
> into those? They might work on SQL internal data types which is why you
> would need to use the Table API then.
>
> Maybe Bowen in CC can help you here.
>
> Regards,
> Timo
>
> On 19.02.20 11:14, Rafi Aroch wrote:
> > Hi Anuj,
> >
> > It's been a while since I wrote this (Flink 1.5.2). Could be a
> > better/newer way, but this is what how I read & write Parquet with
> > hadoop-compatibility:
> >
> > // imports
> > import org.apache.avro.generic.GenericRecord;
> > import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> >
> > import
> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> >
> > import org.apache.flink.hadoopcompatibility.HadoopInputs;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.parquet.avro.AvroParquetInputFormat;
> >
> > // Creating Parquet input format
> > Configuration conf = new Configuration();
> > Job job = Job.getInstance(conf);
> > AvroParquetInputFormat parquetInputFormat = new
> > AvroParquetInputFormat<>();
> > AvroParquetInputFormat.setInputDirRecursive(job, true);
> > AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> > HadoopInputFormat inputFormat
> > = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> > GenericRecord.class, job);
> >
> > // Creating Parquet output format
> > AvroParquetOutputFormat parquetOutputFormat = new
> > AvroParquetOutputFormat<>();
> > AvroParquetOutputFormat.setSchema(job, new
> > Schema.Parser().parse(SomeEvent.SCHEMA));
> > AvroParquetOutputFormat.setCompression(job,
> > CompressionCodecName.SNAPPY);
> > AvroParquetOutputFormat.setCompressOutput(job, true);
> > AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> > HadoopOutputFormat outputFormat = new
> > HadoopOutputFormat<>(parquetOutputFormat, job);
> >
> > DataSource> inputFileSource =
> > env.createInput(inputFormat);
> >
> > // Start processing...
> >
> > // Writing result as Parquet
> > resultDataSet.output(outputFormat);
> >
> >
> > Regarding writing partitioned data, as far as I know, there is no way to
> > achieve that with the DataSet API with hadoop-compatibility.
> >
> > You could implement this with reading from input files as stream and
> > then using StreamingFileSink with a custom BucketAssigner [1].
> > The problem with that (which was not yet resolved AFAIK) is described
> > here [2] in "Important Notice 2".
> >
> > Sadly I say, that eventually, for this use-case I chose Spark to do the
> > job...
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> >
> > Hope this helps.
> >
> > Rafi
> >
> >
> > On Sat, Feb 15, 2020 at 5:03 PM aj  > <mailto:ajainje...@gmail.com>> wrote:
> >
> > Hi Rafi,
> >
> > I have a similar use case where I want to read parquet files in the
> > dataset and want to perform some transformation and similarly want
> > to write the result using year month day partitioned.
> >
> > I am stuck at first step only where how to read and write
> > Parquet files using hadoop-Compatability.
> >
> > Please help me with this and also if u find the solution for how to
> > write data in partitioned.
> >
> > Thanks,
> > Anuj
> >
> > On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin
> > mailto:and...@data-artisans.com>> wrote:
> >
> > Hi Rafi,
> >
> > At the moment I do not see any support of Parquet in DataSet API
> > except HadoopOutputFormat, mentioned in stack overflow question.
> > I have cc’ed Fabian and Aljoscha, maybe they could provide more
> > information.
> >
> > Best,
> > Andrey
> >
> >> On 

Re: BucketingSink capabilities for DataSet API

2020-02-19 Thread aj
Thanks, Rafi. I will try this, but if partitioning is not possible then I will
have to look at some other solution.
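
For the partitioning part specifically, the custom BucketAssigner that Rafi
refers to below boils down to deriving a year=/month=/day= bucket id from each
record's event time. A rough sketch, under the assumption (mine, not stated in
the thread) that the elements are GenericRecord with an epoch-millis event_ts
field:

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        long eventTs = (long) element.get("event_ts"); // assumed epoch millis
        LocalDate date = Instant.ofEpochMilli(eventTs).atZone(ZoneOffset.UTC).toLocalDate();
        return String.format("year=%d/month=%02d/day=%02d",
                date.getYear(), date.getMonthValue(), date.getDayOfMonth());
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // the bucket id is a plain string, so the shipped string serializer is enough
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

It would then be plugged in with .withBucketAssigner(new EventTimeBucketAssigner())
on the StreamingFileSink builder, as in the snippets from the other threads.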

On Wed, Feb 19, 2020 at 3:44 PM Rafi Aroch  wrote:

> Hi Anuj,
>
> It's been a while since I wrote this (Flink 1.5.2). There may be a
> better/newer way, but this is how I read & write Parquet with
> hadoop-compatibility:
>
> // imports
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.parquet.avro.AvroParquetInputFormat;
>
> // Creating Parquet input format
> Configuration conf = new Configuration();
> Job job = Job.getInstance(conf);
> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
> AvroParquetInputFormat.setInputDirRecursive(job, true);
> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> HadoopInputFormat<Void, GenericRecord> inputFormat =
>         HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);
>
> // Creating Parquet output format
> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
> AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> AvroParquetOutputFormat.setCompressOutput(job, true);
> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> HadoopOutputFormat<Void, GenericRecord> outputFormat =
>         new HadoopOutputFormat<>(parquetOutputFormat, job);
>
> DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat);
>
> // Start processing...
>
> // Writing result as Parquet
> resultDataSet.output(outputFormat);
>
>
> Regarding writing partitioned data, as far as I know, there is no way to
> achieve that with the DataSet API with hadoop-compatibility.
>
> You could implement this with reading from input files as stream and then
> using StreamingFileSink with a custom BucketAssigner [1].
> The problem with that (which was not yet resolved AFAIK) is described here
> [2] in "Important Notice 2".
>
> Sadly I say, that eventually, for this use-case I chose Spark to do the
> job...
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
>
> Hope this helps.
>
> Rafi
>
>
> On Sat, Feb 15, 2020 at 5:03 PM aj  wrote:
>
>> Hi Rafi,
>>
>> I have a similar use case where I want to read parquet files in the
>> dataset and want to perform some transformation and similarly want to write
>> the result using year month day partitioned.
>>
>> I am stuck at first step only where how to read and write Parquet files
>> using hadoop-Compatability.
>>
>> Please help me with this and also if u find the solution for how to write
>> data in partitioned.
>>
>> Thanks,
>> Anuj
>>
>>
>> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> At the moment I do not see any support of Parquet in DataSet API
>>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>>> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 25 Oct 2018, at 13:08, Rafi Aroch  wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a Batch job which reads Parquet, does some aggregations and
>>> writes back as Parquet files.
>>> I would like the output to be partitioned by year, month, day by event
>>> time. Similarly to the functionality of the BucketingSink.
>>>
>>> I was able to achieve the reading/writing to/from Parquet by using the
>>> hadoop-compatibility features.
>>> I couldn't find a way to partition the data by year, month, day to
>>> create a folder hierarchy accordingly. Everything is written to a single
>>> directory.
>>>
>>> I could find an unanswered question about this issue:
>>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>>
>>> Can anyone suggest a way to achieve this? Maybe there's a way to
>>> integrate the BucketingSink with the DataSet API? Another solution?
>>>
>>> Rafi
>>>
>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07


Re: BucketingSink capabilities for DataSet API

2020-02-15 Thread aj
Hi Rafi,

I have a similar use case where I want to read Parquet files into a DataSet,
perform some transformations, and then write the result partitioned by
year/month/day.

I am stuck at the first step: how to read and write Parquet files using
hadoop-compatibility.

Please help me with this, and also let me know if you found a solution for
writing the data partitioned.

Thanks,
Anuj


On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin 
wrote:

> Hi Rafi,
>
> At the moment I do not see any support of Parquet in DataSet API
> except HadoopOutputFormat, mentioned in stack overflow question. I have
> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>
> Best,
> Andrey
>
> On 25 Oct 2018, at 13:08, Rafi Aroch  wrote:
>
> Hi,
>
> I'm writing a Batch job which reads Parquet, does some aggregations and
> writes back as Parquet files.
> I would like the output to be partitioned by year, month, day by event
> time. Similarly to the functionality of the BucketingSink.
>
> I was able to achieve the reading/writing to/from Parquet by using the
> hadoop-compatibility features.
> I couldn't find a way to partition the data by year, month, day to create
> a folder hierarchy accordingly. Everything is written to a single directory.
>
> I could find an unanswered question about this issue:
> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>
> Can anyone suggest a way to achieve this? Maybe there's a way to integrate
> the BucketingSink with the DataSet API? Another solution?
>
> Rafi
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Flink ParquetAvroWriters Sink

2020-01-28 Thread aj
I was able to resolve this issue by setting classloader.resolve-order to
parent-first.
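
For anyone hitting the same error later: the setting goes into flink-conf.yaml
(or is passed as a dynamic property when submitting the job), for example:

classloader.resolve-order: parent-first

The alternative, as discussed further down the thread, is to avoid the Kryo
fallback altogether by exposing a proper Avro TypeInformation from the
deserialization schema.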

On Wed, Jan 22, 2020, 23:13 aj  wrote:

> Hi Arvid,
>
> I have implemented the code with envelope schema as you suggested but now
> I am facing issues with the consumer . I have written code like this:
>
> FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
>         new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 properties);
>
> And the Deserialization class looks like this:
>
> public class KafkaGenericAvroDeserializationSchema implements
>         KeyedDeserializationSchema<GenericRecord> {
>
>     private final String registryUrl;
>     private transient KafkaAvroDeserializer inner;
>
>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>         this.registryUrl = registryUrl;
>     }
>
>     @Override
>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>                                      String topic, int partition, long offset) {
>         checkInitialized();
>         return (GenericRecord) inner.deserialize(topic, message);
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     private void checkInitialized() {
>         if (inner == null) {
>             Map<String, Object> props = new HashMap<>();
>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>             SchemaRegistryClient client = new CachedSchemaRegistryClient(
>                     registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>             inner = new KafkaAvroDeserializer(client, props);
>         }
>     }
> }
>
>
> It's working locally on my machine but when I deployed it on yarn cluster
> I am getting below exception:
>
>
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread
> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .performDefaultAction(SourceStreamTask.java:132)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:298)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
> .java:104)
> at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
> StreamSourceContexts.java:111)
> at org.apache.flink.streaming.connectors.kafka.internals.
> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at org.apache.flink.streaming.connectors.kafka.internal.
> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher
> .runFetchLoop(Kafka09Fetcher.java:156)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> .run(FlinkKafkaConsumerBase.java:715)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$Legacy

Re: Flink ParquetAvroWriters Sink

2020-01-23 Thread aj
Hi Arvid,

I am not clear on this part: "Note that I still recommend to just bundle the
schema with your Flink application and not reinvent the wheel."

Can you please help with some sample code showing how it should be written? Or
can we connect somehow so that I can go through it with you?
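
For what it is worth, "bundling the schema" along the lines of the sentence
quoted above would look roughly like the sketch below: package the .avsc with
the job, parse it in main(), and hand it to the Confluent-registry
deserialization schema. The resource name and the MyJob class are placeholders;
KAFKA_TOPICS, schemaRegistryUrl and properties are taken from the earlier mail.

import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// in main(): load the reader schema that is packaged with the job jar
Schema readerSchema;
try (InputStream in = MyJob.class.getResourceAsStream("/search_event.avsc")) {
    readerSchema = new Schema.Parser().parse(in);
}

FlinkKafkaConsumer010<GenericRecord> consumer = new FlinkKafkaConsumer010<>(
        KAFKA_TOPICS,
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl),
        properties);

A side benefit of forGeneric is that the schema travels with the produced type
information, so the records are not serialized with the Kryo fallback.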


On Thu, Jan 23, 2020 at 2:09 PM Arvid Heise  wrote:

> The issue is that your are not providing any meaningful type information,
> so that Flink has to resort to Kryo. You need to extract the schema during
> query compilation (in your main) and pass it to your deserialization schema.
>
> public TypeInformation getProducedType() {
>   return (TypeInformation) new GenericRecordAvroTypeInfo(this.schema);
> }
>
> If you don't want to extract it statically you need to tell Flink how to
> handle arbitrary GenericRecords. You could implement your own serializer
> [1], which would write GenericRecords to byte[] and vice versa.
>
> Note that I still recommend to just bundle the schema with your Flink
> application and not reinvent the wheel.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
>
> On Thu, Jan 23, 2020 at 2:22 AM aj  wrote:
>
>>  Hi Arvid,
>>
>> I want to keep generic records only and I do not want to keep the schema
>> definition on the consumer side and should be resolve from the schema
>> registry only. I am following the below post
>>
>>
>> https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360
>>
>> so please help me what is wrong with my code.
>>
>>
>>
>> On Thu, Jan 23, 2020, 00:38 Arvid Heise  wrote:
>>
>>> Hi Anuj,
>>>
>>> I recommend using the ConfluentRegistryAvroDeserializationSchema [1]
>>> with a specific record that has been generated with the Avro Maven Plugin
>>> [2] or Avro Gradle Plugin [3]. That should result into almost no code and
>>> maximal maintainability.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>>> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
>>> [3] https://github.com/davidmc24/gradle-avro-plugin
>>>
>>> On Wed, Jan 22, 2020 at 6:43 PM aj  wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> I have implemented the code with envelope schema as you suggested but
>>>> now I am facing issues with the consumer . I have written code like this:
>>>>
>>>> FlinkKafkaConsumer010 kafkaConsumer010 = new
>>>> FlinkKafkaConsumer010(KAFKA_TOPICS,
>>>> new
>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>> properties);
>>>>
>>>> And the Deserialization class looks like this :
>>>>
>>>> pblic class KafkaGenericAvroDeserializationSchema implements
>>>> KeyedDeserializationSchema {
>>>>
>>>> private final String registryUrl;
>>>> private transient KafkaAvroDeserializer inner;
>>>>
>>>> public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>>> this.registryUrl = registryUrl;
>>>> }
>>>>
>>>> @Override
>>>> public GenericRecord deserialize(byte[] messageKey, byte[] message,
>>>> String topic, int partition, long offset) {
>>>> checkInitialized();
>>>> return (GenericRecord) inner.deserialize(topic, message);
>>>> }
>>>>
>>>> @Override
>>>> public boolean isEndOfStream(GenericRecord nextElement) {
>>>> return false;
>>>> }
>>>>
>>>> @Override
>>>> public TypeInformation getProducedType() {
>>>> return TypeExtractor.getForClass(GenericRecord.class);
>>>> }
>>>>
>>>> private void checkInitialized() {
>>>> if (inner == null) {
>>>> Map props = new HashMap<>();
>>>>
>>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>> registryUrl);
>>>>
>>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>> SchemaRegistryClient client =
>>>> new CachedSchemaRegistryClient(
>>>> registryUrl,
>>>> AbstractKafkaAvroSerDeCon

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
Hi Arvid,

I want to keep generic records only, and I do not want to keep the schema
definition on the consumer side; it should be resolved from the schema
registry only. I am following the below post:

https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360

So please help me figure out what is wrong with my code.



On Thu, Jan 23, 2020, 00:38 Arvid Heise  wrote:

> Hi Anuj,
>
> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with
> a specific record that has been generated with the Avro Maven Plugin [2] or
> Avro Gradle Plugin [3]. That should result into almost no code and maximal
> maintainability.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
> [3] https://github.com/davidmc24/gradle-avro-plugin
>
> On Wed, Jan 22, 2020 at 6:43 PM aj  wrote:
>
>> Hi Arvid,
>>
>> I have implemented the code with envelope schema as you suggested but now
>> I am facing issues with the consumer . I have written code like this:
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 = new
>> FlinkKafkaConsumer010(KAFKA_TOPICS,
>> new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>> properties);
>>
>> And the Deserialization class looks like this :
>>
>> pblic class KafkaGenericAvroDeserializationSchema implements
>> KeyedDeserializationSchema {
>>
>> private final String registryUrl;
>> private transient KafkaAvroDeserializer inner;
>>
>> public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public GenericRecord deserialize(byte[] messageKey, byte[] message,
>> String topic, int partition, long offset) {
>> checkInitialized();
>> return (GenericRecord) inner.deserialize(topic, message);
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (inner == null) {
>> Map props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> inner = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>>
>> It's working locally on my machine but when I deployed it on yarn cluster
>> I am getting below exception:
>>
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>> at org.apache.flink.streaming.runtime.tasks.
>> SourceStreamTask$LegacySourceFunctionThread
>> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
>> .performDefaultAction(SourceStreamTask.java:132)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
>> .java:298)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654
>> )
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at org.apache.flink.streaming.api.operators.
>> AbstractStreamOperator$CountingOutput

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
ava:89)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
CollectionSerializer.java:93)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(
KryoSerializer.java:262)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
... 13 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.
Instantiators$$anonfun$normalJava$1 can not access a member of class
org.apache.avro.Schema$LockableArrayList with modifiers "public"

Please help me to resolve this issue.
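
For readers hitting the same trace: the Kryo/chill frames above come from the
GenericRecord falling back to Kryo serialization, which is what the
GenericRecordAvroTypeInfo suggestion elsewhere in this thread addresses. A
minimal sketch of that change, with an illustrative class name and an extra
constructor parameter for the reader schema (an assumption on my side, not the
exact code from the thread):

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class AvroTypedDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    private final String schemaString;   // Schema is not Serializable, so keep the JSON
    private transient Schema schema;
    private transient KafkaAvroDeserializer inner;

    public AvroTypedDeserializationSchema(String registryUrl, Schema readerSchema) {
        this.registryUrl = registryUrl;
        this.schemaString = readerSchema.toString();
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message,
                                     String topic, int partition, long offset) {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            inner = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(registryUrl,
                    AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT), props);
        }
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Avro type information instead of TypeExtractor.getForClass(GenericRecord.class),
        // so the records are no longer handed to the Kryo fallback
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaString);
        }
        return new GenericRecordAvroTypeInfo(schema);
    }
}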

Thanks,
Anuj





On Mon, Jan 20, 2020 at 9:42 PM aj  wrote:

> Thanks, Arvid for all the clarification. I will work on the approach you
> suggested.
>
> Thanks,
> Anuj
>
> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise  wrote:
>
>> Hi Anuj,
>>
>> I think that there may be a fundamental misunderstanding about the role
>> of a schema registry in Kafka. So let me first clarify that.
>> In each Avro/Parquet file, all records have the same schema. The schema
>> is stored within the file, such that we can always retrieve the writer
>> schema for the records.
>> When Avro was first applied to Kafka, there was the basic question on how
>> the writer schema for any record is known to the consumer. Storing the
>> complete schema on each record would mean that each record would be much
>> larger than needed. Hence, they added the schema registry that assigns a
>> unique id to schema, which is then embedded into the records.
>> Now, whenever I update a schema in my producer, I would have old records
>> with the old schema id and new records with the new schema id.
>> In my consumer, I'd use a fixed reader schema, such that my application
>> would not need to worry if the record is written with old or new schema; my
>> consumer would only see records with the reader schema.
>>
>> Given that background information, you see that in general, it's
>> impossible with a generic approach to write the parquet with the same
>> schema as it has been written in Kafka: the parquet schema needs to be
>> supplied statically during query compilation while the actual used Avro
>> schema in Kafka is only known when actually consuming data.
>>
>> But looking further down the road:
>> * since you need one schema to write the parquet files, you'd need to
>> decide: do you want to write with the new or the old schema in case of a
>> schema update? That should also be the reader schema of your application
>> for a given event type.
>> * this decision has further implications: your application need to
>> extract exactly one specific version of the schema from the schema registry
>> at query compilation. That could be either a specific schema id or the
>> latest schema for the event type.
>> * that means that the output schema is locked until you restart your
>> application and fetch a new latest schema in case of an update.
>> * at that point, it might just be easier to use the approach that I
>> outlined previously by bundling a specific schema with your application.
>>
>> If you want to extract the latest schema for a subject:
>>
>> var registryClient = new CachedSchemaRegistryClient(, 1000);
>> var versions = registryClient.getAllVersions();
>> var schema = registryClient.getSchemaById(versions.get(versions.size() - 1));
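
(For reference, a filled-in sketch of the snippet above; the registry URL and subject name are placeholders, since the original values were stripped when the mail was archived. getLatestSchemaMetadata is the Confluent client call that returns the newest registered version of a subject.)

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;

public class LatestSchemaFetcher {

    // fetch and parse the latest schema registered under the given subject
    public static Schema fetchLatest(String registryUrl, String subject) throws Exception {
        CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(registryUrl, 1000);
        SchemaMetadata latest = registryClient.getLatestSchemaMetadata(subject);
        return new Schema.Parser().parse(latest.getSchema());
    }

    public static void main(String[] args) throws Exception {
        Schema schema = fetchLatest("http://schema-registry:8081", "events-value");
        System.out.println(schema.toString(true));
    }
}
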
>>
>>
>> On Sat, Ja

Re: Flink ParquetAvroWriters Sink

2020-01-20 Thread aj
Thanks, Arvid for all the clarification. I will work on the approach you
suggested.

Thanks,
Anuj

On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise  wrote:

> Hi Anuj,
>
> I think that there may be a fundamental misunderstanding about the role of
> a schema registry in Kafka. So let me first clarify that.
> In each Avro/Parquet file, all records have the same schema. The schema is
> stored within the file, such that we can always retrieve the writer schema
> for the records.
> When Avro was first applied to Kafka, there was the basic question on how
> the writer schema for any record is known to the consumer. Storing the
> complete schema on each record would mean that each record would be much
> larger than needed. Hence, they added the schema registry that assigns a
> unique id to schema, which is then embedded into the records.
> Now, whenever I update a schema in my producer, I would have old records
> with the old schema id and new records with the new schema id.
> In my consumer, I'd use a fixed reader schema, such that my application
> would not need to worry if the record is written with old or new schema; my
> consumer would only see records with the reader schema.
>
> Given that background information, you see that in general, it's
> impossible with a generic approach to write the parquet with the same
> schema as it has been written in Kafka: the parquet schema needs to be
> supplied statically during query compilation while the actual used Avro
> schema in Kafka is only known when actually consuming data.
>
> But looking further down the road:
> * since you need one schema to write the parquet files, you'd need to
> decide: do you want to write with the new or the old schema in case of a
> schema update? That should also be the reader schema of your application
> for a given event type.
> * this decision has further implications: your application need to extract
> exactly one specific version of the schema from the schema registry at
> query compilation. That could be either a specific schema id or the latest
> schema for the event type.
> * that means that the output schema is locked until you restart your
> application and fetch a new latest schema in case of an update.
> * at that point, it might just be easier to use the approach that I
> outlined previously by bundling a specific schema with your application.
>
> If you want to extract the latest schema for a subject:
>
> var registryClient = new CachedSchemaRegistryClient(, 1000);
> var versions = registryClient.getAllVersions();
> var schema = registryClient.getSchemaById(versions.get(versions.size() - 1));
>
>
> On Sat, Jan 18, 2020 at 5:22 PM aj  wrote:
>
>> Thanks, Arvid.
>>
>> I do not fully understand the above approach,
>> so currently, I am thinking to go with the envelope approach that you
>> suggested.
>>
>> One more question I have if I do not want to keep schema in my consumer
>> project even its a single envelope schema. I want it to be fetched from the
>> schema registry and pass to my parquet-sink so that I always use the same
>> schema that is used by the producer.  Can you provide a sample code how can
>> i infer the schema from the generic record or get it from schema registry?
>>
>>
>> Regards,
>> Anuj
>>
>>
>>
>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise  wrote:
>>
>>> (Readded user mailing list)
>>>
>>> Hi Anuj,
>>>
>>> since I'd still recommend going with distinct sources/sinks, let me try
>>> to solve your issues in this mail. If that doesn't work out, I'd address
>>> your concerns about the envelope format later.
>>>
>>> In Flink, you can have several subtopologies in the same application.
>>>
>>> Thus, for each event type, you can add
>>> AvroSource(eventType) -> generic transformation/validation ->
>>> AvroSink(eventType)
>>> for each event.
>>>
>>> I'd put all avro schema in one project and use an avro plugin to
>>> generate the respective Java Classes. Then I'd simply create a map of Avro
>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>> (event-a, event-b, ...).
>>> Next, I'd iterate over the list to add the respective subtopologies to
>>> env.
>>> Finally, execute everything.
>>>
>>> You have one project where all validations reside. But you'd have almost
>>> no overhead to process a given source of eventType. The downside of that
>>> approach is of course, that each new event type would require a
>>> redeployment, but that seems like what you'd want to do anyhow.
>>>
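
(For illustration, a sketch of the per-event-type subtopology layout described above, using GenericRecord plus the schema registry rather than the generated classes Arvid mentions. Topic names, schema files, output paths and the registry URL are placeholders, not taken from this thread.)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PerEventTypeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // StreamingFileSink finalizes parquet files on checkpoints

        Properties kafka = new Properties();
        kafka.setProperty("bootstrap.servers", "localhost:9092");
        kafka.setProperty("group.id", "per-event-type-job");

        // one entry per event type: topic -> Avro schema (placeholder schema files)
        Map<String, Schema> topicsAndSchemas = new HashMap<>();
        topicsAndSchemas.put("event-a",
                new Schema.Parser().parse(PerEventTypeJob.class.getResourceAsStream("/event-a.avsc")));
        topicsAndSchemas.put("event-b",
                new Schema.Parser().parse(PerEventTypeJob.class.getResourceAsStream("/event-b.avsc")));

        for (Map.Entry<String, Schema> entry : topicsAndSchemas.entrySet()) {
            String topic = entry.getKey();
            Schema schema = entry.getValue();

            // source subtopology for this event type
            DataStream<GenericRecord> source = env.addSource(
                    new FlinkKafkaConsumer010<>(
                            topic,
                            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                                    schema, "http://schema-registry:8081"),
                            kafka));

            // generic transformation/validation would go here

            // one parquet sink per event type, each with its own schema
            source.addSink(
                    StreamingFileSink
                            .forBulkFormat(
                                    new Path("s3://my-bucket/events/" + topic),
                                    ParquetAvroWriters.forGenericRecord(schema))
                            .build());
        }

        env.execute("per-event-type-parquet");
    }
}
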

Re: Flink ParquetAvroWriters Sink

2020-01-18 Thread aj
Thanks, Arvid.

I do not fully understand the above approach, so for now I am thinking of
going with the envelope approach that you suggested.

One more question: I do not want to keep the schema in my consumer project,
even if it is a single envelope schema. I want it to be fetched from the
schema registry and passed to my parquet sink, so that I always use the same
schema that the producer used. Can you provide sample code showing how I can
infer the schema from the generic record, or fetch it from the schema
registry?


Regards,
Anuj



On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise  wrote:

> (Readded user mailing list)
>
> Hi Anuj,
>
> since I'd still recommend going with distinct sources/sinks, let me try to
> solve your issues in this mail. If that doesn't work out, I'd address your
> concerns about the envelope format later.
>
> In Flink, you can have several subtopologies in the same application.
>
> Thus, for each event type, you can add
> AvroSource(eventType) -> generic transformation/validation ->
> AvroSink(eventType)
> for each event.
>
> I'd put all avro schema in one project and use an avro plugin to generate
> the respective Java Classes. Then I'd simply create a map of Avro Schema
> (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
> (event-a, event-b, ...).
> Next, I'd iterate over the list to add the respective subtopologies to env.
> Finally, execute everything.
>
> You have one project where all validations reside. But you'd have almost
> no overhead to process a given source of eventType. The downside of that
> approach is of course, that each new event type would require a
> redeployment, but that seems like what you'd want to do anyhow.
>
> Best,
>
> Arvid
>
> On Sat, Jan 18, 2020 at 2:08 PM aj  wrote:
>
>> Thanks, Arvid.
>>
>> 1. I like your approach as I can write a single consumer and put the data
>> in S3 in parquet format. The only challenge is there are extra columns that
>> always going to be null as at a time I will get one type of event.
>>
>> 2. if I go with a separate schema I am not sure how I can solve it using
>> a single generalize consumer. Till now what my understanding is I have to
>> write a consumer for each type of event. Each consumer will read the whole
>> data then filter the respective events from this and then I can pass this
>> stream to sink. But this does not look scalable solution as the new events
>> keep growing then I have to write a consumer for each new type.
>>
>>
>> DataStreamSource input = env
>> .addSource(
>> new FlinkKafkaConsumer010(topics,
>> new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>> config).setStartFromEarliest());
>>
>> Example :
>>
>> * 1st Consumer:*
>>   DataStreamSource input = env.addSource(
>> new FlinkKafkaConsumer010(topics,
>> new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>> config).setStartFromEarliest());
>> * DataStream aInput =
>> input.filter("event_name"= "a")*
>>
>> * 2nd Consumer:*
>>   DataStreamSource input = env.addSource(
>> new FlinkKafkaConsumer010(topics,
>> new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>> config).setStartFromEarliest());
>> * DataStream bInput =
>> input.filter("event_name"= "b")*
>>
>>
>> Can you help me How I solve this using a single consumer as I do not want
>> to write a separate consumer for each type of schema?
>>
>> For example, this is my consumer that contains different types of records.
>>
>> DataStreamSource input = env
>> .addSource(
>> new FlinkKafkaConsumer010(topics,
>> new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>> config).setStartFromEarliest());
>>
>> Now I can not write this stream directly as there is no common schema of
>> records in this stream. So possible way I am thinking is
>>
>> 1. Can I create multiple streams from this stream using the key by on 
>> *"event_name"
>> *and then write each stream separately.
>>
>> Just wanna know is this possible ??
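
(A concrete version of the filter-based split sketched above, continuing from the single-consumer snippet; it assumes every record carries an "event_name" field, as in the examples earlier in the thread.)

DataStream<GenericRecord> input = env.addSource(
        new FlinkKafkaConsumer010<>(topics,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                config).setStartFromEarliest());

// one filtered view of the same stream per event type
DataStream<GenericRecord> aEvents =
        input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
DataStream<GenericRecord> bEvents =
        input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));

// each filtered stream can then be given its own schema-specific parquet sink
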
>>
>>
>> Thanks,
>> Anuj
>>
>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise  wrote:
>>
>>> Hi Anuj,
>>>
>>> I originally understood tha

Re: Flink ParquetAvroWriters Sink

2020-01-16 Thread aj
Hi Arvid,
Thanks for the quick response. I am new to this Avro design, so can you
please help me understand it and design for my use case?

I have a use case like this:

1. We have an app where a lot of actions happen on the user side.
2. For each action we collect a set of information defined as key-value
pairs. We want to describe this information with proper schemas so that we
maintain the proper format and do not push random data.
3. So we define a schema for each action and register it in the schema
registry using topic+record.name as the subject.
4. So I do not think the producer side has any issue, as whenever we push an
event to Kafka we register a new schema under the above subject.

Example:

{
  "event_name": "a",
  "timestamp": ...,
  "properties": [
    { "key-1": "val-1" },
    { "key-2": "val-2" }
  ]
}

{
  "event_name": "b",
  "timestamp": ...,
  "properties": [
    { "key-3": "val-3" },
    { "key-4": "val-4" }
  ]
}

Now I have a consumer that parses the data by fetching the schema from the
schema registry and deserializes it into a stream of generic records.

Why do you think it will break, given that I am always deserializing with the
writer schema?

You suggested keeping one envelope Avro schema rather than a separate schema
for each type of event that I am generating. I have some doubts about that:

1. How do I enforce a schema on each event when it is only a subtype in the
main schema? When I get a JSON event of type "a", how do I enforce and
convert it to the "a" sub-schema and push it to Kafka?
2. I want to create a separate hive table for each of the events, so when I
write this data and let's say I have 20 events, then 19 columns will always
contain null values.

Please help me do this the right way. It will be a great help and learning
for me.

Thanks,
Anuj







On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise  wrote:

> Hi Anuj,
>
> you should always avoid having records with different schemas in the same
> topic/dataset. You will break the compatibility features of the schema
> registry and your consumer/producer code is always hard to maintain.
>
> A common and scalable way to avoid it is to use some kind of envelope
> format.
>
> {
>   "namespace": "example",
>   "name": "Envelope",
>   "type": "record",
>   "fields": [
> {
>   "name": "type1",
>   "type": ["null", {
> "type": "record",
> "fields": [ ... ]
>   }],
>   "default": null
> },
> {
>   "name": "type2",
>   "type": ["null", {
> "type": "record",
> "fields": [ ... ]
>   }],
>   "default": null
> }
>   ]
> }
>
> This envelope is evolvable (arbitrary addition/removal of wrapped types,
> which by themselves can be evolved), and adds only a little overhead (1
> byte per subtype). The downside is that you cannot enforce that exactly one
> of the subtypes is set.
>
> This schema is fully compatible with the schema registry, so no need to
> parse anything manually.
>
> This schema can easily be used with Parquet. If you can't change the input
> format anymore, you can at least use that approach on your output.
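
(A minimal sketch of wrapping one event into the envelope described above, assuming the envelope schema and the sub-record have already been built; the field name "type1" matches the example schema.)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class EnvelopeExample {

    public static GenericRecord wrapType1(Schema envelopeSchema, GenericRecord type1Record) {
        // set only the matching subtype field; the other union fields keep their null default
        return new GenericRecordBuilder(envelopeSchema)
                .set("type1", type1Record)
                .build();
    }
}
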
>
> Best,
>
> Arvid
>
> On Thu, Jan 16, 2020 at 2:53 PM aj  wrote:
>
>> Hi All,
>>
>> I have a use case where I am getting a different set of Avro records in
>> Kafka. I am using the schema registry to store Avro schema. One topic can
>> also have different types of records.
>>
>> Now I have created a GenericRecord Stream using kafkaAvroDeseralizer by
>> defining custom
>> Deserializer class like this
>>
>> @Override
>> public GenericRecord deserialize(
>> byte[] messageKey, byte[] message, String topic, int partition, long
>> offset) {
>> checkInitialized();
>> return (GenericRecord) inner.deserialize(topic, message);
>> }
>>
>> private void checkInitialized() {
>> if (inner == null) {
>> Map props = new HashMap<>();
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> inner = new KafkaAvroDeserializer(client, props);
>> }
>> }
>>
>>
>> And this is my consumer code :
>>
>> DataStreamSource input = e

Flink ParquetAvroWriters Sink

2020-01-16 Thread aj
Hi All,

I have a use case where I am getting different sets of Avro records in
Kafka. I am using the schema registry to store the Avro schemas. One topic
can also contain different types of records.

Now I have created a GenericRecord stream using the KafkaAvroDeserializer by
defining a custom Deserializer class like this:

@Override
public GenericRecord deserialize(
        byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
    return (GenericRecord) inner.deserialize(topic, message);
}

private void checkInitialized() {
    if (inner == null) {
        Map<String, Object> props = new HashMap<>();
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient(
                        registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
        inner = new KafkaAvroDeserializer(client, props);
    }
}


And this is my consumer code:

DataStreamSource<GenericRecord> input = env
        .addSource(
                new FlinkKafkaConsumer010<>(topics,
                        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                        config).setStartFromEarliest());

Now I want to write this stream, partitioned on
*event_name="a"/year=/month=/day=*, in parquet format so that I can expose
hive tables directly on top of this data. event_name is a common field for
all types of records that I am getting in Kafka.
I am stuck because the parquet writer needs a schema to write, but my
different records have different schemas. So how do I write this stream to
S3 in the above partition format?
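
(A sketch of a custom BucketAssigner producing event_name=.../year=.../month=.../day=... bucket paths. Assumptions not stated in the thread: records carry an epoch-millis "timestamp" field, and the stream feeding each sink has already been narrowed to a single Avro schema, since a parquet bulk writer needs one schema.)

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter FMT = DateTimeFormatter
            .ofPattern("'year='yyyy'/month='MM'/day='dd")
            .withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, BucketAssigner.Context context) {
        String eventName = String.valueOf(element.get("event_name"));
        long ts = (Long) element.get("timestamp"); // assumed epoch millis
        return "event_name=" + eventName + "/" + FMT.format(Instant.ofEpochMilli(ts));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

It would then be plugged into the sink with something like
StreamingFileSink.forBulkFormat(basePath, ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();
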


Thanks & Regards,
Anuj Jain






Re: Flink Dataset to ParquetOutputFormat

2020-01-16 Thread aj
Hi Arvid,

Thanks for the detailed reply. I am using the DataSet API and it is a batch
job, so I am wondering whether the option you provided works for that.

Thanks,
Anuj

On Wed, Jan 8, 2020 at 7:01 PM Arvid Heise  wrote:

> Hi Anji,
>
> StreamingFileSink has a BucketAssigner that you can use for that purpose.
>
> From the javadoc: The sink uses a BucketAssigner to determine in which
> bucket directory each element should be written to inside the base
> directory. The BucketAssigner can, for example, use time or a property of
> the element to determine the bucket directory. The default BucketAssigner
> is a DateTimeBucketAssigner which will create one new bucket every hour.
> You can specify a custom BucketAssigner using the
> setBucketAssigner(bucketAssigner) method, after calling forRowFormat(Path,
> Encoder) or forBulkFormat(Path, BulkWriter.Factory).
>
> If that doesn't work for you, please let me know. Btw, are you using event
> or processing time?
>
> Best,
>
> Arvid
>
> On Fri, Dec 27, 2019 at 4:24 AM vino yang  wrote:
>
>> Hi Anji,
>>
>> Actually, I am not familiar with how to partition via timestamp. Flink's
>> streaming BucketingSink provides this feature.[1] You may refer to this
>> link and customize your sink.
>>
>> I can ping a professional committer who knows more detail of FS connector
>> than me, @kklou...@gmail.com  may give you help.
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink
>>
>> aj  于2019年12月27日周五 上午1:51写道:
>>
>>> Thanks Vino.
>>>
>>> I am able to write data in parquet now. But now the issue is how to
>>> write a dataset to multiple output path as per timestamp partition.
>>> I want to partition data on date wise.
>>>
>>> I am writing like this currently that will write to single output path.
>>>
>>> DataSet<Tuple2<Void, GenericRecord>> df = allEvents.flatMap(new
>>> EventMapProcessor(schema.toString())).withParameters(configuration);
>>>
>>> Job job = Job.getInstance();
>>> AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
>>> HadoopOutputFormat<Void, GenericRecord> parquetFormat = new
>>> HadoopOutputFormat<>(new AvroParquetOutputFormat(), job);
>>> FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
>>>
>>> df.output(parquetFormat);
>>> env.execute();
>>>
>>>
>>> Please suggest.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Mon, Dec 23, 2019 at 12:59 PM vino yang 
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> After searching in Github, I found a demo repository about how to use
>>>> parquet in Flink.[1]
>>>>
>>>> You can have a look. I can not make sure whether it is helpful or not.
>>>>
>>>> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> aj  于2019年12月21日周六 下午7:03写道:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am getting a set of events in JSON that I am dumping in the hourly
>>>>> bucket in S3.
>>>>> I am reading this hourly bucket and created a DataSet.
>>>>>
>>>>> I want to write this dataset as a parquet but I am not able to figure
>>>>> out. Can somebody help me with this?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Re: Kafka Schema registry

2020-01-14 Thread aj
ConfluentRegistryAvroDeserializationSchema.forGeneric() requires a reader
schema. How can we use it to deserialize using the writer schema?

On Fri, Sep 13, 2019 at 12:04 AM Lasse Nedergaard 
wrote:

> Hi Elias
>
> Thanks for letting me know. I have found it but we also need the option to
> register Avro Schema’s and use the registry when we write to Kafka. So we
> will create a serialisation version and when it works implement it into
> Flink and create a pull request for the community.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 12. sep. 2019 kl. 17.45 skrev Elias Levy  >:
>
> Just for a Kafka source:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
>
>- There is also a version of this schema available that can lookup the
>writer’s schema (schema which was used to write the record) in Confluent
>Schema Registry.
>Using these deserialization schema record will be read with the schema that
>was retrieved from Schema Registry and transformed to a statically
>provided( either through
>ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or
>ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
>
>
> On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>> Do Flink have out of the Box Support for Kafka Schema registry for both
>> sources and sinks?
>> If not, does anyone knows about a implementation we can build on so we
>> can help make it general available in a future release.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Flink Dataset to ParquetOutputFormat

2019-12-26 Thread aj
Thanks Vino.

I am able to write data in parquet now. But the issue now is how to write a
dataset to multiple output paths, partitioned by timestamp. I want to
partition the data by date.

Currently I am writing like this, which writes to a single output path:

DataSet<Tuple2<Void, GenericRecord>> df = allEvents
        .flatMap(new EventMapProcessor(schema.toString()))
        .withParameters(configuration);

Job job = Job.getInstance();
AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
HadoopOutputFormat<Void, GenericRecord> parquetFormat =
        new HadoopOutputFormat<>(new AvroParquetOutputFormat(), job);
FileOutputFormat.setOutputPath(job, new Path(outputDirectory));

df.output(parquetFormat);
env.execute();


Please suggest.

Thanks,
Anuj

On Mon, Dec 23, 2019 at 12:59 PM vino yang  wrote:

> Hi Anuj,
>
> After searching in Github, I found a demo repository about how to use
> parquet in Flink.[1]
>
> You can have a look. I can not make sure whether it is helpful or not.
>
> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>
> Best,
> Vino
>
> aj  于2019年12月21日周六 下午7:03写道:
>
>> Hello All,
>>
>> I am getting a set of events in JSON that I am dumping in the hourly
>> bucket in S3.
>> I am reading this hourly bucket and created a DataSet.
>>
>> I want to write this dataset as a parquet but I am not able to figure
>> out. Can somebody help me with this?
>>
>>
>> Thanks,
>> Anuj
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>


Flink Dataset to ParquetOutputFormat

2019-12-21 Thread aj
Hello All,

I am getting a set of events in JSON that I am dumping into hourly buckets
in S3. I am reading these hourly buckets and have created a DataSet.

I want to write this dataset as parquet, but I am not able to figure out how.
Can somebody help me with this?


Thanks,
Anuj





Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-01 Thread aj heller
Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to
make time for it. There is no implementation yet that I'm aware of, and
I'll gladly step aside (or help out how I can) if you or anyone is
interested to take charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to
prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj
On Nov 1, 2016 3:24 PM, "Manu Zhang" <owenzhang1...@gmail.com> wrote:

> Thanks.  The ideal case is to fire after watermark past each element from
> the window but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on ?
> Are there any opening JIRAs or PRs ? (The proposed `ProcessWindowFunction`
> will be sufficient for my case)
> Is there a workaround now for my case ?
>
> Thanks again for following through this.
> Manu
>
> On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Ah, I finally understand it. You would a way to query the current
>> watermark in the window function to only emit those elements where the
>> timestamp is lower than the watermark.
>>
>> When the window fires again, do you want to emit elements that you
>> emitted during the last firing again? If not, I think you also need to use
>> an evictor to evict the elements from the window where the timestamp is
>> lower than the watermark. With this FLIP
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>> we should be able to extend the WindowFunction Context to also provide the
>> current watermark. With this recent PR
>> https://github.com/apache/flink/pull/2736 you would be able to evict
>> elements from the window state after the window function was called.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Nov 2016 at 02:27 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Yes, here's the example
>> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>>
>> If you print and compare the timestamp of timer with that of "PageView"
>> in the outputs, you could see what I mean.
>>
>> I think the recently introduced TimelyFlatMapFunction is close to what I
>> want to achieve. It will be great if we can query time information in the
>> window function so I filed https://issues.apache.
>> org/jira/browse/FLINK-4953
>>
>> Thanks for your time.
>>
>> Manu
>>
>> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hmm, I don't completely understand what's going on. Could you maybe post
>> an example, with the trigger code that shows this behaviour?
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi,
>>
>> It's what I'm seeing. If timers are not fired at the end of window, a
>> state (in the window) whose timestamp is *after *the timer will also be
>> emitted. That's a problem for event-time trigger.
>>
>> Thanks,
>> Manu
>>
>>
>> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hi,
>> is that example input/output what you would like to achieve or what you
>> are currently seeing with Flink? I think for your use case a custom Trigger
>> would be required that works like the event-time trigger but additionally
>> registers timers for each element where you want to emit.
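
(A rough sketch of such a trigger, assuming TimeWindow windows; untested, so treat it as a starting point rather than a drop-in solution.)

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PerElementEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // fire once the watermark passes this element, and also at the end of the window
        ctx.registerEventTimeTimer(timestamp);
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
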
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi Aljoscha,
>>
>> Thanks for your response.  My use case is to track user trajectory based
>> on page view event when they visit a website.  The input would be like a
>> list of PageView(userId, url, eventTimestamp) with watermarks (=
>> eventTimestamp - duration). I'm trying SessionWindows with some event time
>> trigger. Note we can't wait for the end of session window due to latency.
>> Instead, we want to emit the user trajectories whenever a buffered
>> PageView's event time is passed by watermark. I tried
>> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
>> element's timestamp. For both triggers I've witnessed a problem like the
>> following (e.g. a session gap of 5)
>>

Re: Merging N parallel/partitioned WindowedStreams together, one-to-one, into a global window stream

2016-10-06 Thread AJ Heller
Thank you Fabian, I think that solves it. I'll need to rig up some tests to
verify, but it looks good.

I used a RichMapFunction to assign ids incrementally to windows (mapping
STREAM_OBJECT to Tuple2<Long, STREAM_OBJECT> using a private long value in
the mapper that increments on every map call). It works, but by any chance
is there a more succinct way to do it?
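
(A sketch of the id-assigning mapper described above; essentially the same idea, just packaged as a reusable class. One caveat: Flink may need an explicit returns(...) type hint for the generic Tuple2 output.)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class WindowIdAssigner<T> extends RichMapFunction<T, Tuple2<Long, T>> {

    // each parallel subtask counts its local window results independently
    private long nextWindowId;

    @Override
    public Tuple2<Long, T> map(T localWindowResult) {
        return Tuple2.of(nextWindowId++, localWindowResult);
    }
}

Applied to the per-subtask window results, the n-th local window gets id n, so
.keyBy(0).countWindow(N) groups the matching windows from all N parallel
subtasks, as Fabian suggested.
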

On Thu, Oct 6, 2016 at 1:50 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Maybe this can be done by assigning the same window id to each of the N
> local windows, and do a
>
> .keyBy(windowId)
> .countWindow(N)
>
> This should create a new global window for each window id and collect all
> N windows.
>
> Best, Fabian
>
> 2016-10-06 22:39 GMT+02:00 AJ Heller <a...@drfloob.com>:
>
>> The goal is:
>>  * to split data, random-uniformly, across N nodes,
>>  * window the data identically on each node,
>>  * transform the windows locally on each node, and
>>  * merge the N parallel windows into a global window stream, such that
>> one window from each parallel process is merged into a "global window"
>> aggregate
>>
>> I've achieved all but the last bullet point, merging one window from each
>> partition into a globally-aggregated window output stream.
>>
>> To be clear, a rolling reduce won't work because it would aggregate over
>> all previous windows in all partitioned streams, and I only need to
>> aggregate over one window from each partition at a time.
>>
>> Similarly for a fold.
>>
>> The closest I have found is ParallelMerge for ConnectedStreams, but I
>> have not found a way to apply it to this problem. Can flink achieve this?
>> If so, I'd greatly appreciate a point in the right direction.
>>
>> Cheers,
>> -aj
>>
>
>


Re: ResourceManager not using correct akka URI in standalone cluster (?)

2016-09-28 Thread AJ Heller
Thank you Till. I was in a time crunch, and rebuilt my cluster from the
ground up with hadoop installed. All works fine now, `netstat -pn | grep
6123` shows flink's pid. Hadoop may be irrelevant, I can't rule out PEBKAC
yet :-). Sorry, when I have time I'll attempt to reproduce the scenario, on
the off chance there's a bug in there I can help dig up.

Best,
aj


Re: ResourceManager not using correct akka URI in standalone cluster (?)

2016-09-15 Thread AJ Heller
More information:

From the master node, I cannot `telnet localhost 6123` nor `telnet  6123` while the cluster is apparently running. Connection refused
immediately. `netstat -n | grep 6123` is empty. There's no server
listening. But the processes are running on all machines.

Does it matter that I don't have hadoop or HDFS installed? It is optional,
right? To be clear, this fails at startup, long before I'm able to run any
job.

On Amazon EC2, the machines know of their private IPs, but not their public
IPs. I've instructed the cluster to operate over the public network because
I couldn't get the private IP scenario working.

Running `./bin/start-local.sh` shows non-zero counts in the Flink
Dashboard. Cluster setups show zero-counts all around.

-aj

On Thu, Sep 15, 2016 at 12:41 PM, AJ Heller <a...@drfloob.com> wrote:

> I'm running a standalone cluster on Amazon EC2. Leader election is
> happening according to the logs, and the Flink Dashboard is up and running,
> accessible remotely. The issue I'm having is that the SocketWordCount
> example is not working, the local connection is being refused!
>
> In the Flink Dashboard, 0 task managers are being reported. And in the
> jobmanager logs, the last line indicates "leader session null". All other
> akka URIs in the log file begin "akka.tcp://flink@PUBLIC_IP/...", but the
> Resourse Manager URI indicated "akka://flink/...".
>
>
> jobmanager log:
> http://pastebin.com/VWJM8XvW
>
> client log:
> http://pastebin.com/ZrWsbcwa
>
> flink-conf.yaml:
> http://pastebin.com/xy2tz7WS
>
> master and slave files are populated with public ips as well.
>


ResourceManager not using correct akka URI in standalone cluster (?)

2016-09-15 Thread AJ Heller
I'm running a standalone cluster on Amazon EC2. Leader election is
happening according to the logs, and the Flink Dashboard is up and running,
accessible remotely. The issue I'm having is that the SocketWordCount
example is not working, the local connection is being refused!

In the Flink Dashboard, 0 task managers are being reported. And in the
jobmanager logs, the last line indicates "leader session null". All other
akka URIs in the log file begin "akka.tcp://flink@PUBLIC_IP/...", but the
Resourse Manager URI indicated "akka://flink/...".


jobmanager log:
http://pastebin.com/VWJM8XvW

client log:
http://pastebin.com/ZrWsbcwa

flink-conf.yaml:
http://pastebin.com/xy2tz7WS

master and slave files are populated with public ips as well.