Re: CoProcessFunction vs Temporal Table to enrich multiple streams with a slow stream

2019-05-03 Thread Averell
Thank you Piotr for the thorough answer.

So you meant that an implementation in the DataStream API, with some corners cut,
would generally be shorter than a Table join. I thought that using Tables would be
more intuitive and shorter, hence my initial question :)

Regarding all the limitations of the Table API that you mentioned, is there
any summary page in the Flink docs for that?

Thanks and regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DateTimeBucketAssigner using Element Timestamp

2019-05-03 Thread Peter Groesbeck
Thanks for the quick response Piotr,

I feel like I have everything working but no files are getting written to
disk. I've implemented my own BucketAssigner like so:

class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] {
  // formatString is the date-time pattern (e.g. "yyyy-MM-dd--HH") defined elsewhere in the job
  override def getBucketId(element: IN, context: BucketAssigner.Context): String =
    DateTimeFormatter.ofPattern(formatString)
      .withZone(ZoneId.systemDefault)
      .format(Instant.ofEpochMilli(context.timestamp()))

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

And plugged it into my sink:

  val parquet = StreamingFileSink
    .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(ReflectData.get().getSchema(clazz)))
    .withBucketAssigner(new BackFillBucketAssigner[GenericRecord])
    .build

  stream.addSink(parquet)

When I run locally I can see the temporary part files but nothing ever
gets rolled. I saw this once before when I didn't have checkpointing
enabled for my original streaming job and this note tipped me off:

IMPORTANT: Bulk-encoding formats can only be combined with the
`OnCheckpointRollingPolicy`, which rolls the in-progress part file on
every checkpoint.

Is it possible that something similar is happening? I have enabled
checkpointing in the job; however, since it is reading from flat files
and assigning a timestamp, is it possible that checkpointing is not working as
I expect? Nothing in my logs seems to suggest an error, and the job
runs to completion (about 30 minutes).
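
For reference, checkpointing is enabled along these lines (the 60-second
interval is arbitrary, just for illustration):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Bulk formats only roll part files on checkpoints, so checkpointing must be enabled.
env.enableCheckpointing(60 * 1000)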

Thanks again for your help!
Peter


On Fri, May 3, 2019 at 4:46 AM Piotr Nowojski  wrote:

> Hi Peter,
>
> It sounds like this should work, however my question would be do you want
> exactly-once processing? If yes, then you would have to somehow know which
> exact events need re-processing or deduplicate them somehow. Keep in mind
> that in case of an outage in the original job, you probably will have some
> files already committed by the StreamingFileSink.
>
> Another approach might be to somehow overwrite the previous files (but
> then you would have to check whether the bucket assignment and file naming
> is completely deterministic) or before reprocessing from backup remove the
> dirty files from the crashed job.
>
> Piotrek
>
> On 2 May 2019, at 23:10, Peter Groesbeck 
> wrote:
>
> Hi all,
>
> I have an application that reads from various Kafka topics and writes
> parquet files to corresponding buckets on S3 using StreamingFileSink with
> DateTimeBucketAssigner. The upstream application that writes to Kafka
> also writes records as gzipped json files to date bucketed locations on S3
> as backup.
>
> One requirement we have is to back fill missing data in the event that the
> application or Kafka experiences an outage. This can be accomplished by
> reading the backup files that were written to S3 by our upstream
> application instead of reading from Kafka. My current approach is to read
> the hourly backup buckets, transform the files into a DataStream and
> assign them a timestamp based on a datetime field on the json records using
> BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the
> DataStream to the same StreamingFileSink which ideally would write past
> records in the same manner as if they had been streamed by Kafka.
>
> Unfortunately for me, the bucket assigner works on system time:
>
> A BucketAssigner that assigns to buckets based on current system time.
>
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>     if (dateTimeFormatter == null) {
>         dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>     }
>     return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
>
>
> No problem, I can extend DateTimeBucketAssigner and override the method to 
> grab elementTimestamp instead of currentProcessingTime, but I'm wondering if 
> this is the right approach? And if so would this behavior be useful outside 
> of the context of my application?
>
> Thanks in advance for your help and for this awesome framework!
>
> Peter
>
>
>


Re: Filter push-down not working for a custom BatchTableSource

2019-05-03 Thread Josh Bradt
Hi Fabian,

Thanks for taking a look. I've filed this ticket:
https://issues.apache.org/jira/browse/FLINK-12399

Thanks,

Josh

On Fri, May 3, 2019 at 3:41 AM Fabian Hueske  wrote:

> Hi Josh,
>
> The code looks good to me.
> This seems to be a bug then.
> It's strange that it works for ORC.
>
> Would you mind opening a Jira ticket and maybe a simple reproducable code
> example?
>
> Thank you,
> Fabian
>
> Am Do., 2. Mai 2019 um 18:23 Uhr schrieb Josh Bradt <
> josh.br...@klaviyo.com>:
>
>> Hi Fabian,
>>
>> Thanks for your reply. My custom table source does not implement
>> ProjectableTableSource. I believe that isFilterPushedDown is implemented
>> correctly since it's nearly identical to what's written in the
>> OrcTableSource. I pasted a slightly simplified version of the
>> implementation below. If you wouldn't mind reading over it, is there
>> anything obviously wrong?
>>
>> public final class CustomerTableSource<T> implements BatchTableSource<T>,
>>         FilterableTableSource<T> {
>>
>>     // Iterator that gets data from a REST API as POJO instances
>>     private final AppResourceIterator<T> resourceIterator;
>>     private final String tableName;
>>     private final Class<T> modelClass;
>>     private final AppRequestFilter[] filters;
>>
>>     public CustomerTableSource(
>>             AppResourceIterator<T> resourceIterator,
>>             String tableName,
>>             Class<T> modelClass) {
>>
>>         this(resourceIterator, tableName, modelClass, null);
>>     }
>>
>>     protected CustomerTableSource(
>>             AppResourceIterator<T> resourceIterator,
>>             String tableName,
>>             Class<T> modelClass,
>>             AppRequestFilter[] filters) {
>>
>>         this.resourceIterator = resourceIterator;
>>         this.tableName = tableName;
>>         this.modelClass = modelClass;
>>         this.filters = filters;
>>     }
>>
>>     @Override
>>     public TableSource<T> applyPredicate(List<Expression> predicates) {
>>         List<Expression> acceptedPredicates = new ArrayList<>();
>>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>>
>>         for (final Expression predicate : predicates) {
>>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>>                 acceptedFilters.add(filter);
>>                 acceptedPredicates.add(predicate);
>>             });
>>         }
>>
>>         predicates.removeAll(acceptedPredicates);
>>
>>         return new CustomerTableSource<>(
>>                 resourceIterator.withFilters(acceptedFilters),
>>                 tableName,
>>                 modelClass,
>>                 acceptedFilters.toArray(new AppRequestFilter[0])
>>         );
>>     }
>>
>>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>>         // Code for translating an Expression into an AppRequestFilter
>>         // Returns Optional.empty() for predicates we don't want to / can't apply
>>     }
>>
>>     @Override
>>     public boolean isFilterPushedDown() {
>>         return filters != null;
>>     }
>>
>>     @Override
>>     public DataSet<T> getDataSet(ExecutionEnvironment execEnv) {
>>         return execEnv.fromCollection(resourceIterator, modelClass);
>>     }
>>
>>     @Override
>>     public TypeInformation<T> getReturnType() {
>>         return TypeInformation.of(modelClass);
>>     }
>>
>>     @Override
>>     public TableSchema getTableSchema() {
>>         return TableSchema.fromTypeInfo(getReturnType());
>>     }
>> }
>>
>>
>> Thanks,
>>
>> Josh
>>
>> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske  wrote:
>>
>>> Hi Josh,
>>>
>>> Does your TableSource also implement ProjectableTableSource?
>>> If yes, you need to make sure that the filter information is also
>>> forwarded if ProjectableTableSource.projectFields() is called after
>>> FilterableTableSource.applyPredicate().
>>> Also make sure to correctly implement
>>> FilterableTableSource.isFilterPushedDown().
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> Am Di., 30. Apr. 2019 um 22:29 Uhr schrieb Josh Bradt <
>>> josh.br...@klaviyo.com>:
>>>
 Hi all,

 I'm trying to implement filter push-down on a custom BatchTableSource
 that retrieves data from a REST API and returns it as POJO instances. I've
 implemented FilterableTableSource as described in the docs, returning a new
 instance of my table source containing the predicates that I've removed
 from the list of predicates passed into applyPredicate. However, when
 getDataSet is eventually called, it's called on the instance of the table
 source that was originally registered with the table environment, which
 does not have any filters in it. I've stepped through the code in a
 debugger, and applyPredicates is definitely being called, and it's
 definitely returning new instances of my table source, but they don't seem
 to be being used.

 I also played with the OrcTableSource, which is the only example of a
 push-down filter implementation I could find, and it doesn't behave this
 

Re: [EXTERNAL] Re: Avro SerializationSchema for Confluent Registry

2019-05-03 Thread PoolakkalMukkath, Shakir
Thanks Dawid. That helps

From: Dawid Wysakowicz 
Date: Friday, May 3, 2019 at 9:26 AM
To: "PoolakkalMukkath, Shakir" , Till 
Rohrmann , "user@flink.apache.org" 
, Dominik Wosiński 
Subject: [EXTERNAL] Re: Avro SerializationSchema for Confluent Registry


Hi Shakir,

There is no out of the box Serialization schema that uses Confluent Registry. 
There is an open PR[1] that tries to implement that. It is a work in progress 
though. Recently the issue was picked up by another contributor (Dominik) that 
wanted to take it over.

You may try to reuse some of the code there.
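
Until that is merged, one rough sketch of the idea (not the PR's implementation) is to 
wrap Confluent's own KafkaAvroSerializer in a Flink SerializationSchema; the topic and 
registry URL below are placeholders and there is no error handling or caching:

import java.util.Collections

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.SerializationSchema

// Delegates to Confluent's serializer, which registers/looks up the schema in the
// Schema Registry and prepends the magic byte + schema id to every record.
class ConfluentAvroSerializationSchema(topic: String, registryUrl: String)
    extends SerializationSchema[GenericRecord] {

  @transient private lazy val inner: KafkaAvroSerializer = {
    val serializer = new KafkaAvroSerializer()
    serializer.configure(Collections.singletonMap("schema.registry.url", registryUrl), false)
    serializer
  }

  override def serialize(element: GenericRecord): Array[Byte] =
    inner.serialize(topic, element)
}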

Best,

Dawid

[1] 
https://github.com/apache/flink/pull/6259
On 03/05/2019 15:09, PoolakkalMukkath, Shakir wrote:
Hi Till,
Is there a Serialization schema for Kafka Producer when using Confluent 
Registry ? I am trying to publish to a topic which uses confluent registry and 
Avro schema.

If there is one, please point me. Otherwise, what are the alternatives to do 
this ?

Thanks
Shakir


Ask about submitting multiple stream jobs to Flink

2019-05-03 Thread Rad Rad
Hi, 

I have a jar file which aims to submit multiple Flink streaming jobs. When the
program successfully submits the first one to Flink, it can't
submit the second one; it never even reaches the second line of code.

How can I fix this problem so that I can submit different streaming jobs at regular
intervals from within the same program?


Regards, 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-05-03 Thread an0
Thanks, but it doesn't seem to cover this rule:
--- Quote
Watermarks are generated at, or directly after, source functions. Each parallel 
subtask of a source function usually generates its watermarks independently. 
These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event 
time at the operators where they arrive. Whenever an operator advances its 
event time, it generates a new watermark downstream for its successor operators.

Some operators consume multiple input streams; a union, for example, or 
operators following a keyBy(…) or partition(…) function. Such an operator’s 
current event time is the minimum of its input streams’ event times. As its 
input streams update their event times, so does the operator.
--- End Quote

The most relevant part, I believe, is this:
"Some operators consume multiple input streams…operators following a keyBy(…) 
function. Such an operator’s current event time is the minimum of its input 
streams’ event times."

But the wording of "current event time is the minimum of its input streams’ 
event times" actually implies that the input streams(produced by keyBy) have 
different watermarks, the exactly opposite of what you just explained.

On 2019/05/03 07:32:07, Fabian Hueske  wrote: 
> Hi,
> 
> this should be covered here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> 
> Best, Fabian
> 
> Am Do., 2. Mai 2019 um 17:48 Uhr schrieb an0 :
> 
> > This explanation is exactly what I'm looking for, thanks! Is such an
> > important rule documented anywhere in the official document?
> >
> > On 2019/04/30 08:47:29, Fabian Hueske  wrote:
> > > An operator task broadcasts its current watermark to all downstream tasks
> > > that might receive its records.
> > > If you have an the following code:
> > >
> > > DataStream a = ...
> > > a.map(A).map(B).keyBy().window(C)
> > >
> > > and execute this with parallelism 2, your plan looks like this
> > >
> > > A.1 -- B.1 --\--/-- C.1
> > >   X
> > > A.2 -- B.2 --/--\-- C.2
> > >
> > > A.1 will propagate its watermarks to B.1 because only B.1 will receive
> > its
> > > output events.
> > > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > > output of B.1 is partitioned and all C tasks might receive output events
> > > from B.1.
> > >
> > > Best, Fabian
> > >
> > > Am Mo., 29. Apr. 2019 um 20:06 Uhr schrieb an0 :
> > >
> > > > Thanks very much. It definitely explains the problem I'm seeing.
> > However,
> > > > something I need to confirm:
> > > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in
> > > > assingTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > > flows through a specific key's stream, all key streams have the same
> > > > watermarks? So time-wise, `window` behaves as if `keyBy` is not there
> > at
> > > > all?
> > > >
> > > > On 2019/04/26 06:34:10, Dawid Wysakowicz 
> > wrote:
> > > > > Hi,
> > > > >
> > > > > Watermarks are meta events that travel independently of data events.
> > > > >
> > > > > 1) If you assingTimestampsAndWatermarks before keyBy, all parallel
> > > > > instances of trips have some data(this is my assumption) so
> > Watermarks
> > > > > can be generated. Afterwards even if some of the keyed partitions
> > have
> > > > > no data, Watermarks are broadcasted/forwarded anyway. In other words
> > if
> > > > > at some point Watermarks were generated for all partitions of a
> > single
> > > > > stage, they will be forwarded beyond this point.
> > > > >
> > > > > 2) If you assingTimestampsAndWatermarks after keyBy, you try to
> > assign
> > > > > watermarks for an empty partition which produces no Watermarks at all
> > > > > for this partition, therefore there is no progress beyond this point.
> > > > >
> > > > > I hope this clarifies it a bit.
> > > > >
> > > > > Best,
> > > > >
> > > > > Dawid
> > > > >
> > > > > On 25/04/2019 16:49, an0 wrote:
> > > > > > If my understanding is correct, then why
> > > > `assignTimestampsAndWatermarks` before `keyBy` works? The
> > `timeWindowAll`
> > > > stream's input streams are task 1 and task 2, with task 2 idling, no
> > matter
> > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`,
> > because
> > > > whether task 2 receives elements only depends on the key distribution,
> > has
> > > > nothing to do with timestamp assignment, right?
> > > > > >
> > > > > >               /key 1 trips\
> > > > > >              /             \
> > > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy     timeWindowAll
> > > > > >              \    idle     /
> > > > > >               \key 2 trips/
> > > > > >
> > > > > >               /key 1 trips--> assignTimestampsAndWatermarks\
> > > > > >              /

Re: CoProcessFunction vs Temporal Table to enrich multiple streams with a slow stream

2019-05-03 Thread Piotr Nowojski
Hi Averell,

I will be referring to your original two options: 1 (duplicating stream_C) and 
2 (multiplexing stream_A and stream_B).

Both of them could be expressed using a Temporal Table Join. You could multiplex 
stream_A and stream_B in the Table API, temporal-table-join them with stream_C and 
then demultiplex them in the DataStream API.
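
A rough sketch of that multiplexed variant in Scala (stream and field names are only 
for illustration, and the exact Table API calls can differ between Flink versions):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

def enrichWithTemporalJoin(
    abStream: DataStream[(String, String)],  // multiplexed stream_A + stream_B: (id, payload)
    cStream: DataStream[(String, Double)],   // slow enrichment stream_C: (id, rate)
    env: StreamExecutionEnvironment): DataStream[Row] = {

  val tEnv = TableEnvironment.getTableEnvironment(env)

  // The slow stream becomes a temporal table function keyed on "id".
  val enrichment = tEnv.fromDataStream(cStream, 'id, 'rate, 'proctime.proctime)
  tEnv.registerFunction("Enrich", enrichment.createTemporalTableFunction('proctime, 'id))

  // The multiplexed fast streams join against the version valid at their processing time.
  tEnv.registerDataStream("Multiplexed", abStream, 'id, 'payload, 'proctime.proctime)
  val joined = tEnv.sqlQuery(
    """SELECT m.id, m.payload, e.rate
      |FROM Multiplexed AS m,
      |LATERAL TABLE (Enrich(m.proctime)) AS e
      |WHERE m.id = e.id""".stripMargin)

  tEnv.toAppendStream[Row](joined) // demultiplex back into A'/B' in the DataStream API
}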

Resource usage/consumption would be more or less the same, but it depends on what 
you are comparing it to. Temporal Table Joins in the Table API have little to no 
overhead when using processing time. When using event time, there is much more 
complicated logic for how to handle out-of-order data and when to emit the data (on 
watermark, as in the Table API's implementation? As soon as possible?). I could imagine 
different implementations cutting some corners here and there, but if you would like to 
implement the same set of features that the Temporal Table Join provides in the 
DataStream API, you would end up with roughly the same code (and if you end up with 
something better, please contribute it! :) ). Please check the implementation details of 
org.apache.flink.table.runtime.join.TemporalRowtimeJoin and 
org.apache.flink.table.runtime.join.TemporalProcessTimeJoin.

Having said that, you have to answer for yourself whether it's better to implement 
the temporal join on your own in the DataStream API or whether to go through the 
hassle of converting your DataStream to Tables and back again. I would guess not - 
if you are already working in a DataStream API environment, using the Table API 
will have some limitations, like possible data conversions or the fact that you are 
losing control over the state of your operator - the Table API doesn't provide 
support for keeping the state of the job/query when upgrading Flink versions or if 
you would like to modify your Table API job graph/query, while with the DataStream 
API both of those things are supported.

Piotrek

> On 3 May 2019, at 15:22, Averell  wrote:
> 
> Hi,
> 
> Back to my story about enriching two different streams with data from one
> (slow stream) using Flink's low-level functions like CoProcessFunction
> (mentioned in this thread:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoFlatMapFunction-with-more-than-two-input-streams-td22320.html)
> 
> Now I see that Flink Table also supports doing something similar with
> Temporal Table [1]. With this, I would only need to convert my enrichment
> stream to be a Temporal table, and the two other streams into two unbounded
> tables.
> 
> */In terms of performance and resource usage/*, would this way of
> implementation (using Flink Table) be better than the option no.1 mentioned
> in my other thread: creating two different (though similar)
> CoProcessFunction's, maintaining two state tables (for the enrichment
> stream, one in each function)?
> 
> Thanks and best regards,
> Averell 
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/temporal_tables.html
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



CoProcessFunction vs Temporal Table to enrich multiple streams with a slow stream

2019-05-03 Thread Averell
Hi,

Back to my story about enriching two different streams with data from one
(slow stream) using Flink's low-level functions like CoProcessFunction
(mentioned in this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoFlatMapFunction-with-more-than-two-input-streams-td22320.html)

Now I see that Flink Table also supports doing something similar with
Temporal Table [1]. With this, I would only need to convert my enrichment
stream to be a Temporal table, and the two other streams into two unbounded
tables.

*/In terms of performance and resource usage/*, would this way of
implementation (using Flink Table) be better than the option no.1 mentioned
in my other thread: creating two different (though similar)
CoProcessFunction's, maintaining two state tables (for the enrichment
stream, one in each function)?

Thanks and best regards,
Averell 

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/temporal_tables.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Avro SerializationSchema for Confluent Registry

2019-05-03 Thread Dawid Wysakowicz
Hi Shakir,

There is no out of the box Serialization schema that uses Confluent
Registry. There is an open PR[1] that tries to implement that. It is a
work in progress though. Recently the issue was picked up by another
contributor (Dominik) that wanted to take it over.

You may try to reuse some of the code there.

Best,

Dawid

[1] https://github.com/apache/flink/pull/6259

On 03/05/2019 15:09, PoolakkalMukkath, Shakir wrote:
>
> Hi Till,
>
> Is there a Serialization schema for Kafka Producer when using
> Confluent Registry ? I am trying to publish to a topic which uses
> confluent registry and Avro schema.
>
> If there is one, please point me. Otherwise, what are the alternatives
> to do this ?
>
> Thanks
>
> Shakir
>




Avro SerializationSchema for Confluent Registry

2019-05-03 Thread PoolakkalMukkath, Shakir
Hi Till,
Is there a Serialization schema for Kafka Producer when using Confluent 
Registry ? I am trying to publish to a topic which uses confluent registry and 
Avro schema.

If there is one, please point me. Otherwise, what are the alternatives to do 
this ?

Thanks
Shakir


Re: Timestamp and key preservation over operators

2019-05-03 Thread Averell
Thank you Fabian.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RocksDB native checkpoint time

2019-05-03 Thread Konstantin Knauf
Hi Gyula,

I looked into this a bit recently as well and did some experiments (on my
local machine). The only parameter that significantly changed anything in
this setup was reducing the total size of the write buffers (the number or size of
memtables). I was not able to find any online resources on the performance
of checkpoint creation in RocksDB, so I'm looking forward to your findings...
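
For what it's worth, the knob I was adjusting looks roughly like this (the sizes below
are just what I tried locally, not a recommendation):

import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

// Smaller/fewer memtables mean less data to flush during the synchronous snapshot phase.
class SmallWriteBufferOptions extends OptionsFactory {
  override def createDBOptions(current: DBOptions): DBOptions = current

  override def createColumnOptions(current: ColumnFamilyOptions): ColumnFamilyOptions =
    current
      .setWriteBufferSize(32 * 1024 * 1024) // 32 MB per memtable
      .setMaxWriteBufferNumber(2)
}

// Usage when configuring the job, e.g.:
//   val backend = new RocksDBStateBackend("hdfs:///checkpoints", true)
//   backend.setOptions(new SmallWriteBufferOptions)
//   env.setStateBackend(backend)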

Cheers,

Konstantin


On Fri, May 3, 2019 at 12:10 PM Gyula Fóra  wrote:

> Thanks Piotr for the tips we will play around with some settings.
>
> @Stefan
> It is a few columns but a lot of rows
>
> Gyula
>
> On Fri, May 3, 2019 at 11:43 AM Stefan Richter 
> wrote:
>
>> Hi,
>>
>> out of curiosity, does it happen with jobs that have a large number of
>> states (column groups) or also for jobs with few column groups and just
>> “big state”?
>>
>> Best,
>> Stefan
>>
>> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
>>
>> Hi Gyula,
>>
>> Have you read our tuning guide?
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>>
>> Synchronous part is mostly about flushing data to disks, so you could try
>> to optimise your setup having that in mind. Limiting the size of a page
>> cache, speeding up the writes (using more/faster disks…), etc… Maybe you
>> can also look at online resources how to speedup calls to
>> `org.rocksdb.Checkpoint#create`.
>>
>> Piotrek
>>
>> On 3 May 2019, at 10:30, Gyula Fóra  wrote:
>>
>> Hi!
>>
>> Does anyone know what parameters might affect the RocksDB native
>> checkpoint time? (basically the sync part of the rocksdb incremental
>> snapshots)
>>
>> It seems to take 60-70 secs in some cases for larger state sizes, and I
>> wonder if there is anything we could tune to reduce this. Maybe its only a
>> matter of size i dont know.
>>
>> Any ideas would be appreciated :)
>> Gyula
>>
>>
>>
>>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: -




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: RocksDB native checkpoint time

2019-05-03 Thread Gyula Fóra
Thanks Piotr for the tips we will play around with some settings.

@Stefan
It is a few columns but a lot of rows

Gyula

On Fri, May 3, 2019 at 11:43 AM Stefan Richter 
wrote:

> Hi,
>
> out of curiosity, does it happen with jobs that have a large number of
> states (column groups) or also for jobs with few column groups and just
> “big state”?
>
> Best,
> Stefan
>
> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
>
> Hi Gyula,
>
> Have you read our tuning guide?
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>
> Synchronous part is mostly about flushing data to disks, so you could try
> to optimise your setup having that in mind. Limiting the size of a page
> cache, speeding up the writes (using more/faster disks…), etc… Maybe you
> can also look at online resources how to speedup calls to
> `org.rocksdb.Checkpoint#create`.
>
> Piotrek
>
> On 3 May 2019, at 10:30, Gyula Fóra  wrote:
>
> Hi!
>
> Does anyone know what parameters might affect the RocksDB native
> checkpoint time? (basically the sync part of the rocksdb incremental
> snapshots)
>
> It seems to take 60-70 secs in some cases for larger state sizes, and I
> wonder if there is anything we could tune to reduce this. Maybe its only a
> matter of size i dont know.
>
> Any ideas would be appreciated :)
> Gyula
>
>
>
>


Re: update the existing Keyed value state

2019-05-03 Thread Piotr Nowojski
Hi,

This might be tricky. There are some ongoing efforts [1] and a 3rd-party project 
[2] that allow you to read a savepoint, modify it and write back the modified 
savepoint from which you can restore.

Besides those, you might be able to modify the code of your aggregators to 
initialise their value the first time a key is seen, either from something hardcoded 
or by some call to an external system / a lookup in a database / a read from a file.
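
For example, something along these lines (the event type, state name and lookup
function are placeholders, not your actual job):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String) // placeholder event type

// Seeds the new counter from an external lookup the first time a key is seen after
// the upgrade, then keeps maintaining it in keyed state as usual.
class NewCounterFunction(lookupInitialValue: String => Long)
    extends KeyedProcessFunction[String, Event, Event] {

  @transient private var newCounter: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    newCounter = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("new-counter", classOf[java.lang.Long]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    if (newCounter.value() == null) {
      newCounter.update(lookupInitialValue(ctx.getCurrentKey))
    }
    newCounter.update(newCounter.value() + 1)
    out.collect(event)
  }
}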

[1] https://github.com/sjwiesman/flink/commits/savepoint-connector 

[2] https://github.com/king/bravo 

Piotr Nowojski

> On 3 May 2019, at 11:14, Selvaraj chennappan  
> wrote:
> 
> Hi Users,
> We want to have a real time aggregation (KPI) .
> we are maintaining aggregation counters in the keyed value  state  . 
> key could be customer activation date and type.
> Lot of counters are maintained against that key.
> 
> If we want to add one more counter for the existing keys which is in the 
> state backend.  
> 
> 1.compute the new counter value using database data .
> 2. group the counter value based on the key   
> 
> How do we update the new computed counter to all the existing keyed state?.
> 
> 
> 
> 
> 
> Regards,
> Selvaraj C



Re: DateTimeBucketAssigner using Element Timestamp

2019-05-03 Thread Piotr Nowojski
Hi Peter,

It sounds like this should work, however my question would be do you want 
exactly-once processing? If yes, then you would have to somehow know which 
exact events need re-processing or deduplicate them somehow. Keep in mind that 
in case of an outage in the original job, you probably will have some files 
already committed by the StreamingFileSink.

Another approach might be to somehow overwrite the previous files (but then you 
would have to check whether the bucket assignment and file naming is completely 
deterministic) or before reprocessing from backup remove the dirty files from 
the crashed job.

Piotrek

> On 2 May 2019, at 23:10, Peter Groesbeck  wrote:
> 
> Hi all,
> 
> I have an application that reads from various Kafka topics and writes parquet 
> files to corresponding buckets on S3 using StreamingFileSink with 
> DateTimeBucketAssigner. The upstream application that writes to Kafka also 
> writes records as gzipped json files to date bucketed locations on S3 as 
> backup.
> 
> One requirement we have is to back fill missing data in the event that the 
> application or Kafka experiences an outage. This can be accomplished by 
> reading the backup files that were written to S3 by our upstream application 
> instead of reading from Kafka. My current approach is to read the hourly 
> backup buckets, transform the files into a DataStream and assign them a 
> timestamp based on a datetime field on the json records using 
> BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the 
> DataStream to the same StreamingFileSink which ideally would write past 
> records in the same manner as if they had been streamed by Kafka. 
> 
> Unfortunately for me, the bucket assigner works on system time: 
> 
> A BucketAssigner that assigns to buckets based on current system time.
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>     if (dateTimeFormatter == null) {
>         dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>     }
>     return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
> 
> No problem, I can extend DateTimeBucketAssigner and override the method to 
> grab elementTimestamp instead of currentProcessingTime, but I'm wondering if 
> this is the right approach? And if so would this behavior be useful outside 
> of the context of my application?
> 
> Thanks in advance for your help and for this awesome framework!
> 
> Peter



Re: RocksDB native checkpoint time

2019-05-03 Thread Stefan Richter
Hi,

out of curiosity, does it happen with jobs that have a large number of states 
(column groups) or also for jobs with few column groups and just “big state”?

Best,
Stefan

> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
> 
> Hi Gyula,
> 
> Have you read our tuning guide?
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>  
> 
> 
> Synchronous part is mostly about flushing data to disks, so you could try to 
> optimise your setup having that in mind. Limiting the size of a page cache, 
> speeding up the writes (using more/faster disks…), etc… Maybe you can also 
> look at online resources how to speedup calls to 
> `org.rocksdb.Checkpoint#create`.
> 
> Piotrek
> 
>> On 3 May 2019, at 10:30, Gyula Fóra > > wrote:
>> 
>> Hi!
>> 
>> Does anyone know what parameters might affect the RocksDB native checkpoint 
>> time? (basically the sync part of the rocksdb incremental snapshots)
>> 
>> It seems to take 60-70 secs in some cases for larger state sizes, and I 
>> wonder if there is anything we could tune to reduce this. Maybe its only a 
>> matter of size i dont know.
>> 
>> Any ideas would be appreciated :)
>> Gyula
> 



update the existing Keyed value state

2019-05-03 Thread Selvaraj chennappan
Hi Users,
We want to have real-time aggregation (KPIs).
We are maintaining aggregation counters in keyed value state.
The key could be customer activation date and type.
Lots of counters are maintained against that key.

Now we want to add one more counter for the existing keys which are already in the
state backend:

1. Compute the new counter value using database data.
2. Group the counter values based on the key.

How do we update the newly computed counter for all the existing keyed state?





Regards,
Selvaraj C


Re: RocksDB native checkpoint time

2019-05-03 Thread Piotr Nowojski
Hi Gyula,

Have you read our tuning guide?
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
 


The synchronous part is mostly about flushing data to disk, so you could try to 
optimise your setup with that in mind: limiting the size of the page cache, 
speeding up the writes (using more/faster disks…), etc… Maybe you can also look 
at online resources on how to speed up calls to `org.rocksdb.Checkpoint#create`.

Piotrek

> On 3 May 2019, at 10:30, Gyula Fóra  wrote:
> 
> Hi!
> 
> Does anyone know what parameters might affect the RocksDB native checkpoint 
> time? (basically the sync part of the rocksdb incremental snapshots)
> 
> It seems to take 60-70 secs in some cases for larger state sizes, and I 
> wonder if there is anything we could tune to reduce this. Maybe its only a 
> matter of size i dont know.
> 
> Any ideas would be appreciated :)
> Gyula



Re: ClassNotFoundException on remote cluster

2019-05-03 Thread Chesnay Schepler
Historically, Spring applications have not interacted well with Flink; 
we've had people run into these issues multiple times on the mailing 
lists. Unfortunately, I don't believe any committer has any real 
experience in this area; I'm afraid I can neither help you myself nor 
refer you to someone who could :/


On 02/05/2019 15:26, Abhishek Jain wrote:
Also, the `execute()` call happens inside the `MyWikiAnalysis` Spring-managed 
bean on PostConstruct, but I don't think that should cause any issue. Any idea?


Let me know if you need more info on my environment.

On Thu, 2 May 2019 at 17:32, Abhishek Jain wrote:


This is a spring boot app that I've packaged using maven (Apache
Maven 3.3.9). I've verified the class is present in the jar as well.

On Thu, 2 May 2019 at 17:25, Chesnay Schepler <ches...@apache.org> wrote:

How are you packaging the jar that you submit? Specifically,
are you ensuring that all your classes are actually contained
within?

On 02/05/2019 13:38, Abhishek Jain wrote:

Hi,
I'm running into a ClassNotFoundException only when I run my
application on a standalone cluster (using the Flink CLI). If I
directly run the main class in my IDE, it works fine.
I've copied the stacktrace from the Flink standalone session logs here.

-- 
Warm Regards,

Abhishek Jain





-- 
Warm Regards,

Abhishek Jain



--
Warm Regards,
Abhishek Jain





RocksDB native checkpoint time

2019-05-03 Thread Gyula Fóra
Hi!

Does anyone know what parameters might affect the RocksDB native checkpoint
time? (basically the sync part of the rocksdb incremental snapshots)

It seems to take 60-70 secs in some cases for larger state sizes, and I
wonder if there is anything we could tune to reduce this. Maybe it's only a
matter of size, I don't know.

Any ideas would be appreciated :)
Gyula


Re: Timestamp and key preservation over operators

2019-05-03 Thread Fabian Hueske
The window operator cannot be configured to use the max timestamp of the
events in the window as the timestamp of the output record.
The reason is that such a behavior can produce late records.

If you want to do that, you have to track the max timestamp and assign it
yourself with a timestamp assigner.
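
For example (the result type, its maxTimestamp field and the out-of-orderness bound
are placeholders):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class WindowResult(key: String, value: Double, maxTimestamp: Long) // placeholder output type

// Re-assigns the record timestamp from the max event time tracked by the window function.
def reassignToMaxEventTime(windowed: DataStream[WindowResult]): DataStream[WindowResult] =
  windowed.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[WindowResult](Time.seconds(10)) {
      override def extractTimestamp(element: WindowResult): Long = element.maxTimestamp
    })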

Best, Fabian

Am Fr., 3. Mai 2019 um 09:54 Uhr schrieb Averell :

> Thank you Fabian.
>
> One more question from me on this topic: as I send out early messages in my
> window function, the timestamp assigned by window function (to the end-time
> of the window) is not like my expectation. I want it to be the time of the
> (last) message that triggered the output.
>
> Is there any way to accomplish that?
> Currently, I have an assignTimestampsAndWatermarks after my window
> function,
> but, as you said, it is against the best practice.
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Timestamp and key preservation over operators

2019-05-03 Thread Averell
Thank you Fabian.

One more question from me on this topic: as I send out early messages in my
window function, the timestamp assigned by the window function (the end time
of the window) is not what I expect. I want it to be the time of the
(last) message that triggered the output.

Is there any way to accomplish that?
Currently, I have an assignTimestampsAndWatermarks after my window function,
but, as you said, that is against best practice.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Filter push-down not working for a custom BatchTableSource

2019-05-03 Thread Fabian Hueske
Hi Josh,

The code looks good to me.
This seems to be a bug then.
It's strange that it works for ORC.

Would you mind opening a Jira ticket and maybe a simple reproducable code
example?

Thank you,
Fabian

Am Do., 2. Mai 2019 um 18:23 Uhr schrieb Josh Bradt :

> Hi Fabian,
>
> Thanks for your reply. My custom table source does not implement
> ProjectableTableSource. I believe that isFilterPushedDown is implemented
> correctly since it's nearly identical to what's written in the
> OrcTableSource. I pasted a slightly simplified version of the
> implementation below. If you wouldn't mind reading over it, is there
> anything obviously wrong?
>
> public final class CustomerTableSource<T> implements BatchTableSource<T>,
>         FilterableTableSource<T> {
>
>     // Iterator that gets data from a REST API as POJO instances
>     private final AppResourceIterator<T> resourceIterator;
>     private final String tableName;
>     private final Class<T> modelClass;
>     private final AppRequestFilter[] filters;
>
>     public CustomerTableSource(
>             AppResourceIterator<T> resourceIterator,
>             String tableName,
>             Class<T> modelClass) {
>
>         this(resourceIterator, tableName, modelClass, null);
>     }
>
>     protected CustomerTableSource(
>             AppResourceIterator<T> resourceIterator,
>             String tableName,
>             Class<T> modelClass,
>             AppRequestFilter[] filters) {
>
>         this.resourceIterator = resourceIterator;
>         this.tableName = tableName;
>         this.modelClass = modelClass;
>         this.filters = filters;
>     }
>
>     @Override
>     public TableSource<T> applyPredicate(List<Expression> predicates) {
>         List<Expression> acceptedPredicates = new ArrayList<>();
>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>
>         for (final Expression predicate : predicates) {
>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>                 acceptedFilters.add(filter);
>                 acceptedPredicates.add(predicate);
>             });
>         }
>
>         predicates.removeAll(acceptedPredicates);
>
>         return new CustomerTableSource<>(
>                 resourceIterator.withFilters(acceptedFilters),
>                 tableName,
>                 modelClass,
>                 acceptedFilters.toArray(new AppRequestFilter[0])
>         );
>     }
>
>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>         // Code for translating an Expression into an AppRequestFilter
>         // Returns Optional.empty() for predicates we don't want to / can't apply
>     }
>
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>
>     @Override
>     public DataSet<T> getDataSet(ExecutionEnvironment execEnv) {
>         return execEnv.fromCollection(resourceIterator, modelClass);
>     }
>
>     @Override
>     public TypeInformation<T> getReturnType() {
>         return TypeInformation.of(modelClass);
>     }
>
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
> }
>
>
> Thanks,
>
> Josh
>
> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske  wrote:
>
>> Hi Josh,
>>
>> Does your TableSource also implement ProjectableTableSource?
>> If yes, you need to make sure that the filter information is also
>> forwarded if ProjectableTableSource.projectFields() is called after
>> FilterableTableSource.applyPredicate().
>> Also make sure to correctly implement
>> FilterableTableSource.isFilterPushedDown().
>>
>> Hope this helps,
>> Fabian
>>
>> Am Di., 30. Apr. 2019 um 22:29 Uhr schrieb Josh Bradt <
>> josh.br...@klaviyo.com>:
>>
>>> Hi all,
>>>
>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>> that retrieves data from a REST API and returns it as POJO instances. I've
>>> implemented FilterableTableSource as described in the docs, returning a new
>>> instance of my table source containing the predicates that I've removed
>>> from the list of predicates passed into applyPredicate. However, when
>>> getDataSet is eventually called, it's called on the instance of the table
>>> source that was originally registered with the table environment, which
>>> does not have any filters in it. I've stepped through the code in a
>>> debugger, and applyPredicates is definitely being called, and it's
>>> definitely returning new instances of my table source, but they don't seem
>>> to be being used.
>>>
>>> I also played with the OrcTableSource, which is the only example of a
>>> push-down filter implementation I could find, and it doesn't behave this
>>> way. When I set a breakpoint in getDataSet in that case, it's being called
>>> on one of the new instances of the table source that contains the accepted
>>> filters.
>>>
>>> Are there any other requirements for implementing push-down filters that
>>> aren't listed in the docs? Or does anyone have any tips for this?
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>> --
>>> 

Re: Timestamp and key preservation over operators

2019-05-03 Thread Fabian Hueske
Hi Averell,

Yes, timestamps and watermarks do not (completely) move together.
The watermark should always be lower than the timestamps of the currently
processed records.
Otherwise, the records might be processed as late records (depending on the
logic).

The easiest way to check the timestamp of a message is using a
ProcessFunction.
The Context of the processElement() method has a timestamp() method that
returns the timestamp of the current record.
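
For example (the element type is a placeholder):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class MyEvent(id: String) // placeholder element type

// Pass-through function that only prints each record's timestamp (null if none is set).
class TimestampProbe extends ProcessFunction[MyEvent, MyEvent] {
  override def processElement(
      value: MyEvent,
      ctx: ProcessFunction[MyEvent, MyEvent]#Context,
      out: Collector[MyEvent]): Unit = {
    println(s"timestamp of $value = ${ctx.timestamp()}")
    out.collect(value)
  }
}

// usage: stream.process(new TimestampProbe)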

Best, Fabian

Am Fr., 3. Mai 2019 um 06:08 Uhr schrieb Averell :

> Thank you Fabian.
>
> I have one more question about timestamp:
> In the previous email, you asked how did I check the timestamp - I don't
> have an answer. Then I only checked the watermark, not the timestamp. I had
> the (wrong) assumption that watermarks advance along with timestamps.
> Today I played with that early trigger window, putting the output into a
> table, and found that the timestamp is set to the window's end-time, but
> the
> watermark seems not. (My window is [10:00-10:15), my incoming msgs both
> have
> time-stamp of 10:00, which trigger one early output with timestamp
> 10:14:59.999, but the watermark stays at 10:00)
>
> Thus, my question: what is the easiest way to check the timestamp of a
> message?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-05-03 Thread Fabian Hueske
Hi,

this should be covered here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams

Best, Fabian

Am Do., 2. Mai 2019 um 17:48 Uhr schrieb an0 :

> This explanation is exactly what I'm looking for, thanks! Is such an
> important rule documented anywhere in the official document?
>
> On 2019/04/30 08:47:29, Fabian Hueske  wrote:
> > An operator task broadcasts its current watermark to all downstream tasks
> > that might receive its records.
> > If you have an the following code:
> >
> > DataStream a = ...
> > a.map(A).map(B).keyBy().window(C)
> >
> > and execute this with parallelism 2, your plan looks like this
> >
> > A.1 -- B.1 --\--/-- C.1
> >   X
> > A.2 -- B.2 --/--\-- C.2
> >
> > A.1 will propagate its watermarks to B.1 because only B.1 will receive
> its
> > output events.
> > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > output of B.1 is partitioned and all C tasks might receive output events
> > from B.1.
> >
> > Best, Fabian
> >
> > Am Mo., 29. Apr. 2019 um 20:06 Uhr schrieb an0 :
> >
> > > Thanks very much. It definitely explains the problem I'm seeing.
> However,
> > > something I need to confirm:
> > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in
> > > assingTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > flows through a specific key's stream, all key streams have the same
> > > watermarks? So time-wise, `window` behaves as if `keyBy` is not there
> at
> > > all?
> > >
> > > On 2019/04/26 06:34:10, Dawid Wysakowicz 
> wrote:
> > > > Hi,
> > > >
> > > > Watermarks are meta events that travel independently of data events.
> > > >
> > > > 1) If you assingTimestampsAndWatermarks before keyBy, all parallel
> > > > instances of trips have some data(this is my assumption) so
> Watermarks
> > > > can be generated. Afterwards even if some of the keyed partitions
> have
> > > > no data, Watermarks are broadcasted/forwarded anyway. In other words
> if
> > > > at some point Watermarks were generated for all partitions of a
> single
> > > > stage, they will be forwarded beyond this point.
> > > >
> > > > 2) If you assingTimestampsAndWatermarks after keyBy, you try to
> assign
> > > > watermarks for an empty partition which produces no Watermarks at all
> > > > for this partition, therefore there is no progress beyond this point.
> > > >
> > > > I hope this clarifies it a bit.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 25/04/2019 16:49, an0 wrote:
> > > > > If my understanding is correct, then why
> > > `assignTimestampsAndWatermarks` before `keyBy` works? The
> `timeWindowAll`
> > > stream's input streams are task 1 and task 2, with task 2 idling, no
> matter
> > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`,
> because
> > > whether task 2 receives elements only depends on the key distribution,
> has
> > > nothing to do with timestamp assignment, right?
> > > > >
> > > > >               /key 1 trips\
> > > > >              /             \
> > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy     timeWindowAll
> > > > >              \    idle     /
> > > > >               \key 2 trips/
> > > > >
> > > > >               /key 1 trips--> assignTimestampsAndWatermarks\
> > > > >              /                                              \
> > > > > (B) trips-->keyBy                                            timeWindowAll
> > > > >              \    idle                                      /
> > > > >               \key 2 trips--> assignTimestampsAndWatermarks/
> > > > >
> > > > > How are things different between A and B from `timeWindowAll`'s perspective?
> > > > >
> > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > >
> > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz 
> > > wrote:
> > > > >> Hi,
> > > > >>
> > > > >> Yes I think your explanation is correct. I can also recommend
> Seth's
> > > > >> webinar where he talks about debugging Watermarks[1]
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Dawid
> > > > >>
> > > > >> [1]
> > > > >>
> > >
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > >>
> > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > >>>
> > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that
> mean
> > > I get 2 tasks running after `keyBy` if even all elements have the same
> key
> > > so go to 1 down stream(say task 1)? And it is the other task(task 2)
> with
> > > no incoming data that caused the `timeWindowAll` stream unable to
> progress?
> > > Because both task 1 and task 2 are its input streams and one is idling
> so
> > > its event time cannot make progress?
> > > > >>>
> > > > >>> On 2019/04/22 01:57:39, Guowei Ma  wrote:
> > >