Re: Need help in understanding PojoSerializer

2024-03-20 Thread Ken Krugler
Flink doesn’t have built-in support for serializing Sets.

See this (stale) issue about the same: 
https://issues.apache.org/jira/browse/FLINK-16729

You could create a custom serializer for sets, see 
https://stackoverflow.com/questions/59800851/flink-serialization-of-java-util-list-and-java-util-map
 and 
https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/ListSerializer.html
 for details on how this was done for a list, but it’s not trivial.

Or as a hack, use a Map and the existing support for map 
serialization via 
https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/MapSerializer.html
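
E.g. something like this sketch, where the class and field names are just for
illustration:

public class EventWithTags {
    public String id;

    // Instead of "public Set<String> tags;", expose the set as a map so Flink's
    // POJO support can use its built-in MapSerializer for this field.
    public Map<String, Boolean> tags = new HashMap<>();

    public void addTag(String tag) {
        tags.put(tag, Boolean.TRUE);
    }

    public boolean hasTag(String tag) {
        return tags.containsKey(tag);
    }
}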

— Ken


> On Mar 20, 2024, at 10:04 AM, Sachin Mittal  wrote:
> 
> Hi,
> I have a Pojo class like this
> 
> public class A {
> public String str;
> public Set aSet;
> public Map dMap;
> }
> 
> However when I start the flink program I get this message:
> 
> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field 
> A#dMap will be processed as GenericType. Please read the Flink documentation 
> on "Data Types & Serialization" for details of the effect on performance and 
> schema evolution.
> 
> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field 
> A#aSet will be processed as GenericType. Please read the Flink documentation 
> on "Data Types & Serialization" for details of the effect on performance and 
> schema evolution.
> 
> Also in my code I have added 
> env.getConfig().disableGenericTypes();
> So I don't understand why, when I use Maps and Sets of primitive types, Flink is 
> not able to use the PojoSerializer for these fields, and why, even though I have 
> disabled generic types, I am getting a message that they will be processed as 
> GenericType?
> 
> Any help in understanding what I need to do to ensure all the fields of my 
> object are handled using PojoSerializer.
> 
> Thanks
> Sachin
>  
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot





Re: Completeablefuture in a flat map operator

2024-02-19 Thread Ken Krugler
Is there some reason why you can’t use an AsyncFunction?

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html

Note that when dealing with event time and exactly once, an AsyncFunction 
provides required support for proper execution.

See 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
 for more details.
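
As a rough sketch of what that looks like (the MyClient and Event names are
placeholders, not from your code; MyClient is assumed to return a CompletableFuture):

public class EnrichFunction extends RichAsyncFunction<Event, Event> {

    private transient MyClient client;   // some async-capable client

    @Override
    public void open(Configuration parameters) {
        client = new MyClient();
    }

    @Override
    public void asyncInvoke(Event input, ResultFuture<Event> resultFuture) {
        client.lookup(input)
              .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }
}

// Instead of a flatMap:
DataStream<Event> enriched = AsyncDataStream.unorderedWait(
        events, new EnrichFunction(), 30, TimeUnit.SECONDS, 100);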

— Ken


> On Feb 19, 2024, at 12:20 PM, Lasse Nedergaard 
>  wrote:
> 
> Hi. 
> 
> I have a case where I would like to collect object from a completeablefuture 
> future in a flat map function. 
> I run into a problem where I get an exception regarding a buffer pool that 
> doesn’t exist when I collect the objects after some time. I can see that if, for 
> testing, I don’t return from the function (creating a for loop with a thread 
> sleep, or waiting for the future), it works. 
> Can anyone explain what going on behind the screen and if possible any hints 
> for a working solution. 
> 
> Thanks in advance 
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 


--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot





Re: Conditional multi collect in flink

2023-12-04 Thread Ken Krugler
Hi Tauseef,

It sounds like you want to use side outputs 
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/side_output/>
 (via a ProcessFunction, versus a FlatMap).
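
A minimal sketch (the Event type and the tag name are made up):

private static final OutputTag<Event> REJECTED = new OutputTag<Event>("rejected") {};

public class RouteFunction extends ProcessFunction<Event, Event> {

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        if (event.isValid()) {
            out.collect(event);           // main output
        } else {
            ctx.output(REJECTED, event);  // side output
        }
    }
}

SingleOutputStreamOperator<Event> mainStream = events.process(new RouteFunction());
DataStream<Event> rejected = mainStream.getSideOutput(REJECTED);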

— Ken



> On Dec 4, 2023, at 8:23 AM, Tauseef Janvekar  
> wrote:
> 
> Dear Team,
> 
> I was wondering if there is a feature in Flink that supports conditional multi 
> collect.
> 
> Conditions
> 1. A stream is being processed and being converted to another stream/List 
> using flatMap.
> 2. The place where collector.collect() is being called can we have multiple 
> other collectors also ?
> 3. Can we add conditional collectors? Like if cond1 - then 
> collector1.collect(), elif cond2 then collector1.collect() and 
> collector2.collect().
> 
> I already know that I can do this serially by calling the same source stream 
> with a different flatMap and then convert and push using collector2
> 
> Thanks,
> Tauseef

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot





Re: Question regarding asyncIO timeout

2023-09-05 Thread Ken Krugler
Hi Leon,

Normally I try to handle retrying in the client being used to call the server, 
as you have more control/context.

If that’s not an option for you, then normally (un)orderedWaitWithRetry() 
should work - when you say “it doesn’t seem to help much”, are you saying that 
even with retry you get transient failures that you want to handle better?

If so, then you could implement the timeout() method in your AsyncFunction, and 
complete with a special result that indicates you exceeded the retry count. 
This would then avoid having the job restart.
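
Something like this sketch (ServerRequestResult is the same placeholder type used in 
the PS below):

@Override
public void timeout(String request, ResultFuture<ServerRequestResult> resultFuture) throws Exception {
    // Complete with a marker result instead of failing, so the job doesn't restart.
    ServerRequestResult failedResult = new ServerRequestResult();
    failedResult.setBlah();   // flag it as "retries exceeded" in whatever way fits your result type
    resultFuture.complete(Collections.singleton(failedResult));
}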

— Ken

PS - note that you can also do something similar inside of the asyncInvoke() 
method of your AsyncFunction, e.g:

@Override
public void asyncInvoke(String request, ResultFuture<ServerRequestResult> resultFuture) throws Exception {

    final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");

    // Use your own executor, so that you're not relying on the size of the common fork pool.
    CompletableFuture.supplyAsync(new Supplier<ServerResult>() {

        @Override
        public ServerResult get() {
            try {
                return client.request(request);
            } catch (Exception e) {
                LOGGER.debug("Exception requesting " + request, e);
                return makeErrorResult(blah, e.getMessage());
            }
        }
    }, executor)
    .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
    .thenAccept((ServerResult result) -> {
        ServerRequestResult requestResult = new ServerRequestResult();
        requestResult.setBlah();
        resultFuture.complete(Collections.singleton(requestResult));
    });
}


> On Sep 5, 2023, at 12:16 PM, Leon Xu  wrote:
> 
> Hi Flink users,
> 
> We are using Flink AsyncIO to call a grpc-based service in our Flink job.
> And from time to time we are experiencing Async function timeout issues, 
> here's the exception.
> ```
> java.lang.Exception: Could not complete the stream element: Record @ 
> 169393916 : [B@cadc5b3.
> Caused by: java.util.concurrent.TimeoutException: Async function call has 
> timed out.
> ```
> Every timeout will cause the job to restart, which seems to be very expensive.
> 
> On the server side it looks like these timeouts are transient and we were 
> expecting a retry will fix the issue.
> We tried using the asyncIO retry strategy but it doesn't seem to help much.
> `AsyncDataStream.orderedWaitWithRetry`
> 
> Do you have any suggestions on how to better reduce these timeout errors?
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-29 Thread Ken Krugler
If you need a specific output order, then merge the three streams, key by a 
constant (like 1), and run that into a KeyedProcessFunction.

That function can buffer out-of-order records, and set up a timer to fire when 
it gets a MAX_WATERMARK (which indicates that all streams are finished) so that 
it can flush any pending data from state, in the proper order.

You’d have a ListState for the reduced data, and a 
ValueState for the footer.

Once you get the header, you can flush all reduced data, and no longer buffer 
it.

When timer fires, you can flush the footer.
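
A rough sketch of that function, assuming the three streams have been mapped to a
common type (here a made-up FileRow with header/data/footer markers) and keyed by a
constant:

public class OrderedEmitFunction extends KeyedProcessFunction<Integer, FileRow, FileRow> {

    private ValueState<Boolean> headerSeen;
    private ListState<FileRow> pendingData;
    private ValueState<FileRow> footer;

    @Override
    public void open(Configuration parameters) {
        headerSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("headerSeen", Boolean.class));
        pendingData = getRuntimeContext().getListState(new ListStateDescriptor<>("pendingData", FileRow.class));
        footer = getRuntimeContext().getState(new ValueStateDescriptor<>("footer", FileRow.class));
    }

    @Override
    public void processElement(FileRow row, Context ctx, Collector<FileRow> out) throws Exception {
        // This fires when MAX_WATERMARK arrives, i.e. when all of the (bounded) inputs are done.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

        if (row.isHeader()) {
            out.collect(row);
            headerSeen.update(true);
            // Once the header is out, flush anything we buffered, and stop buffering.
            for (FileRow pending : pendingData.get()) {
                out.collect(pending);
            }
            pendingData.clear();
        } else if (row.isFooter()) {
            footer.update(row);
        } else if (Boolean.TRUE.equals(headerSeen.value())) {
            out.collect(row);
        } else {
            pendingData.add(row);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FileRow> out) throws Exception {
        // Everything is finished, so the footer can go out last.
        if (footer.value() != null) {
            out.collect(footer.value());
        }
    }
}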

— Ken



> On Aug 10, 2023, at 10:14 PM, Muazim Wani  wrote:
> 
> Thank you so much for taking the time to provide me with such a detailed 
> response. Your assistance has been incredibly helpful in clarifying my 
> understanding! 
> Let me provide you with the exact scenario ,  I think there might be some 
> misunderstanding. All the streams are bounded and parallelism is set to 1. I 
> am  writing to 1 file only. So the main use case Is to provide support for 
> dynamic Headers and footers with Aggregated values. 
> e.g if my input is
> 
> Static Header
> JohnDoe, 12
> Alice, 21
> 
> My dynamic header is "Dynamic Header" and dynamic Footer is "Dynamic footer". 
> These headers are on top of static headers which are already present in the 
> DataStream(bounded). The output would be like
> 
> Dynamic Header 33
> Static Header
> JohnDoe, 12
> Alice, 21
> Dynamic footer
> 
> In this particular case I am writing to one file only. I have set my 
> parallelism to 1. I have 1 InputDataStream on top of that I have one dynamic 
> header and footer stream (which contains some dynamic params such as 
> aggregated value on some fields e.g salary etc) .Now I am left with three 
> transformed streams in Sink Operator. i.e dynamic HeaderStream with 
> aggregated Value 2) input DataStream 3) dynamic Footer stream with aggregated 
> Value. 
> 
> I could have used String for both Dynamic Headers and footers and emitted the 
> headers in open() method and footers in close() method of TextOutputFormat, 
> That would have solved my useCase. But as I get a DataStream(with only 1 
> value i.e final sum) back from Aggregated Values (Currently I am using reduce 
> function). I am adding headers to that DataStream only and similarly for 
> footers. Now I am not able to merge them while maintaining the order. 
> 
> Below I have provided my implementation of reduce function
> public DataStream sum(
> SumFunction reduceFunction, DataStream stream) {
> 
>   DataStream inputRecordTransformedDataStream =
>   stream.map(this::transform).returns((TypeInformation) 
> Types.GENERIC(Number.class));
> 
>   return inputRecordTransformedDataStream
>   .keyBy(value -> "key")
>   .reduce(reduceFunction);
> }
> 
> 
> Below I am adding my Headers to my Sum Stream
> 
> public static DataStream getAggregatedStream(
> String header, DataStream sinkStream) {
> 
>   return sinkStream
>   .keyBy(key -> "value")
>   .flatMap(
>   (FlatMapFunction)
>   (value, out) -> out.collect(PojoClass.builder().data(header + 
> value).build()))
>   .returns(PojoClass.class);
> }
> 
> Add HeaderStream is a Bounded DataStream with Dynamic Headers and Aggregated 
> value.
> DataStream headerStream = addHeaderRows(sinkStream);
> 
> DataStream footerStream = addFooterRows(finalStream);
> 
> DataStream sinkStream;
> 
> Thanks a lot for your time and the advice.
> Muazim Wani
> 
> 
> On Fri, 11 Aug 2023 at 07:45, Hang Ruan  <mailto:ruanhang1...@gmail.com>> wrote:
>> ps: Forget the link: Hybrid Source[1]
>> 
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>> 
>> Hang Ruan <ruanhang1...@gmail.com> wrote on Fri, Aug 11, 2023 at 10:14:
>>> Hi, Muazim.
>>> 
>>> I think the Hybrid Source[1] may be helpful for your case.
>>> 
>>> Best,
>>> Hang
>>> 
>>> Ken Krugler <kkrugler_li...@transpac.com> wrote on Fri, Aug 11, 2023 at 04:18:
>>>> As (almost) always, the devil is in the details.
>>>> 
>>>> You haven’t said, but I’m assuming you’re writing out multiple files, each 
>>>> with a different schema, as otherwise you could just leverage the existing 
>>>> Flink support for CSV.
>>>> 
>>>> So then you could combine the header/footer streams (adding a flag for 
>>>> header vs. footer), and connect that to the row data stream, then use a 
>>>> KeyedCoProcessFunction (I’m assuming you can key by something that identifies 
>>>> which schema)…

Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Ken Krugler
As (almost) always, the devil is in the details.

You haven’t said, but I’m assuming you’re writing out multiple files, each with 
a different schema, as otherwise you could just leverage the existing Flink 
support for CSV.

So then you could combine the header/footer streams (adding a flag for header 
vs. footer), and connect that to the row data stream, then use a 
KeyedCoProcessFunction (I’m assuming you can key by something that identifies 
which schema). You’d buffer the row data & footer (separately in state). You 
would also need to set up a timer to fire at the max watermark, to flush out 
pending rows/footer when all of the input data has been processed.

After that function you could configure the sink to bucket by the target schema.
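
A sketch of just that wiring (the headers/footers/rowData streams, the HeaderOrFooter
wrapper, and the KeyedCoProcessFunction name are all placeholders):

DataStream<HeaderOrFooter> headersAndFooters = headers
        .map(h -> HeaderOrFooter.header(h))
        .union(footers.map(f -> HeaderOrFooter.footer(f)));

rowData.connect(headersAndFooters)
        .keyBy(row -> row.getSchemaId(), hf -> hf.getSchemaId())
        .process(new OrderedFileAssembler())   // the KeyedCoProcessFunction described above
        .sinkTo(...);                          // sink bucketed by the target schema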

— Ken


> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
> 
> Thanks for the response!
> I have a specific use case where I am writing to a TextFile sink. I have a 
> Bounded stream of header data and need  to merge it with another bounded 
> stream. While writing the data to a text file the header data should be 
> written before the original data(from another bounded stream). And also at 
> last I have another stream of footers where I would repeat the same process.
> I tried keeping an identifier for all three streams, and based on these 
> identifiers I added the data to three different ListStates using KeyedProcess 
> functions. For headers I directly emitted the value, but for the main data and 
> footers I stored them in state. The issue is that outside the KeyedProcess I am 
> not able to emit the data in order.
> Is there any way I can achieve the use case of ordering the DataStreams? I 
> also tried union, but it seems to add data from both streams arbitrarily.
> Thanks and regards
> 
> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler,  <mailto:kkrugler_li...@transpac.com>> wrote:
>> Hi Muazim,
>> 
>> In Flink, a stream of data (unless bounded) is assumed to never end.
>> 
>> So in your example below, this means stream 2 would NEVER be emitted, 
>> because stream 1 would never end (there is no time at which you know for 
>> sure that stream 1 is done).
>> 
>> And this in turn means stream 2 would be buffered forever in state, thus 
>> growing unbounded.
>> 
>> I would suggest elaborating on your use case.
>> 
>> Regards,
>> 
>> — Ken
>> 
>> 
>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani >> <mailto:muazim1...@gmail.com>> wrote:
>>> 
>>> Hi Team,
>>> I have a use case where I have two streams and want to join them in 
>>> stateful manner . 
>>> E.g data of stream 1 should be emitted before stream2.
>>> I tried to store the data in ListState in KeyedProcessFunction but I am not 
>>> able to access state outside processElement().
>>> Is there any way I could achieve this?
>>> Thanks and regards
>>> 
>> 
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
>> 
>> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Ken Krugler
Hi Muazim,

In Flink, a stream of data (unless bounded) is assumed to never end.

So in your example below, this means stream 2 would NEVER be emitted, because 
stream 1 would never end (there is no time at which you know for sure that 
stream 1 is done).

And this in turn means stream 2 would be buffered forever in state, thus 
growing unbounded.

I would suggest elaborating on your use case.

Regards,

— Ken


> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
> 
> Hi Team,
> I have a use case where I have two streams and want to join them in stateful 
> manner . 
> E.g data of stream 1 should be emitted before stream2.
> I tried to store the data in ListState in KeyedProcessFunction but I am not 
> able to access state outside processElement().
> Is there any way I could achieve this?
> Thanks and regards
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Using HybridSource

2023-07-06 Thread Ken Krugler
Hi Oscar,

As Andrew noted, you could use the approach of overriding the CsvReaderFormat 
(or whatever you’re using to extract CSV data from S3) to return a Protobuf 
that’s the same format as what you’re getting from Kafka, though that obviously 
introduces some inefficiencies. Or you could do the opposite, and extend the 
Kafka deserialization schema to return a POJO versus a Protobuf.
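
For the second option, a sketch of what the Kafka deserializer might look like
(CsvPojo and the protobuf-generated Event class are stand-ins for your types):

public class EventToPojoDeserializer implements KafkaRecordDeserializationSchema<CsvPojo> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<CsvPojo> out) throws IOException {
        Event proto = Event.parseFrom(record.value());
        out.collect(new CsvPojo(proto.getFieldA(), proto.getFieldB(), proto.getFieldC()));
    }

    @Override
    public TypeInformation<CsvPojo> getProducedType() {
        return TypeInformation.of(CsvPojo.class);
    }
}

You’d hand that to KafkaSource.builder().setDeserializer(...), so both sides of the 
HybridSource produce the same POJO.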

But an important point with sources is that it’s harder to handle errors here, 
versus downstream in the workflow. E.g. if you have some error with converting 
to Protobuf, or mapping the Protobuf to your POJO, then you don’t have the 
ability to use a side output (as you normally would with a ProcessFunction). So 
that’s one reason why the HybridSource doesn’t provide out-of-the-box 
functionality to do extra mapping.

— Ken

> On Jul 5, 2023, at 7:19 AM, Oscar Perez  wrote:
> 
> and this is our case Alexander, it is the same data schema but different data 
> format. Kafka comes from protobuf while the CSV is a POJO though both have 
> the same fields. IMHO, the design of HybridSource is very limited and you 
> have to do nasty workarounds if you want to combine from cold storage (CSV, 
> S3) and kafka if the expectations differ a bit from the most common use case 
> (e.g. using protobuf)
> 
> Regards,
> Oscar
> 
> On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov  <mailto:alexander.fedu...@gmail.com>> wrote:
> I do not think that trying to "squash" two different data types into one just 
> to use HybridSource is the right thing to do here. HybridSource is primarily 
> intended for use cases that need to read the same data from different 
> sources. A typical example: read events from "cold storage" in S3 up to a 
> specific point and switch over to "live" data in Kafka. 
> Since you are already using the low-level API, you can either manually pull 
> the data in inside of the open() function, or stream it into the local state 
> using KeyedCoProcessFunction or KeyedBroadcastProcessFunction (depending on 
> the size of the lookup state). 
> 
> This video should get you covered:
> https://www.youtube.com/watch?v=cJS18iKLUIY 
> <https://www.youtube.com/watch?v=cJS18iKLUIY>
> 
> Best,
> Alex
> 
> 
> On Wed, 5 Jul 2023 at 07:29, Péter Váry  <mailto:peter.vary.apa...@gmail.com>> wrote:
> Was it a conscious decision that HybridSource only accept Sources, and does 
> not allow mapping functions applied to them before combining them?
> 
> On Tue, Jul 4, 2023, 23:53 Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Oscar,
> 
> Couldn’t you have both the Kafka and File sources return an Either<POJO from 
> CSV File, Protobuf from Kafka>, and then (after the HybridSource) use a 
> MapFunction to convert to the unified/correct type?
> 
> — Ken
> 
> 
>> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user > <mailto:user@flink.apache.org>> wrote:
>> 
>> Hei,
>> 1) We populate state based on this CSV data and do business logic based on 
>> this state and events coming from other unrelated streams.
>> 2) We are using low level process function in order to process this future 
>> hybrid source
>> 
>> Regardless of the aforementioned points, please note that the main challenge 
>> is to combine in a hybridsource CSV and kafka topic that return different 
>> datatypes so I dont know how my answers relate to the original problem tbh. 
>> Regards,
>> Oscar
>> 
>> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov > <mailto:alexander.fedu...@gmail.com>> wrote:
>> @Oscar <>
>> 1. How do you plan to use that CSV data? Is it needed for lookup from the 
>> "main" stream?
>> 2. Which API are you using? DataStream/SQL/Table or low level 
>> ProcessFunction?
>> 
>> Best,
>> Alex
>> 
>>  <>
>> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user > <mailto:user@flink.apache.org>> wrote:
>> ok, but is it? As I said, both sources have different data types. In the 
>> example here:
>> 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>  
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/>
>> 
>> We are using both sources as returning string but in our case, one source 
>> would return a protobuf event while the other would return a pojo. How can 
>> we make the 2 sources share the same datatype so that we can successfully 
>> use hybrid source?
>> 
>> Regards,
>> Oscar
>> 
>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov > <mailto:ale...@ververica.com>> 

Re: Using HybridSource

2023-07-04 Thread Ken Krugler
Hi Oscar,

Couldn’t you have both the Kafka and File sources return an Either<POJO from CSV 
file, Protobuf from Kafka>, and then (after the HybridSource) use a 
MapFunction to convert to the unified/correct type?
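
A sketch of that, with CsvPojo and the protobuf Event as stand-ins for your two
record types (env, hybridSource and toPojo() are also placeholders):

// Both sources are built to produce Either<CsvPojo, Event>: the file source wraps
// its rows with Either.Left(pojo), and the Kafka deserializer wraps its protobuf
// with Either.Right(event).

DataStream<Either<CsvPojo, Event>> combined =
        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid");

DataStream<CsvPojo> unified = combined
        .map(either -> either.isLeft() ? either.left() : toPojo(either.right()))
        .returns(CsvPojo.class);   // toPojo() is your protobuf-to-POJO mapping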

— Ken


> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user  
> wrote:
> 
> Hei,
> 1) We populate state based on this CSV data and do business logic based on 
> this state and events coming from other unrelated streams.
> 2) We are using low level process function in order to process this future 
> hybrid source
> 
> Regardless of the aforementioned points, please note that the main challenge 
> is to combine in a hybridsource CSV and kafka topic that return different 
> datatypes so I dont know how my answers relate to the original problem tbh. 
> Regards,
> Oscar
> 
> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov  <mailto:alexander.fedu...@gmail.com>> wrote:
> @Oscar <>
> 1. How do you plan to use that CSV data? Is it needed for lookup from the 
> "main" stream?
> 2. Which API are you using? DataStream/SQL/Table or low level ProcessFunction?
> 
> Best,
> Alex
> 
>  <>
> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user  <mailto:user@flink.apache.org>> wrote:
> ok, but is it? As I said, both sources have different data types. In the 
> example here:
> 
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>  
> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/>
> 
> We are using both sources as returning string but in our case, one source 
> would return a protobuf event while the other would return a pojo. How can we 
> make the 2 sources share the same datatype so that we can successfully use 
> hybrid source?
> 
> Regards,
> Oscar
> 
> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov  <mailto:ale...@ververica.com>> wrote:
> Hi Oscar,
> 
> You could use connected streams and put your file into a special Kafka topic 
> before starting such a job: 
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>  
> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect>
> But this may require more work and the event ordering (which is shuffled) in 
> the connected streams is probably not what you are looking for.
> 
> I think HybridSource is the right solution.
> 
> Best regards,
> Alexey
> 
> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user  <mailto:user@flink.apache.org>> wrote:
> Hei, We want to bootstrap some data from a CSV file before reading from a 
> kafka topic that has a retention period of 7 days.
> 
> We believe the best tool for that would be the HybridSource but the problem 
> we are facing is that both datasources are of different nature. The 
> KafkaSource returns a protobuf event while the CSV is a POJO with just 3 
> fields.
> 
> We could hack the kafkasource implementation and then in the 
> valuedeserializer do the mapping from protobuf to the CSV POJO but that seems 
> rather hackish. Is there a way more elegant to unify both datatypes from both 
> sources using Hybrid Source?
> 
> thanks
> Oscar

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Flink bulk and record file source format metrices

2023-06-16 Thread Ken Krugler
Hi Kamal,

In a similar situation, when a decoding failure happened I would generate a 
special record that I could then detect/filter out (and increment a counter) in 
a FilterFunction immediately following the source.
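
As a sketch of that pattern: the reader emits a special marker record (instead of
throwing) when decoding fails, and then a filter right after the source counts and
drops those markers. The ParquetRecord type and its “corrupt marker” are made up:

public class DropCorruptRecords extends RichFilterFunction<ParquetRecord> {

    private transient Counter corruptRecords;

    @Override
    public void open(Configuration parameters) {
        corruptRecords = getRuntimeContext().getMetricGroup().counter("corruptRecords");
    }

    @Override
    public boolean filter(ParquetRecord record) {
        if (record.isCorruptMarker()) {
            corruptRecords.inc();
            return false;   // drop the marker record
        }

        return true;
    }
}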

— Ken


> On Jun 16, 2023, at 2:11 AM, Kamal Mittal via user  
> wrote:
> 
> Hello,
>  
> Any way-forward, please suggest.
>  
> Rgds,
> Kamal
>  
> From: Kamal Mittal via user  <mailto:user@flink.apache.org>> 
> Sent: 15 June 2023 10:39 AM
> To: Shammon FY mailto:zjur...@gmail.com>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: RE: Flink bulk and record file source format metrices
>  
> Hello,
>  
> I need one counter metric for the no. of corrupt records while decoding parquet 
> records at the data source level. I know where the corrupt record handling is 
> required, but due to the non-existence of “SourceContext” or “RuntimeContext”, 
> I am unable to do anything w.r.t. the metric.
>  
> It is needed similarly to the way the “SourceReaderBase” class maintains a 
> counter for the no. of records emitted.
>  
> Rgds,
> Kamal
>  
> From: Shammon FY mailto:zjur...@gmail.com>> 
> Sent: 14 June 2023 05:33 PM
> To: Kamal Mittal  <mailto:kamal.mit...@ericsson.com>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: Re: Flink bulk and record file source format metrices
>  
> Hi Kamal,
>  
> Can you give more information about the metrics you want? In Flink each source 
> task has one source reader which already has some metrics, you can refer to 
> metrics doc[1] for more detailed information.
>  
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ 
> <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/>
>  
> Best,
> Shammon FY
>  
> On Tue, Jun 13, 2023 at 11:13 AM Kamal Mittal via user  <mailto:user@flink.apache.org>> wrote:
> Hello,
>  
> Using Flink record stream format file source API as below for parquet records 
> reading.
>  
> FileSource.FileSourceBuilder source = 
> FileSource.forRecordStreamFormat(streamformat, path);
> source.monitorContinuously(Duration.ofMillis(1));
>  
> I want to log/generate metrics for corrupt records, and for that I need to log 
> Flink metrics at the source level in the parquet reader class. Is there any way 
> to do that, as right now no handle for SourceContext is available?
>  
> Rgds,
> Kamal

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Watermark idleness and alignment - are they exclusive?

2023-06-15 Thread Ken Krugler
I think you’re hitting this issue:

https://issues.apache.org/jira/browse/FLINK-31632 
<https://issues.apache.org/jira/browse/FLINK-31632>

Fixed in 1.16.2 and 1.17.1.

— Ken


> On Jun 15, 2023, at 1:39 PM, Piotr Domagalski  wrote:
> 
> Hi all!
> 
> We've been experimenting with watermark alignment in Flink 1.15 and observed 
> an odd behaviour that I couldn't find any mention of in the documentation.
> 
> With the following strategy:
> 
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> .withTimestampAssigner((e, t) -> e.timestamp)
> .withIdleness(Duration.ofSeconds(3600))
> .withWatermarkAlignment("group-1", Duration.ofSeconds(15));
> 
> Kafka sources stop consuming completely after 3600s (even when the data is 
> flowing into all the partitions). Is this an expected behaviour? Where could 
> I find more information on this?
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: A WordCount job using DataStream API but behave like the batch WordCount example

2023-04-30 Thread Ken Krugler
Hi Luke,

Without more details, I don’t have a good explanation for why you’re seeing 
duplicate results.

— Ken

> On Apr 29, 2023, at 7:38 PM, Luke Xiong  wrote:
> 
> Hi Ken,
> 
> My workflow looks like this:
> 
> dataStream
> .map(A -> B)
> .flatMap(B -> some Tuple2.of(C, 1))
> .keyBy(t, t.f0) // a.k.a. C
> .sum(1)
> .map(Tuple2.of(C, ) -> d)
> ;
> 
> So just illustrative, and I am not writing a WordCount job either.
> 
> - Luke
> 
> 
> On Sat, Apr 29, 2023 at 10:31 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Luke,
> 
> What’s your workflow look like?
> 
> The one I included in my email generates what I’d expect.
> 
> — Ken
> 
>> On Apr 29, 2023, at 7:22 PM, Luke Xiong > <mailto:leix...@gmail.com>> wrote:
>> 
>> Hi Marco and Ken,
>> 
>> Thanks for the tips. I tried setting runtime mode to BATCH and it seems to 
>> work. However, I notice there are duplicate keys in the resulting stream. 
>> For example, it has
>> (hello, 2)
>> (hello, 2)
>> instead of
>> (hello, 4)
>> 
>> I suspect there might be a bug in the equals method of the keyed object, but 
>> it doesn't seem likely, because I can get expected result with
>> .collect(Collectors.groupingByConcurrent(Function.identity(), 
>> Collectors.counting()))
>> with the same stream using the Java Stream API.
>> Is there any other reason that causes it, and what should I do to get a 
>> stream with only one element per key?
>> -lxiong
>> 
>> On Sat, Apr 29, 2023 at 5:24 PM Ken Krugler > <mailto:kkrugler_li...@transpac.com>> wrote:
>> Hi Luke,
>> 
>> If you’re reading from a bounded source (like a file, and you’re not doing 
>> any monitoring of the directory or the file to load more data) then yes, you 
>> can get a final count using a stream.
>> 
>> The key point is that the Flink file source will send out a 
>> Watermark.MAX_WATERMARK value when it is done, and this will trigger any 
>> pending aggregations.
>> 
>> Though you do need to make sure your workflow mode is either explicitly 
>> BOUNDED, or AUTOMATIC.
>> 
>> Something like…
>> 
>> final StreamExecutionEnvironment env = ...;
>> env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
>> 
>> DataStream<String> lines = ...;
>> 
>> lines
>> .flatMap(new SplitLineFunction())
>> .keyBy(t -> t.f0)
>> .sum(1)
>> .print();
>> 
>> env.execute();
>> 
>> …with a typical line splitter, cheesy version like...
>> 
>> private static class SplitLineFunction implements 
>> FlatMapFunction<String, Tuple2<String, Integer>> {
>> 
>> @Override
>> public void flatMap(String in, Collector<Tuple2<String, Integer>> 
>> out) throws Exception {
>> for (String word : in.split(" ")) {
>> out.collect(Tuple2.of(word, 1));
>> }
>> }
>> }
>> 
>> — Ken
>> 
>>> On Apr 28, 2023, at 10:56 PM, Luke Xiong >> <mailto:leix...@gmail.com>> wrote:
>>> 
>>> Dear experts,
>>> 
>>> Is it possible to write a WordCount job that uses the DataStream API, but 
>>> make it behave like the batch version WordCount example?
>>> 
>>> More specifically, I hope the job can get a DataStream of the final (word, 
>>> count) records when fed a text file.
>>> 
>>> For example, given a text file:
>>> ```input.txt
>>> hello hello world hello world
>>> hello world world world hello world
>>> ```
>>> 
>>> In the flink WordCount examples, the batch version outputs:
>>> ```batch.version.output
>>> hello 5
>>> world 6
>>> ```
>>> 
>>> while the stream version outputs:
>>> ```stream.version.output
>>> (hello,1)
>>> (hello,2)
>>> (world,1)
>>> (hello,3)
>>> (world,2)
>>> (hello,4)
>>> (world,3)
>>> (world,4)
>>> (world,5)
>>> (hello,5)
>>> (world,6)
>>> ```
>>> Is it possible to have a DataStream that only has two elements: (hello, 5) 
>>> and (world, 6)?
>>> 
>>> Regards,
>>> Luke
>> 
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
>> 
>> 
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: A WordCount job using DataStream API but behave like the batch WordCount example

2023-04-29 Thread Ken Krugler
Hi Luke,

What’s your workflow look like?

The one I included in my email generates what I’d expect.

— Ken

> On Apr 29, 2023, at 7:22 PM, Luke Xiong  wrote:
> 
> Hi Marco and Ken,
> 
> Thanks for the tips. I tried setting runtime mode to BATCH and it seems to 
> work. However, I notice there are duplicate keys in the resulting stream. For 
> example, it has
> (hello, 2)
> (hello, 2)
> instead of
> (hello, 4)
> 
> I suspect there might be a bug in the equals method of the keyed object, but 
> it doesn't seem likely, because I can get expected result with
> .collect(Collectors.groupingByConcurrent(Function.identity(), 
> Collectors.counting()))
> with the same stream using the Java Stream API.
> Is there any other reason that causes it, and what should I do to get a 
> stream with only one element per key?
> -lxiong
> 
> On Sat, Apr 29, 2023 at 5:24 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Luke,
> 
> If you’re reading from a bounded source (like a file, and you’re not doing 
> any monitoring of the directory or the file to load more data) then yes, you 
> can get a final count using a stream.
> 
> The key point is that the Flink file source will send out a 
> Watermark.MAX_WATERMARK value when it is done, and this will trigger any 
> pending aggregations.
> 
> Though you do need to make sure your workflow mode is either explicitly 
> BOUNDED, or AUTOMATIC.
> 
> Something like…
> 
> final StreamExecutionEnvironment env = ...;
> env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
> 
> DataStream<String> lines = ...;
> 
> lines
> .flatMap(new SplitLineFunction())
> .keyBy(t -> t.f0)
> .sum(1)
> .print();
> 
> env.execute();
> 
> …with a typical line splitter, cheesy version like...
> 
> private static class SplitLineFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
> 
> @Override
> public void flatMap(String in, Collector<Tuple2<String, Integer>> 
> out) throws Exception {
> for (String word : in.split(" ")) {
> out.collect(Tuple2.of(word, 1));
> }
> }
> }
> 
> — Ken
> 
>> On Apr 28, 2023, at 10:56 PM, Luke Xiong > <mailto:leix...@gmail.com>> wrote:
>> 
>> Dear experts,
>> 
>> Is it possible to write a WordCount job that uses the DataStream API, but 
>> make it behave like the batch version WordCount example?
>> 
>> More specifically, I hope the job can get a DataStream of the final (word, 
>> count) records when fed a text file.
>> 
>> For example, given a text file:
>> ```input.txt
>> hello hello world hello world
>> hello world world world hello world
>> ```
>> 
>> In the flink WordCount examples, the batch version outputs:
>> ```batch.version.output
>> hello 5
>> world 6
>> ```
>> 
>> while the stream version outputs:
>> ```stream.version.output
>> (hello,1)
>> (hello,2)
>> (world,1)
>> (hello,3)
>> (world,2)
>> (hello,4)
>> (world,3)
>> (world,4)
>> (world,5)
>> (hello,5)
>> (world,6)
>> ```
>> Is it possible to have a DataStream that only has two elements: (hello, 5) 
>> and (world, 6)?
>> 
>> Regards,
>> Luke
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Handling JSON Serialization without Kryo

2023-03-21 Thread Ken Krugler
Hi Rion,

I’m using Gson to deserialize to a Map.
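
E.g. roughly this (the standard Gson TypeToken idiom; reuse the Gson instance
across records rather than creating one per record):

private static final Gson GSON = new Gson();
private static final Type MAP_TYPE = new TypeToken<Map<String, Object>>() {}.getType();

Map<String, Object> fields = GSON.fromJson(jsonString, MAP_TYPE);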

1-2 records/second sounds way too slow, unless each record is enormous.

— Ken

> On Mar 21, 2023, at 6:18 AM, Rion Williams  wrote:
> 
> Hi Ken,
> 
> Thanks for the response. I hadn't tried exploring the use of the Record 
> class, which I'm assuming you're referring to a flink.types.Record, to read 
> the JSON into. Did you handle this via using a mapper to read the properties 
> in (e.g. Gson, Jackson) as fields or take a different approach? Additionally, 
> how has your experience been with performance? Kryo with the existing job 
> leveraging JsonObjects (via Gson) is horrific (~1-2 records/second) and can't 
> keep up with the speed of the producers, which is the impetus behind 
> reevaluating the serialization.
> 
> I'll explore this a bit more.
> 
> Thanks,
> 
> Rion
> 
> On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Rion,
> 
> For my similar use case, I was able to make a simplifying assumption that my 
> top-level JSON object was a record.
> 
> I then registered a custom Kryo serde that knew how to handle the handful of 
> JsonPrimitive types for the record entries.
> 
> I recently looked at extending that to support arrays and nested records, but 
> haven’t had to do that.
> 
> — Ken
> 
> 
>> On Mar 20, 2023, at 6:56 PM, Rion Williams > <mailto:rionmons...@gmail.com>> wrote:
>> 
>> Hi Shammon,
>> 
>> Unfortunately it’s a data stream job. I’ve been exploring a few options but 
>> haven’t found anything I’ve decided on yet. I’m currently looking at seeing 
>> if I can leverage some type of partial serialization to bind to the 
>> properties that I know the job will use and retain the rest as a JSON blob. 
>> I’ve also considered trying to store the fields as a large map of 
>> string-object pairs and translating that into a string prior to writing to 
>> the sinks.
>> 
>> Still accepting any/all ideas that I come across to see if I can handle this 
>> in an efficient, reasonable way.
>> 
>> Thanks,
>> 
>> Rion
>> 
>>> On Mar 20, 2023, at 8:40 PM, Shammon FY >> <mailto:zjur...@gmail.com>> wrote:
>>> 
>>> 
>>> Hi Rion
>>> 
>>> Is your job datastream or table/sql? If it is a table/sql job, and you can 
>>> define all the fields in json you need, then you can directly use json 
>>> format [1] to parse the data. 
>>> 
>>> You can also customize udf functions to parse json data into struct data, 
>>> such as map, row and other types supported by flink
>>> 
>>> 
>>> [1] 
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
>>>  
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/>
>>> 
>>> Best,
>>> Shammon FY
>>> 
>>> 
>>> On Sun, Mar 19, 2023 at 7:44 AM Rion Williams >> <mailto:rionmons...@gmail.com>> wrote:
>>> Hi all,
>>> 
>>> I’m reaching out today for some suggestions (and hopefully a solution) for 
>>> a Flink job that I’m working on. The job itself reads JSON strings from a 
>>> Kafka topic and reads those into JSONObjects (currently via Gson), which 
>>> are then operated against, before ultimately being written out to Kafka 
>>> again.
>>> 
>>> The problem here is that the shape of the data can vary wildly and 
>>> dynamically. Some records may have properties unique to only that record, 
>>> which makes defining a POJO difficult. In addition to this, the JSONObjects 
>>> fall back to Kryo serialization, which is leading to atrocious throughput.
>>> 
>>> I basically need to read in JSON strings, enrich properties on these 
>>> objects, and ultimately write them to various sinks.  Is there some type of 
>>> JSON-based class or library or an approach I could use to accomplish this 
>>> in an efficient manner? Or if possibly a way to partially write a POJO that 
>>> would allow me to interact with sections/properties of the JSON while 
>>> retaining other properties that might be dynamically present or unique to 
>>> the message?
>>> 
>>> Any advice or suggestions would be welcome! I’ll also be happy to provide 
>>> any additional context if it would help!
>>> 
>>> Thanks,
>>> 
>>> Rion
>>> 
>>> (cross-posted to users+dev for reach)
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

2023-03-04 Thread Ken Krugler
Hi Tommy,

To use stateful timers, you need to have a keyed stream, which gets tricky when 
you’re trying to avoid network traffic caused by the keyBy()

If the number of unique keys isn’t huge, I could think of yet another 
helicopter stunt that you could try :)

It’s possible to calculate a composite key, based on the “real” key and a 
synthetic value, that will wind up in the same slot where you’re doing this 
calculation.

So that would let you create a keyed stream which would have 
serialization/deserialization cost, but wouldn’t actually go through the 
network stack.

Since the composite key generation is deterministic, you can do the same thing 
on both streams, and join on the composite key.

You’d want to cache the mapping from the real key to the synthetic value, to 
avoid doing this calculation for every record.

If that sounds promising, lmk and I can post some code.
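
To give a feel for it, the core would be something like this sketch (untested; you’d 
get the parallelism, max parallelism and sub-task index from getRuntimeContext()):

// Find a salt such that (realKey + "#" + salt) is routed to this sub-task, so the
// subsequent keyBy() keeps each record where it already is. Cache realKey ->
// composite key so the search only happens once per unique key.
public static String makeCompositeKey(String realKey, int maxParallelism, int parallelism, int thisSubtask) {
    for (int salt = 0; ; salt++) {
        String candidate = realKey + "#" + salt;
        if (KeyGroupRangeAssignment.assignKeyToParallelOperator(candidate, maxParallelism, parallelism) == thisSubtask) {
            return candidate;
        }
    }
}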

— Ken


> On Mar 4, 2023, at 12:37 PM, Tommy May  wrote:
> 
> Hello Ken,
> 
> Thanks for the quick response! That is an interesting workaround. In our case 
> though we are using a CoProcessFunction with stateful timers. Is there a 
> similar workaround path available in that case? The one possible way I could 
> find required partitioning data in kafka in a very specific way to match what 
> Flink's keyBy is doing, and that it'd have additional constraints to the 
> method you described that would be difficult to handle in a prod environment 
> where we don't have full control over the producers & input topics.
> 
> Regarding the addition of a more flexible way to take advantage of 
> pre-partitioned sources like in FLIP-186, would you suggest I forward this 
> chain over to the dev Flink mailing list? 
> 
> Thanks,
> Tommy
> 
>   
> 
> On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Tommy,
> 
> I believe there is a way to make this work currently, but with lots of 
> caveats and constraints.
> 
> This assumes you want to avoid any network shuffle.
> 
> 1. Both topics have names that return the same value for 
> ((topicName.hashCode() * 31) & 0x7) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of 
> partitions.
> 4. You can’t change any of the above without losing state.
> 5. You don’t need stateful timers.
> 
> If the above is true, then you could use a CoFlatMapFunction and operator 
> state to implement a stateful join.
> 
> If it’s something like a left outer join without any state TTL or need to 
> keep both sides in state, then it’s pretty easy.
> 
> — Ken
> 
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name 
> to get the hashCode() result you need.
> 
>> On Mar 3, 2023, at 4:56 PM, Tommy May > <mailto:tvma...@gmail.com>> wrote:
>> 
>> Hello, 
>> 
>> My team has a Flink streaming job that does a stateful join across two high 
>> throughput kafka topics. This results in a large amount of data ser/de and 
>> shuffling (about 1gb/s for context). We're running into a bottleneck on this 
>> shuffling step. We've attempted to optimize our flink configuration, join 
>> logic, scale out the kafka topics & flink job, and speed up state access. 
>> What we see is that the join step causes backpressure on the kafka sources 
>> and lag slowly starts to accumulate. 
>> 
>> One idea we had to optimize this is to pre-partition the data in kafka on 
>> the same key that the join is happening on. This'll effectively reduce data 
>> shuffling to 0 and remove the bottleneck that we're seeing. I've done some 
>> research into the topic and from what I understand this is not 
>> straightforward to take advantage of in Flink. It looks to be a fairly 
>> commonly requested feature based on the many StackOverflow posts and slack 
>> questions, and I noticed there is FLIP-186 which attempts to address this 
>> topic as well. 
>> 
>> Are there any upcoming plans to add this feature to a future Flink release? 
>> I believe it'd be super impactful for similar large scale jobs out there. 
>> I'd be interested in helping as well, but admittedly I'm relatively new to 
>> Flink.  I poked around the code a bit, and it certainly did not seem like a 
>> straightforward addition, so it may be best handled by someone with more 
>> internal knowledge.
>> 
>> Thanks,
>> Tommy
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

2023-03-04 Thread Ken Krugler
Hi Tommy,

I believe there is a way to make this work currently, but with lots of caveats 
and constraints.

This assumes you want to avoid any network shuffle.

1. Both topics have names that return the same value for ((topicName.hashCode() 
* 31) & 0x7) % parallelism.
2. Both topics have the same number of partitions.
3. The parallelism of your join function exactly matches the number of 
partitions.
4. You can’t change any of the above without losing state.
5. You don’t need stateful timers.

If the above is true, then you could use a CoFlatMapFunction and operator state 
to implement a stateful join.

If it’s something like a left outer join without any state TTL or need to keep 
both sides in state, then it’s pretty easy.

— Ken

PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name to 
get the hashCode() result you need.
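
E.g. a brute-force search using the formula from point 1 above (just a sketch):

// Returns a suffix to append to topicName so that the renamed topic lands on the
// slot index you need.
public static String findTopicSuffix(String topicName, int parallelism, int targetSlot) {
    for (int i = 0; ; i++) {
        String candidate = topicName + "-" + i;
        if ((((candidate.hashCode() * 31) & 0x7) % parallelism) == targetSlot) {
            return "-" + i;
        }
    }
}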

> On Mar 3, 2023, at 4:56 PM, Tommy May  wrote:
> 
> Hello, 
> 
> My team has a Flink streaming job that does a stateful join across two high 
> throughput kafka topics. This results in a large amount of data ser/de and 
> shuffling (about 1gb/s for context). We're running into a bottleneck on this 
> shuffling step. We've attempted to optimize our flink configuration, join 
> logic, scale out the kafka topics & flink job, and speed up state access. 
> What we see is that the join step causes backpressure on the kafka sources 
> and lag slowly starts to accumulate. 
> 
> One idea we had to optimize this is to pre-partition the data in kafka on the 
> same key that the join is happening on. This'll effectively reduce data 
> shuffling to 0 and remove the bottleneck that we're seeing. I've done some 
> research into the topic and from what I understand this is not 
> straightforward to take advantage of in Flink. It looks to be a fairly 
> commonly requested feature based on the many StackOverflow posts and slack 
> questions, and I noticed there is FLIP-186 which attempts to address this 
> topic as well. 
> 
> Are there any upcoming plans to add this feature to a future Flink release? I 
> believe it'd be super impactful for similar large scale jobs out there. I'd 
> be interested in helping as well, but admittedly I'm relatively new to Flink. 
>  I poked around the code a bit, and it certainly did not seem like a 
> straightforward addition, so it may be best handled by someone with more 
> internal knowledge.
> 
> Thanks,
> Tommy

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





consume_receipts.py and user_cart_event.py

2023-01-09 Thread Ken Krugler
Hi Gordon,

I seem to remember you talking about these helper functions, to poll and write 
to Kinesis, as part of your StateFun shopping cart demo.

But I didn’t see them anywhere…was I imagining things?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Using filesystem plugin with MiniCluster

2023-01-03 Thread Ken Krugler
Hi all,

With Flink 1.15.x, is there a way to use the S3 Presto plugin when running code 
on the MiniCluster?

I can’t just add that jar as a dependency when testing, as I get:

java.lang.NoClassDefFoundError: Could not initialize class 
com.facebook.presto.hive.s3.PrestoS3FileSystem
at 
org.apache.flink.fs.s3presto.S3FileSystemFactory.createHadoopFileSystem(S3FileSystemFactory.java:88)
 ~[flink-s3-fs-presto-1.15.1.jar:1.15.1]
at 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
 ~[flink-s3-fs-presto-1.15.1.jar:1.15.1]

I assume that’s because of this warning in the Flink docs:

> The s3 file systems (flink-s3-fs-presto and flink-s3-fs-hadoop) can only be 
> used as plugins as we already removed the relocations. Placing them in libs/ 
> will result in system failures.


In the latest Flink JavaDocs, there’s a way to specify the PluginManager for 
the MiniClusterConfiguration, but I don’t see that in Flink 1.15.x

So is there a workaround to allow me to run a test from inside of my IDE, using 
the MiniCluster, that reads from S3?

Thanks,

— Ken

------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Re: Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-16 Thread Ken Krugler
Hi Jun,

Thanks for following up.

The state storage is internal at a client, and isn’t throttled.

Also restoring from the savepoint when we didn’t change the parallelism was 
fine.

I didn’t see any errors in the TM logs, but I didn’t carefully inspect them - 
I’ll do that when we give this another test.

Broadcast state is weird in that it’s duplicated, apparently to avoid “hot spots” 
when restoring from state. So I’m wondering how Flink handles the case of 
restoring broadcast state when the parallelism increases.

Regards,

— Ken
 

> On Dec 15, 2022, at 4:33 PM, Jun Qin  wrote:
> 
> Hi Ken,
> 
> Without knowning the details, the first thing I would suggest to check is 
> whether you have reached a threshold which is configured in your state 
> storage (e.g., s3) therefore your further download were throttled. Checking 
> your storage metrics or logs should help to confirm whether this is the case.
> 
> In addition, in those TMs where the restarting was slow, do you see anything 
> suspicious in the logs, e.g., reconnecting?
> 
> Thanks
> Jun
> 
> 
> 
> 
> Sent from my phone
> 
> 
>  Original message 
> From: Ken Krugler 
> Date: Wed, Dec 14, 2022, 19:32
> To: User 
> Subject: Slow restart from savepoint with large broadcast state when
> increasing parallelism
> Hi all,
> 
> I have a job with a large amount of broadcast state (62MB).
> 
> I took a savepoint when my workflow was running with parallelism 300.
> 
> I then restarted the workflow with parallelism 400.
> 
> The first 297 sub-tasks restored their broadcast state fairly quickly, but 
> after that it slowed to a crawl (maybe 2 sub-tasks finished per minute)
> 
> After 10 minutes we killed the job, so I don’t know if it would have 
> ultimately succeeded.
> 
> Is this expected? Seems like it could lead to a bad situation, where it would 
> take an hour to restart the workflow.
> 
> Thanks,
> 
> — Ken
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-14 Thread Ken Krugler
Hi all,

I have a job with a large amount of broadcast state (62MB).

I took a savepoint when my workflow was running with parallelism 300.

I then restarted the workflow with parallelism 400.

The first 297 sub-tasks restored their broadcast state fairly quickly, but 
after that it slowed to a crawl (maybe 2 sub-tasks finished per minute)

After 10 minutes we killed the job, so I don’t know if it would have ultimately 
succeeded.

Is this expected? Seems like it could lead to a bad situation, where it would 
take an hour to restart the workflow.

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Re: Registering serializer for RowData

2022-12-12 Thread Ken Krugler
Hi Yuxia,

Thanks for getting back to me.

SortOperator is a class in Hudi that was copied from Flink. The code says:

/**
 * Operator for batch sort.
 *
 * Copied from org.apache.flink.table.runtime.operators.sort.SortOperator to 
change the annotation.
 */
public class SortOperator extends TableStreamOperator
implements OneInputStreamOperator, BoundedOneInput {

Input is RowData. So I assume it’s using RowTypeInfo.

Regards,

— Ken

> On Dec 6, 2022, at 8:55 PM, yuxia  wrote:
> 
> Hi, what's the type of the input for the SortOperator? I mean what's the 
> TypeInformation? For example, PojoTypeInfo or RowTypeInfo?
> 
> Best regards,
> Yuxia
> 
> 发件人: "Ken Krugler" 
> 收件人: "User" 
> 发送时间: 星期三, 2022年 12 月 07日 上午 9:11:17
> 主题: Registering serializer for RowData
> 
> Hi there,
> 
> I’m using the Hudi sink to write data, in bulk insert mode, and running into 
> an issue where Hudi is unhappy because (I think) Flink is using the Kryo 
> serializer for RowData records, instead of something that extends 
> AbstractRowDataSerializer.
> 
> It’s this bit of (Hudi) code in SortOperator.java that fails:
> 
> AbstractRowDataSerializer<RowData> inputSerializer =
>     (AbstractRowDataSerializer<RowData>)
>         getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
> this.binarySerializer = new 
> BinaryRowDataSerializer(inputSerializer.getArity());
> 
> And I get:
> 
> Caused by: java.lang.ClassCastException: class 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be 
> cast to class 
> org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer 
> (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and 
> org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in 
> unnamed module of loader 'app')
>   at 
> org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>   …
> 
> So I’m wondering if the Flink table code configures this serializer, and I 
> need to do the same in my Java API-based workflow.
> 
> Thanks,
> 
> — Ken
> 
> PS - This is with Flink 1.15.1 and Hudi 0.12.0
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Registering serializer for RowData

2022-12-06 Thread Ken Krugler
Hi there,

I’m using the Hudi sink to write data, in bulk insert mode, and running into an 
issue where Hudi is unhappy because (I think) Flink is using the Kryo 
serializer for RowData records, instead of something that extends 
AbstractRowDataSerializer.

It’s this bit of (Hudi) code in SortOperator.java that fails:

AbstractRowDataSerializer<RowData> inputSerializer =
    (AbstractRowDataSerializer<RowData>)
        getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

And I get:

Caused by: java.lang.ClassCastException: class 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast 
to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer 
(org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and 
org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in 
unnamed module of loader 'app')
at 
org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
…

So I’m wondering if the Flink table code configures this serializer, and I need 
to do the same in my Java API-based workflow.

Thanks,

— Ken

PS - This is with Flink 1.15.1 and Hudi 0.12.0

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Best Practice for Querying Flink State

2022-08-29 Thread Ken Krugler
Hi Lu,

It would be helpful to know about your query requirements, before making a 
recommendation.

E.g. does it just need to be a key-value store, and thus you’re querying by a 
single key (which has to match the state partitioning key)?

What about latency requirements? E.g. if you’re processing Flink state (option 
3) then this is going to be large.

As a final take-away, in my experience I’ve always wound up shoving data into a 
separate system (Pinot is my current favorite) for queries.

— Ken


> On Aug 29, 2022, at 3:19 PM, Lu Niu  wrote:
> 
> Hi, Flink Users
> 
> We have a user case that requests running ad hoc queries to query flink 
> state. There are several options:
> 
> 1. Dump flink state to external data systems, like kafka, s3 etc. from there 
> we can query the data. This is a very straightforward approach, but adds 
> system complexity and overall cost. 
> 2. Flink Queryable State. This requires additional development and also when 
> the job is down, we can not query the data, which violates the need for 
> debugging in the first place. Last, from some channel I happen to know this 
> feature is on the deprecation list. 
> 3. Flink State API. This requires additional development. 
> 
> I am wondering what are some best practices applied in production. For me, I 
> really hope there is one product that 1. let me query the flink state using 
> SQL 2. decouple with flink job 
> 
> Best
> Lu
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





How to include path information in data extracted from text files with FileSource

2022-08-15 Thread Ken Krugler
Hi all,

We’ve got time-stamped directories containing text files, stored in HDFS.

We can regularly get new files added, so we’re using a FileSource with a 
monitoring duration, so that it continuously picks up any new files.

The challenge is that we need to include the parent directory’s timestamp in 
the output, for doing time-window joins of this enrichment data with another 
stream.

Previously I could extend the input format 
<https://stackoverflow.com/a/68764550/231762> to extract path information, and 
emit a Tuple2.

But with the new FileSource architecture, I’m really not sure if it’s possible, 
or if so, the right way to go about doing it.

I’ve wandered through the source code (FileSource, AbstractFileSource, 
SourceReader, FileSourceReader, FileSourceSplit, ad nauseam) but haven’t seen 
any happy path to making that all work.

There might be a way using some really ugly hacks to TextLineFormat, where it 
would reverse engineer the FSDataInputStream to try to find information about 
the original file, but feels very fragile.

Any suggestions?

Thanks!

— Ken


------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

Yes, that should work (using DataStream as the common result of both 
source creation options)

— Ken

> On May 24, 2022, at 12:19 PM, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
>  
> <https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26>
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski > <mailto:pi...@domagalski.com>> wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what is the recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which make sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>>  
>> <https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61>
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

The way I handle this is via a workflow class that uses a builder approach to 
specifying inputs, outputs, and any other configuration settings.

The inputs are typically DataStream.

This way I can separate out the Kafka inputs, and use testing sources that give 
me very precise control over the inputs (e.g. I can hold up on right side data 
to ensure my stateful left join junction is handling deferred joins properly). 
I can also use Kafka unit test support (either kafka-junit or Spring embedded 
Kafka) if needed.

Then in the actual tool class (with a main method) I’ll wire up the real Kafka 
sources, with whatever logic is required to convert the consumer records to 
what the workflow is expecting.
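
As a rough sketch (record types and names below are placeholders, not from a real
project), the workflow class ends up looking something like this:

// Builder-style workflow class: the topology is defined once, while the caller
// decides where the input DataStream comes from (KafkaSource in production,
// a test source under MiniCluster) and what to do with the output.
public class MyWorkflow {

    private DataStream<Event> input;

    public MyWorkflow setInput(DataStream<Event> input) {
        this.input = input;
        return this;
    }

    public DataStream<Result> build() {
        return input
            .keyBy(Event::getKey)
            .map(event -> new Result(event.getKey(), event.getValue()))
            .name("to-result");
    }
}

// Production tool class:
//   DataStream<Event> input = env.fromSource(kafkaSource, watermarkStrategy, "Kafka");
//   new MyWorkflow().setInput(input).build().sinkTo(kafkaSink);
//
// MiniCluster test:
//   DataStream<Event> input = env.addSource(new ParallelTestSource<>(testEvents));
//   DataStream<Result> results = new MyWorkflow().setInput(input).build();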

— Ken

> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> 
> Hi,
> 
> I'm wondering: what is the recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
> 
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which make sense. But then, my job is normally constructed 
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> 
> Is there any recommended way of handling these discrepancies, ie. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> 
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>  
> <https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61>
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Setting boundedness for legacy Hadoop sequence file sources

2022-05-03 Thread Ken Krugler
Hi all,

I’m converting several batch Flink workflows to streaming, with bounded sources.

Some of our sources are reading Hadoop sequence files via 
StreamExecutionEnvironment.createInput(HadoopInputFormat).

The problem is that StreamGraphGenerator.existsUnboundedSource is returning 
true, because the LegacySourceTransformation for this source says it’s 
CONTINUOUS_UNBOUNDED. So the workflow fails to run, because I’ve set the 
execution mode to batch.

The root cause is that StreamExecutionEnvironment.createInput() checks if the 
input format extends FileInputFormat, and only sets up a bounded source if it 
does. HadoopInputFormat doesn’t extend FileInputFormat, so boundedness gets set 
to CONTINUOUS_UNBOUNDED, which is wrong.

This looks like a bug in StreamExecutionEnvironment.createInput(), though not 
sure how best to fix it. Relying on class checks feels brittle.

Regards,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Controlling group partitioning with DataStream

2022-03-18 Thread Ken Krugler
Hi Guowei,

Thanks for following up on this, sorry I missed your email earlier.

Unfortunately I don’t think auto-rebalancing will help my situation, because I 
have a small number of unique key values (low cardinality).

And processing these groups (training one deep-learning model per group) 
requires a lot of memory, so I need to ensure only one group per slot.

Regards,

— Ken


> On Mar 8, 2022, at 8:35 PM, Guowei Ma  wrote:
> 
> Hi, Ken
> 
> If you are talking about the Batch scene, there may be another idea that the 
> engine automatically and evenly distributes the amount of data to be 
> processed by each Stage to each worker node. This also means that, in some 
> cases, the user does not need to manually define a Partitioner.
> 
> At present, Flink has a FLIP-187 [1], which is working in this direction, but 
> to achieve the above goals may also require the follow up work of FLIP-186 
> [2]. After the release of 1.15, we will carry out the "Auto-rebalancing of 
> workloads" related work as soon as possible, you can pay attention to the 
> progress of this FLIP.
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>  
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler>
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler#FLIP187:AdaptiveBatchJobScheduler-Futureimprovements
>  
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler#FLIP187:AdaptiveBatchJobScheduler-Futureimprovements>
> 
> Best,
> Guowei
> 
> 
> On Wed, Mar 9, 2022 at 8:44 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Dario,
> 
> Just to close the loop on this, I answered my own question on SO.
> 
> Unfortunately it seems like the recommended solution is to do the same hack I 
> did a while ago, which is to generate (via trial-and-error) a key that gets 
> assigned to the target slot.
> 
> I was hoping for something a bit more elegant :)
> 
> I think it’s likely I could make it work by implementing my own version of 
> KeyGroupStreamPartitioner, but as I’d noted in my SO question, that would 
> involve use of some internal-only classes, so maybe not a win.
> 
> — Ken
> 
> 
>> On Mar 4, 2022, at 3:14 PM, Dario Heinisch > <mailto:dario.heini...@gmail.com>> wrote:
>> 
>> Hi, 
>> 
>> I think you are looking for this answer from David: 
>> https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc
>>  
>> <https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc>
>> I think then you could technically create your partitioner - though a little 
>> bit cumbersome - by mapping your existing keys to new keys that will then map 
>> to the desired group & slot. 
>> 
>> Hope this may help, 
>> 
>> Dario
>> 
>> On 04.03.22 23:54, Ken Krugler wrote:
>>> Hi all,
>>> 
>>> I need to be able to control which slot a keyBy group goes to, in order to 
>>> compensate for a badly skewed dataset.
>>> 
>>> Any recommended approach to use here?
>>> 
>>> Previously (with a DataSet) I used groupBy followed by a withPartitioner, 
>>> and provided my own custom partitioner.
>>> 
>>> I posted this same question to 
>>> https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream
>>>  
>>> <https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream>
>>> 
>>> Thanks,
>>> 
>>> — Ken
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Correct way to cleanly shut down StateFun Harness in test code

2022-03-17 Thread Ken Krugler
Hi all,

I’m using org.apache.flink.statefun.flink.harness.Harness in some unit test 
code, where I control the sources so that they are finite.

This is similar to what I found at 
https://stackoverflow.com/questions/61939681/is-it-possible-to-write-a-unit-test-which-terminates-using-flink-statefun-harnes

The problem is that if I shut down all of the sources, it looks like StateFun 
then starts shutting down some resources prematurely, which results in this 
error:

Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED, but is 
required to be in state OPEN for put operations.
at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:256)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:184)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:73)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:98)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35)
 ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
at 
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114)
 ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
at 
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64)
 ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
at 
org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58)
 ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
 ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574) 
~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) 
~[flink-runtime_2.12-1.11.4.jar:1.11.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) 
~[flink-runtime_2.12-1.11.4.jar:1.11.4]
at java.lang.Thread.run(Thread.java:834) ~[?:?]

I believe the issue is that messages are still being generated but the target 
for the feedback channel has been terminated.

I currently work around this by turning off just the sources that would trigger 
new egress results, then I wait for the egresses to all be idle for > 2 
seconds, and then I shut down the rest of the sources.

But it feels fragile...

Thanks,

— Ken

PS - using StateFun 2.2.2

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Controlling group partitioning with DataStream

2022-03-08 Thread Ken Krugler
Hi Dario,

Just to close the loop on this, I answered my own question on SO.

Unfortunately it seems like the recommended solution is to do the same hack I 
did a while ago, which is to generate (via trial-and-error) a key that gets 
assigned to the target slot.

I was hoping for something a bit more elegant :)

I think it’s likely I could make it work by implementing my own version of 
KeyGroupStreamPartitioner, but as I’d noted in my SO question, that would 
involve use of some internal-only classes, so maybe not a win.
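
For anyone else landing here: the trial-and-error hack looks roughly like this (a
sketch — KeyGroupRangeAssignment is an internal class, and this only holds if the
job runs with the default max parallelism):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Search for a synthetic key that the default key-group assignment maps to the
// desired operator index; that key then replaces (or salts) the real, skewed key.
public static String keyForTargetSlot(int targetOperatorIndex, int parallelism) {
    int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
    for (int salt = 0; salt < 100_000; salt++) {
        String candidate = "key-" + salt;
        int assignedIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                candidate, maxParallelism, parallelism);
        if (assignedIndex == targetOperatorIndex) {
            return candidate;
        }
    }
    throw new IllegalStateException("No key found for operator index " + targetOperatorIndex);
}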

— Ken


> On Mar 4, 2022, at 3:14 PM, Dario Heinisch  wrote:
> 
> Hi, 
> 
> I think you are looking for this answer from David: 
> https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc
>  
> <https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc>
> I think then you could technically create your partitioner - though a little 
> bit cumbersome - by mapping your existing keys to new keys that will then map 
> to the desired group & slot. 
> 
> Hope this may help, 
> 
> Dario
> 
> On 04.03.22 23:54, Ken Krugler wrote:
>> Hi all,
>> 
>> I need to be able to control which slot a keyBy group goes to, in order to 
>> compensate for a badly skewed dataset.
>> 
>> Any recommended approach to use here?
>> 
>> Previously (with a DataSet) I used groupBy followed by a withPartitioner, 
>> and provided my own custom partitioner.
>> 
>> I posted this same question to 
>> https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream
>>  
>> <https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream>
>> 
>> Thanks,
>> 
>> — Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Controlling group partitioning with DataStream

2022-03-04 Thread Ken Krugler
Hi all,

I need to be able to control which slot a keyBy group goes to, in order to 
compensate for a badly skewed dataset.

Any recommended approach to use here?

Previously (with a DataSet) I used groupBy followed by a withPartitioner, and 
provided my own custom partitioner.

I posted this same question to 
https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream
 
<https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream>

Thanks,

— Ken

------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Exception thrown during batch job execution on YARN even though job succeeded

2021-09-30 Thread Ken Krugler
Hi all,

We’ve upgraded from Flink 1.11 to 1.13, and our workflows are now sometimes 
failing with an exception, even though the job has succeeded.

The stack trace for this bit of the exception is:

java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.client.program.ContextEnvironment.getJobExecutionResult(ContextEnvironment.java:117)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at my.program.execute.workflow...

The root cause is "java.net.ConnectException: Connection refused”, returned 
from the YARN node where the Job Manager is (was) running.

ContextEnvironment.java line 117 is:

jobExecutionResult = jobExecutionResultFuture.get();

This looks like a race condition, where YARN is terminating the Job Manager, 
and this sometimes completes before the main program has retrieved all of the 
job status information.

I’m wondering if this is a side effect of recent changes to make execution 
async/non-blocking.

Is this a known issue? Anything we can do to work around it?
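
One possible stopgap (just a sketch, and only reasonable if losing the
JobExecutionResult is acceptable once the YARN application has already shut down)
would be to trap that specific failure around execute():

try {
    env.execute("my-batch-workflow");
} catch (Exception e) {
    // Walk to the root cause; if the JobManager is simply gone (connection refused),
    // treat it as "result unavailable" rather than a failed job, otherwise rethrow.
    // LOG here is whatever slf4j logger the tool class uses.
    Throwable root = e;
    while (root.getCause() != null) {
        root = root.getCause();
    }
    if (root instanceof java.net.ConnectException) {
        LOG.warn("Job result unavailable (JobManager already terminated)", e);
    } else {
        throw e;
    }
}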

Thanks,

— Ken

PS - The last two people working on this area code were Aljoscha and Robert 
(really wish git blame didn’t show most lines as being modified by “Rufus 
Refactor”…sigh)

------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Re: Running Flink Dataset jobs Sequentially

2021-07-14 Thread Ken Krugler
Hi Jason,

Yes, I write the files inside of the mapPartition function. Note that you can 
get multiple key groups inside of one partition, so you have to manage your own 
map from the key group to the writer.

The Flink DAG ends with a DiscardingSink, after the mapPartition.
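
Roughly like this (a sketch with made-up types and paths — the real function also
handled output formats, buffering and error cases):

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Input is (source directory, line); nothing is emitted downstream, so the job
// can end with a discarding sink/output format right after this operator,
// e.g. .output(new DiscardingOutputFormat<>()) in the DataSet API.
public class PerDirectoryWriter extends RichMapPartitionFunction<Tuple2<String, String>, Void> {

    @Override
    public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<Void> out) throws Exception {
        // A single partition can contain several key groups, so keep one writer per directory.
        Map<String, BufferedWriter> writers = new HashMap<>();
        try {
            for (Tuple2<String, String> value : values) {
                BufferedWriter writer = writers.get(value.f0);
                if (writer == null) {
                    writer = createWriter(value.f0);
                    writers.put(value.f0, writer);
                }
                writer.write(value.f1);
                writer.newLine();
            }
        } finally {
            for (BufferedWriter writer : writers.values()) {
                writer.close();
            }
        }
    }

    private BufferedWriter createWriter(String sourceDir) throws IOException {
        // Placeholder mapping from source directory to output file.
        Path outputFile = Paths.get("/output", Paths.get(sourceDir).getFileName() + ".txt");
        Files.createDirectories(outputFile.getParent());
        return Files.newBufferedWriter(outputFile, StandardCharsets.UTF_8);
    }
}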

And no, we didn’t notice any specific performance hit with this approach. 
Though our workflow was much more complex, so performance was bounded by 
upstream joins.

— Ken


> On Jul 13, 2021, at 10:53 AM, Jason Liu  wrote:
> 
> Hey Ken, 
> 
> Thanks! This is really helpful. Just to clarify, when you said write a 
> custom mapPartition that writes to files, did you actually write the file 
> inside the mapPartition function itself? So the Flink DAG ends at 
> mapPartition? Did you notice any performance issues as a result of this? 
> 
> Thanks again,
> Jason
> 
> On Fri, Jul 9, 2021 at 1:39 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> FWIW I had to do something similar in the past. My solution was to…
> 
> 1. Create a custom reader that added the source directory to the input data 
> (so I had a Tuple2 of source directory and line)
> 2. Create a job that reads from all source directories, using 
> HadoopInputFormat for text
> 3. Constrain the parallelism of this initial part of the job, to avoid 
> overwhelming downloads from S3.
> 4. Do a partition on the source directory
> 5. Write a custom mapPartition function that opens/writes to output files 
> that are created with names based on the source directory.
> 
> — Ken
> 
>> On Jul 8, 2021, at 3:19 PM, Jason Liu > <mailto:jasonli...@ucla.edu>> wrote:
>> 
>> Hi all,
>> 
>> We currently have a use case of running a given dataset API job for a 
>> given S3 directory to dedup data and output to a new directory. We need to 
>> run this job for roughly ~1000 S3 folders. I attempted to set up the Flink 
>> executions so it runs sequentially like this: 
>> 
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> 
>> Configuration parameters = new Configuration();
>> parameters.setBoolean("recursive.file.enumeration", true);
>> 
>> for (final String inputDirectory : directoryList) {
>>   String inputPath = inputDirectory;
>>   String outputPath = getOutputPath(inputPath);
>> 
>>   log.warn("using input path [{}] and output path [{}]", inputPath, 
>> outputPath);
>> 
>>   DataSet<String> lines = 
>> env.readTextFile(inputPath).withParameters(parameters);
>>   DataSet<String> deduped = lines.distinct(new GetKey());
>>   deduped.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);
>> }
>> env.execute();
>> However, when I submit this job to the cluster, it generates a graph like 
>> this 
>> 
>> And it seems Flink is running them in parallel. Is there a way to tell Flink 
>> to run it sequentially? I tried moving the execution environment inside the 
>> loop but it seems like it only runs the job on the first directory. I'm 
>> running this on AWS Kinesis Data Analytics, so it's a bit hard for me to 
>> submit new jobs. 
>> 
>> Wondering if there's any way I can accomplish this?
>> 
>> Thanks,
>> Jason
>> 
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Running Flink Dataset jobs Sequentially

2021-07-09 Thread Ken Krugler
FWIW I had to do something similar in the past. My solution was to…

1. Create a custom reader that added the source directory to the input data (so 
I had a Tuple2 of source directory and line)
2. Create a job that reads from all source directories, using HadoopInputFormat 
for text
3. Constrain the parallelism of this initial part of the job, to avoid 
overwhelming downloads from S3.
4. Do a partition on the source directory
5. Write a custom mapPartition function that opens/writes to output files that 
are created with names based on the source directory.

— Ken

> On Jul 8, 2021, at 3:19 PM, Jason Liu  wrote:
> 
> Hi all,
> 
> We currently have a use case of running a given dataset API job for a 
> given S3 directory to dedup data and output to a new directory. We need to 
> run this job for roughly ~1000 S3 folders. I attempted to set up the Flink 
> executions so it runs sequentially like this: 
> 
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> 
> for (final String inputDirectory : directoryList) {
>   String inputPath = inputDirectory;
>   String outputPath = getOutputPath(inputPath);
> 
>   log.warn("using input path [{}] and output path [{}]", inputPath, 
> outputPath);
> 
>   DataSet<String> lines = 
> env.readTextFile(inputPath).withParameters(parameters);
>   DataSet<String> deduped = lines.distinct(new GetKey());
>   deduped.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);
> }
> env.execute();
> However, when I submit this job to the cluster, it generates a graph like 
> this 
> 
> And it seems Flink is running them in parallel. Is there a way to tell Flink 
> to run it sequentially? I tried moving the execution environment inside the 
> loop but it seems like it only runs the job on the first directory. I'm 
> running this on AWS Kinesis Data Analytics, so it's a bit hard for me to 
> submit new jobs. 
> 
> Wondering if there's any way I can accomplish this?
> 
> Thanks,
> Jason
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Memory Usage - Total Memory Usage on UI and Metric

2021-07-02 Thread Ken Krugler
When we run Flink jobs in EMR (typically batch, though) we disable the pmem 
(physical memory) and vmem (virtual memory) checks.

This was initially done for much older versions of Flink (1.6???), where the 
memory model wasn’t so well documented or understood by us.

But I think the pmem check might still have an issue, due to Flink’s use of 
off-heap.

So something like:

[
  {
    "classification": "yarn-site",
    "properties": {
      "yarn.nodemanager.pmem-check-enabled": "false",
      "yarn.nodemanager.vmem-check-enabled": "false"
    }
  }
]


…might help.

— Ken


> On Jul 2, 2021, at 8:36 AM, bat man  wrote:
> 
> Hi,
> 
> I am running a streaming job (Flink 1.9) on EMR on yarn. Flink web UI or 
> metrics reported from prometheus shows total memory usage within specified 
> task manager memory - 3GB. 
> 
> Metrics shows below numbers(in MB) -
> Heap - 577
> Non Heap - 241
> DirectMemoryUsed - 852
> 
> Non-heap does rise gradually, starting around 210MB and reaching 241 when 
> yarn kills the container. Heap fluctuates between 1.x - .6GB, 
> DirectMemoryUsed is constant at 852.
> 
> Based on configurations these are the tm params from yarn logs - 
> -Xms1957m -Xmx1957m -XX:MaxDirectMemorySize=1115m 
> 
> These are other params as configuration in flink-conf
> yarn-cutoff - 270MB
> Managed memory - 28MB
> Network memory - 819MB
> 
> Above memory values are from around the same time the container is killed by 
> yarn for -  is running beyond physical memory limits.
> 
> Is there anything else which is not reported by flink in metrics or I have 
> been misinterpreting as seen from above total memory consumed is below - 3GB.
> 
> Same behavior is reported when I have run the job with 2GB, 2.7GB and now 3GB 
> task mem. My job does have shuffles as data from one operator is sent to 4 
> other operators after filtering.
> 
> One more thing is I am running this with 3 yarn containers(2 tasks in each 
> container), total parallelism as 6. As soon as one container fails with this 
> error, the job re-starts. However, within minutes other 2 containers also 
> fail with the same error one by one.
> 
> Thanks,
> Hemant

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Error with extracted type from custom partitioner key

2021-06-12 Thread Ken Krugler
Hi Timo,

Thanks, I’ll give the ResultTypeQueryable interface a try - my previous 
experience registering custom Kryo serializers wasn’t so positive.

Though I’m still curious as to whether java.lang.ClassCastException I got was 
representative of a bug in Flink, or my doing something wrong.

But with the ongoing deprecation of DataSet support, I imagine that’s a low 
priority issue in any case.
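
For the archives, the ResultTypeQueryable approach looks roughly like this (a
sketch — the record and key types are placeholders, and TypeInformation.of() here
stands in for whatever custom type information / serializer ends up being used):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

public class MyGroupingKeySelector
        implements KeySelector<MyRecord, MyGroupingKey>, ResultTypeQueryable<MyGroupingKey> {

    @Override
    public MyGroupingKey getKey(MyRecord record) {
        return new MyGroupingKey(record.getPartitionField());
    }

    @Override
    public TypeInformation<MyGroupingKey> getProducedType() {
        // Explicitly tell Flink what this selector produces, instead of relying
        // on type extraction; a fully custom TypeInformation could be returned here.
        return TypeInformation.of(MyGroupingKey.class);
    }
}

// And/or register a custom Kryo serializer for the key type:
//   env.getConfig().registerTypeWithKryoSerializer(MyGroupingKey.class, MyGroupingKeyKryoSerializer.class);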

Regards,

— Ken


> On Jun 4, 2021, at 7:05 AM, Timo Walther  wrote:
> 
> Hi Ken,
> 
> non-POJOs are serialized with Kryo. This might not give you optimal 
> performance. You can register a custom Kryo serializer in ExecutionConfig to 
> speed up the serialization.
> 
> Alternatively, you can implement `ResultTypeQueryable` to provide custom type 
> information with a custom serializer.
> 
> I hope this helps. Otherwise can you share a little example of how you would 
> like to call partitionCustom()?
> 
> Regards,
> Timo
> 
> On 04.06.21 15:38, Ken Krugler wrote:
>> Hi all,
>> I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, 
>> with a DataSet) to do a better job of distributing data to tasks. The 
>> classes look like:
>> public class MyPartitioner implements Partitioner<MyGroupingKey>
>> {
>> ...
>> }
>> public class MyGroupingKey implements Comparable<MyGroupingKey>
>> {
>> ...
>> }
>> This worked fine, but I noticed a warning logged by Flink about 
>> MyGroupingKey not having an empty constructor, and thus not being treated as 
>> a POJO.
>> I added that empty constructor, and then I got an error because 
>> partitionCustom() only works on a single field key.
>> So I changed MyGroupingKey to have a single field (a string), with transient 
>> cached values for the pieces of the key that I need while partitioning. Now 
>> I get an odd error:
>> java.lang.RuntimeException: Error while calling custom partitioner
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
>> MyGroupingKey
>> at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
>> at 
>> org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
>> ... 19 more
>> So I've got two questions…
>> • Should I just get rid of the empty constructor, and have Flink treat it as 
>> a non-POJO? This seemed to be working fine.
>> • Is it a bug in Flink that the extracted field from the key is being used 
>> as the expected type for partitioning?
>> Thanks!
>> — Ken
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com>
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Error with extracted type from custom partitioner key

2021-06-04 Thread Ken Krugler
Hi all,

I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, 
with a DataSet) to do a better job of distributing data to tasks. The classes 
look like:

public class MyPartitioner implements Partitioner<MyGroupingKey> 
{
...
}

public class MyGroupingKey implements Comparable<MyGroupingKey> 
{
...
}

This worked fine, but I noticed a warning logged by Flink about MyGroupingKey 
not having an empty constructor, and thus not being treated as a POJO.

I added that empty constructor, and then I got an error because 
partitionCustom() only works on a single field key.

So I changed MyGroupingKey to have a single field (a string), with transient 
cached values for the pieces of the key that I need while partitioning. Now I 
get an odd error:

java.lang.RuntimeException: Error while calling custom partitioner

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
MyGroupingKey
at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
at 
org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
... 19 more

So I've got two questions…

• Should I just get rid of the empty constructor, and have Flink treat 
it as a non-POJO? This seemed to be working fine.
• Is it a bug in Flink that the extracted field from the key is being 
used as the expected type for partitioning?

Thanks!

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread Ken Krugler
Hi Vishal,

WRT “bring down our internal services” - a common pattern when making requests 
to external services is to measure latency, and throttle (delay) requests in 
response to increased latency.

You’ll see this discussed frequently on web crawling forums as an auto-tuning 
approach.

Typically there’s a steady increase in latency as load on the service increases.

The trick is throttling soon enough before you hit the “elbow” where a service 
effectively falls over.

— Ken



> On May 5, 2021, at 9:08 AM, vishalovercome  wrote:
> 
> Yes. While back-pressure would eventually ensure high throughput, hand tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to adjust the in-rate. Plus running all operators at such a high
> scale would result in wastage of resources, even with operator chaining in
> place. 
> 
> That's why I think more toggles are needed to make current auto-scaling
> truly shine.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: SocketException: Too many open files

2020-09-25 Thread Ken Krugler
h calling the Sink Function where i am writing the data to elastic
> i am calling the print() on SingleOutputStreamOperator (the Stream that is
> returned once i calculate the Aggregation based on Tumbling Window.
> 
> And also i am calling the DataStreamUtils.collect() on the above stream to
> log out the info in the stream.
> 
> These two are only enabled for in DEV Environment.
> 
> I have updated the limits.conf and also set the value of file-max
> (fs.file-max = 2097152) on the master node as well as on all worker nodes
> and still getting the same issue.
> 
> Thanks
> Sateesh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Speeding up CoGroup in batch job

2020-09-17 Thread Ken Krugler
Hi Robert,

Thanks for the input. I did increase the amount of managed memory, and 
confirmed that both SSDs (on each slave) are being used for temp data.

I haven’t been able to figure out why the server CPU usage is low, but I did 
notice that it fluctuated from very low (10%) on up to 95+%, with the average 
around 50%. But iowait never gets very high. Wondering if CPU is low when a lot 
of segments are being flushed to disk, and high when a lot of segments are 
being sorted before being flushed.

The main bottleneck is the CoGroup operation, which is in the phase where it's 
writing a all of the (grouped) data to disk, in preparation for the sorted 
merge to do the grouping.

Looking at threads from a single dump of a TM process, most are WAITING, with 
counts like:

47 - requestMemorySegmentBlocking
70 - ReaderIterator.next
70 - AbstractRecordReader.getNextRecord

The only RUNNABLE threads that were doing anything interesting were all 
Kryo-related, which speaks to your point about ensuring I’m using POJOs.

I’m curious, after looking into the code, whether enabling object reuse would 
also help - I see different versions of mergers being used, depending on 
whether that’s on or not.
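
For reference, object reuse is a single ExecutionConfig switch (safe only if user
functions don't hold on to input records across invocations), e.g.:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();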

Thanks again,

— Ken


> On Sep 11, 2020, at 5:27 AM, Robert Metzger  wrote:
> 
> Hi Ken,
> 
> Some random ideas that pop up in my head:
> - make sure you use data types that are efficient to serialize, and cheap to 
> compare (ideally use primitive types in TupleN or POJOs)
> - Maybe try the TableAPI batch support (if you have time to experiment).
> - optimize memory usage on the TaskManager for a lot of managed memory on the 
> TaskManager, so that we have more memory for efficient sorting (leading to 
> less spilling): 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#configure-memory-for-batch-jobs
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#configure-memory-for-batch-jobs>
> - make sure to configure a separate tmp directory for each SSD, so that we 
> can spread the load across all SSDs.
> - If you are saying the CPU load is 40% on a TM, we have to assume we are IO 
> bound: Is it the network or the disk(s)?
> 
> I hope this is some helpful inspiration for improving the performance.
> 
> 
> On Fri, Sep 4, 2020 at 9:43 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi all,
> 
> I added a CoGroup to my batch job, and it’s now running much slower, 
> primarily due to back pressure from the CoGroup operator.
> 
> I assume it’s because this operator is having to sort/buffer-to-disk all 
> incoming data. Looks like about 1TB from one side of the join, currently very 
> little from the other but will be up to 2TB in the future.
> 
> I don’t see lots of GC, I’m using about 60% of available network buffers, per 
> TM server load (for all 8 servers) is about 40% average, and both SSDs on 
> each TM are being used for …/flink-io-xxx/yyy.channel files.
> 
> What are techniques for improving the performance of a CoGroup? 
> 
> Thanks!
> 
> — Ken
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Use of slot sharing groups causing workflow to hang

2020-09-09 Thread Ken Krugler
Hi Til,

> On Sep 3, 2020, at 12:31 AM, Till Rohrmann  wrote:
> 
> Hi Ken,
> 
> I believe that we don't have a lot if not any explicit logging about the slot 
> sharing group in the code. You can, however, learn indirectly about it by 
> looking at the required number of AllocatedSlots in the SlotPool. Also the 
> number of "multi task slot" which are created should vary because every group 
> of slot sharing tasks will create one of them. For learning about the 
> SlotPoolImpl's status, you can also take a look at SlotPoolImpl.printStatus.
> 
> For the underlying problem, I believe that Yangze could be right. How many 
> resources do you have in your cluster?

I've got a Flink MiniCluster with 12 slots. Even with only 6 pipelined
operators, each with a parallelism of 1, it still hangs while starting. So
I don't think that it's a resource issue.

One odd thing I've noticed. I've got three streams that I union together.
Two of the streams are in separate slot sharing groups, the third is not
assigned to a group. But when I check the logs, I see three "Create multi
task slot" entries. I'm wondering if unioning streams that are in different
slot sharing groups creates a problem.
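
The shape of it is roughly this (a sketch — the source classes and group names are
placeholders):

// Three streams unioned together: two pinned to explicit slot sharing groups,
// the third left in the default group.
DataStream<Event> a = env.addSource(new SourceA())
        .slotSharingGroup("group-a")
        .setParallelism(3);

DataStream<Event> b = env.addSource(new SourceB())
        .slotSharingGroup("group-b")
        .setParallelism(3);

DataStream<Event> c = env.addSource(new SourceC());  // default slot sharing group

DataStream<Event> merged = a.union(b, c);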

Thanks,

-- Ken

> On Thu, Sep 3, 2020 at 4:25 AM Yangze Guo  <mailto:karma...@gmail.com>> wrote:
> Hi,
> 
> The failure of requesting slots usually because of the lack of
> resources. If you put part of the workflow to a specific slot sharing
> group, it may require more slots to run the workflow than before.
> Could you share logs of the ResourceManager and SlotManager, I think
> there are more clues in it.
> 
> Best,
> Yangze Guo
> 
> On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> >
> > Hi all,
> >
> > I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally 
> > (via Eclipse), with a parallelism of either 3 or 6.
> >
> > If I set up part of the workflow to use a specific (not “default”) slot 
> > sharing group with a parallelism of 3, and the remaining portions of the 
> > workflow have a parallelism of either 1 or 2, then the workflow never 
> > starts running, and eventually fails due to a slot request not being 
> > fulfilled in time.
> >
> > So I’m wondering how best to debug this.
> >
> > I don’t see any information (even at DEBUG level) being logged about which 
> > operators are in what slot sharing group, or which slots are assigned to 
> > what groups.
> >
> > Thanks,
> >
> > — Ken
> >
> > PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712 
> > <https://issues.apache.org/jira/browse/FLINK-8712>, and tried the approach 
> > of setting # of slots in the config, but that didn’t change anything. I see 
> > that issue is still open, so wondering what Til and Konstantin have to say 
> > about it.
> >
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> > custom big data solutions & training
> > Hadoop, Cascading, Cassandra & Solr
> >

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-07 Thread Ken Krugler
Hi Fred,

I think this is the current behavior (though it would be helpful to know which 
version of Flink you’re using).

From an email conversation with Kostas in January of this year:

> Hi Ken, Jingsong and Li,
> 
> Sorry for the late reply.
> 
> As Jingsong pointed out, upon calling close() the StreamingFileSink
> does not commit the in-progress/pending files.
> The reason for this is that the close() method of any UDF including
> sink functions is called on both normal termination and termination
> due to failure.
> Given this, we cannot commit the files, because in case of failure
> they should be reverted.
> 
> Actually we are currently updating the StreamingFileSink docs to
> includes this among other things.
> Also the differentiation between normal termination and termination
> due to failure will hopefully be part of Flink 1.11 and
> this is the FLIP to check
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>  
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>.
> 
> Cheers,
> Kostas

Though it looks like FLIP-46 is still under discussion, and thus 1.11 doesn’t 
have a fix for this?

— Ken

> On Sep 7, 2020, at 8:39 AM, Teunissen, F.G.J. (Fred)  <mailto:fred.teunis...@ing.com>> wrote:
> 
> Hi All,
>  
> My flink-job is using bounded input sources and writes the results to a 
> StreamingFileSink.
> When it has processed all the input the job is finished and closes. But the 
> output files are still
> named “-0-0..inprogress.”. I expected them to be named 
> ““-0-0.”.
> 
> Did I forget some setting or something else?
>  
> Regards,
> Fred
>  
> -
> ATTENTION:
> The information in this e-mail is confidential and only meant for the 
> intended recipient. If you are not the intended recipient, don't use or 
> disclose it in any way. Please let the sender know and delete the message 
> immediately.
> -

--
Ken Krugler
http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Speeding up CoGroup in batch job

2020-09-04 Thread Ken Krugler
Hi all,

I added a CoGroup to my batch job, and it’s now running much slower, primarily 
due to back pressure from the CoGroup operator.

I assume it’s because this operator is having to sort/buffer-to-disk all 
incoming data. Looks like about 1TB from one side of the join, currently very 
little from the other but will be up to 2TB in the future.

I don’t see lots of GC, I’m using about 60% of available network buffers, per 
TM server load (for all 8 servers) is about 40% average, and both SSDs on each 
TM are being used for …/flink-io-xxx/yyy.channel files.

What are techniques for improving the performance of a CoGroup? 

Thanks!

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

2020-09-03 Thread Ken Krugler
Assuming you’re not doing custom partitioning, then another workaround is to 
pass Optional.empty() for the partitioner, so that it will use the Kafka 
partitioning vs. a Flink partitioner.

Or at least that worked for us, when we encountered this same issue.
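
Something along these lines (a sketch — constructor variants differ a bit between
connector versions, and the topic/schema/properties/stream are placeholders):

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");

// Passing Optional.empty() for the custom partitioner makes the producer fall
// back to Kafka's own partitioning (key hash / round robin), sidestepping the
// unopened FlinkFixedPartitioner wrapped by KafkaSerializationSchemaWrapper.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        new SimpleStringSchema(),
        producerProps,
        Optional.empty());

stream.addSink(producer);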

— Ken

> On Sep 3, 2020, at 5:36 AM, Dawid Wysakowicz  wrote:
> 
> Thank you for the thorough investigation.  I totally agree with you. I 
> created an issue for it[1]. Will try to fix it as soon as possible and 
> include it in 1.11.2 and 1.12. 
> The way you could work this around is by using the KafkaSerializationSchema 
> directly with an KafkaContextAware interface.
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19133 
> <https://issues.apache.org/jira/browse/FLINK-19133>
> On 03/09/2020 14:24, DONG, Weike wrote:
>> Hi community,
>> 
>> And by the way, during FlinkKafkaProducer#initProducer, the 
>> flinkKafkaPartitioner is only opened when is is NOT null, which is 
>> unfortunately not the case here, because it would be set to null if 
>> KafkaSerializationSchemaWrapper is provided in the arguments of the 
>> constructor.
>> 
>> 
>> 
>> 
>> So these logic flaws eventually lead to this serious bug, and we recommend 
>> that initialization of FlinkKafkaPartitioners could be done in 
>> KafkaSerializationSchemaWrapper#open.
>> 
>> Sincerely,
>> Weike
>> 
>> 
>> On Thu, Sep 3, 2020 at 8:15 PM DONG, Weike > <mailto:kyled...@connect.hku.hk>> wrote:
>> Hi community,
>> 
>> We have found a serious issue with the newly-introduced 
>> KafkaSerializationSchemaWrapper class, which eventually let 
>> FlinkKafkaProducer only write to partition 0 in the given Kafka topic under 
>> certain conditions.
>> 
>> First let's look at this constructor in the universal version of 
>> FlinkKafkaProducer, and it uses FlinkFixedPartitioner as the custom 
>> partitioner.
>> 
>> 
>> 
>> And when we trace down the call path, KafkaSerializationSchemaWrapper is 
>> used to wrap the aforementioned custom partitioner, i.e. 
>> FlinkFixedPartitioner. 
>> 
>> 
>> 
>> However, we found that in the implementation of  
>> KafkaSerializationSchemaWrapper, it does not call the open method of the 
>> given partitioner, which is essential for the partitioner to initialize its 
>> environment variables like parallelInstanceId in FlinkFixedPartitioner. 
>> 
>> Therefore, when KafkaSerializationSchemaWrapper#serialize is later called by 
>> the FlinkKafkaProducer,   FlinkFiexedPartitioner#partition would always 
>> return 0, because  parallelInstanceId is not properly initialized.
>> 
>> 
>> Eventually, all of the data would go only to partition 0 of the given Kafka 
>> topic, creating severe data skew in the sink.
>> 
>> 
>> 

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Use of slot sharing groups causing workflow to hang

2020-09-02 Thread Ken Krugler
Hi all,

I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally (via 
Eclipse), with a parallelism of either 3 or 6.

If I set up part of the workflow to use a specific (not “default”) slot sharing 
group with a parallelism of 3, and the remaining portions of the workflow have 
a parallelism of either 1 or 2, then the workflow never starts running, and 
eventually fails due to a slot request not being fulfilled in time.

So I’m wondering how best to debug this.

I don’t see any information (even at DEBUG level) being logged about which 
operators are in what slot sharing group, or which slots are assigned to what 
groups.

Thanks,

— Ken

PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712 
<https://issues.apache.org/jira/browse/FLINK-8712>, and tried the approach of 
setting # of slots in the config, but that didn’t change anything. I see that 
issue is still open, so wondering what Til and Konstantin have to say about it.

------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Change in sub-task id assignment from 1.9 to 1.10?

2020-08-06 Thread Ken Krugler
Hi all,

Was there any change in how sub-tasks get allocated to TMs, from Flink 1.9 to 
1.10?

Specifically for consecutively numbered sub-tasks (e.g. 0, 1, 2) did it become 
more or less likely that they’d be allocated to the same Task Manager?

Asking because a workflow that ran fine in 1.9 now has a “hot” TM that’s having 
trouble keeping up with a Kafka topic.

The most promising explanation is that now there are three sub-tasks on the 
same TM that are reading from that topic, versus previously they’d be scattered 
across multiple TMs.

But I don’t see significant changes in this area post 1.8

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Issue with single job yarn flink cluster HA

2020-08-05 Thread Ken Krugler
Hi Dinesh,

Did updating to Flink 1.10 resolve the issue?

Thanks,

— Ken

> Hi Andrey,
> Sure We will try to use Flink 1.10 to see if HA issues we are facing is fixed 
> and update in this thread.
> 
> Thanks,
> Dinesh
> 
> On Thu, Apr 2, 2020 at 3:22 PM Andrey Zagrebin  <mailto:azagre...@apache.org>> wrote:
> Hi Dinesh,
> 
> Thanks for sharing the logs. There were couple of HA fixes since 1.7, e.g. 
> [1] and [2].
> I would suggest to try Flink 1.10.
> If the problem persists, could you also find the logs of the failed Job 
> Manager before the failover?
> 
> Best,
> Andrey
> 
> [1] https://jira.apache.org/jira/browse/FLINK-14316 
> <https://jira.apache.org/jira/browse/FLINK-14316>
> [2] https://jira.apache.org/jira/browse/FLINK-11843 
> <https://jira.apache.org/jira/browse/FLINK-11843>
> On Tue, Mar 31, 2020 at 6:49 AM Dinesh J  <mailto:dineshj...@gmail.com>> wrote:
> Hi Yang,
> I am attaching one full jobmanager log for a job which I reran today. This a 
> job that tries to read from savepoint.
> Same error message "leader election onging" is displayed. And this stays the 
> same even after 30 minutes. If I leave the job without yarn kill, it stays 
> the same forever.
> Based on your suggestions till now, I guess it might be some zookeeper 
> problem. If that is the case, what can I lookout for in zookeeper to figure 
> out the issue?
> 
> Thanks,
> Dinesh


[snip]

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Flink Logging on EMR

2020-07-02 Thread Ken Krugler
Hi Sateesh,

Note that there are three classes of log files, when running Flink on EMR:

1. The output from the main class.

Since I typically run the job by sshing onto the master and using the CLI from 
there, I have control over where that output goes. E.g.

nohup bin/flink run -m yarn-cluster -yn 48 /path/to/my-job.jar >> my.log 2>&1 &

And then:

tail -f my.log

2. Logging by the JobManager

The JobManager log is available via the Yarn Application Overview screen (see 
the Logs link in the attempt list near the bottom). When your tool fails (e.g., 
due to a missing command-line argument), the error output is available via the 
stderr link in that Step of the EMR Cluster > Steps tab

3. Logging by each TaskManager

I typically log into the slave to have a look at the Task Manager error/status 
output (e.g., in 
/var/log/hadoop-yarn/containers/application_1546471484145_0002/container_1546471484145_0002_01_02/taskmanager.err).
 One common approach here is to grep the taskmanager.log files (on each slave), 
e.g.

sudo find /var/log/hadoop-yarn/containers/application_1568579660214_0004/ -name 
"taskmanager.log" | sudo xargs grep “text of interest”

HTH,

— Ken
 
> On Jul 2, 2020, at 9:29 AM, mars  wrote:
> 
> Hi,
> 
>  I am running my Flink jobs on EMR and i didn't include any
> log4j.properties as part of my JAR and i am using slf4j (and included the
> dependent jars in the uber jar i created) and logging in my app.
> 
>  When i run my everything is running fine except i cannot find my
> application logs any where 
> 
> I am running the Flink job/app with (-p 2) i see two task managers and when
> i looked into the logs (none of the app specific logs can be found in those
> logs). We are using the INFO Level logging. 
> 
> I was hoping the logs will go to default Console Appender.
> 
> In the Master node Flink Conf i have found logback-console.xml (which sets
> the root level logging to INFO) and is using Console Appender and there is
> also log4j.properties file which also sets the Root Level logging to INFO
> and is using FileAppender
> 
> I also tried to access the logs using "yarn logs --applicationId <>" i am
> getting
> 
> $ yarn logs -applicationId application_1593579475717_0001
> 20/07/01 21:16:32 INFO client.RMProxy: Connecting to ResourceManager at
> <>:8032
> /var/log/hadoop-yarn/apps/root/logs/application_1593579475717_0001 does not
> exist.
> Log aggregation has not completed or is not enabled.
> 
> And Yarn Log Aggregation is already enabled. When i checked
> /etc/hadoop/conf/yarn-site.xml
> 
> 
>yarn.log-aggregation-enable
>true
> 
> 
> It might be the case that i can only see the logs through yarn once the
> application completes/finishes/fails
> 
> Thanks
> Sateesh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Simple stateful polling source

2020-06-07 Thread Ken Krugler
Hi Chesnay,

> On Jun 19, 2019, at 6:05 AM, Chesnay Schepler  wrote:
> 
> A (Rich)SourceFunction that does not implement RichParallelSourceFunction is 
> always run with a parallelism of 1.

RichSourceFunction 
<https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>
 says "Base class for implementing a parallel data source…” and also talks 
about (in a similar, but not identical way as RichParallelSourceFunction 
<https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>)
 use of getRuntimeContext() to determine the sub-task index.

But you’d always want to extend RichParallelSourceFunction to create a parallel 
data source, yes?

Seems confusing.
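
For comparison, my mental model of a parallel source is something like this
(a sketch):

// Each parallel instance of a RichParallelSourceFunction uses its subtask index
// to pick its own share of the work.
public class PartitionedSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();

        long counter = subtask;
        while (running) {
            // Emit under the checkpoint lock so emission and state updates stay atomic.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("subtask " + subtask + " record " + counter);
                counter += numSubtasks;
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}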

Thanks,

— Ken

> 
> On 19/06/2019 14:36, Flavio Pompermaier wrote:
>> My sourcefunction is intrinsically single-thread. Is there a way to force 
>> this aspect?
>> I can't find a real difference between a RichParallelSourceFunction and a 
>> RichSourceFunction.
>> Is this last (RichSourceFunction) implicitly using parallelism = 1?
>> 
>> On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler > <mailto:ches...@apache.org>> wrote:
>> It returns a list of states so that state can be re-distributed if the 
>> parallelism changes.
>> 
>> If you hard-code the interface to return a single value then you're 
>> implicitly locking the parallelism.
>> When you reduce the parallelism you'd no longer be able to restore all 
>> state, since you have less instances than stored state.
>> 
>> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>>> It's not clear to me why the source checkpoint returns a list of 
>>> object...when it could be useful to use a list instead of a single value?
>>> The documentation says The returned list should contain one entry for 
>>> redistributable unit of state" but this is not very clear to me..
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler >> <mailto:ches...@apache.org>> wrote:
>>> This looks fine to me.
>>> 
>>> What exactly were you worried about?
>>> 
>>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>>> > Hi to all,
>>> > in my use case I have to ingest data from a rest service, where I 
>>> > periodically poll the data (of course a queue would be a better choice 
>>> > but this doesn't depend on me).
>>> >
>>> > So I wrote a RichSourceFunction that starts a thread that poll for new 
>>> > data.
>>> > However, I'd like to restart from the last "from" value (in the case 
>>> > the job is stopped).
>>> >
>>> > My initial thought was to write somewhere the last used date and, on 
>>> > job restart, read that date (from a file for example). However, Flink 
>>> > stateful source should be a better choice here...am I wrong? So I 
>>> > made  my source function implementing ListCheckpointed:
>>> >
>>> > @Override
>>> > public List<String> snapshotState(long checkpointId, long timestamp) 
>>> > throws Exception {
>>> >return Collections.singletonList(pollingThread.getDateFromAsString());
>>> > }
>>> > @Override
>>> > public void restoreState(List<String> state) throws Exception {
>>> > for (String dateFrom : state) {
>>> >  startDateStr = dateFrom;
>>> >  }
>>> > }
>>> >
>>> > @Override
>>> > public void run(SourceContext ctx) throws Exception {
>>> >final Object lock = ctx.getCheckpointLock();
>>> >Client httpClient = getHttpClient();
>>> >try {
>>> >   pollingThread = new MyPollingThread.Builder(baseUrl, 
>>> > httpClient)//
>>> >   .setStartDate(startDateStr, datePatternStr)//
>>> >   .build();
>>> >   // start the polling thread
>>> >   new Thread(pr).start();
>>> >  (etc)
>>> > }
>>> >
>>> > Is this the correct approach or did I misunderstood how stateful 
>>> > source functions work?
>>> >
>>> > Best,
>>> > Flavio
>>> 
>>> 
>>> 
>> 
>> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Flink restart strategy on specific exception

2020-05-12 Thread Ken Krugler
Hi Till,

Sorry, missed the key question…in the RestartStrategy.restart() method, I don’t 
see any good way to get at the underlying exception.

I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I still 
need access to the private execGraph to be able to get at the failure info. Is 
there some other way in the restart handler to get at this?

And yes, I meant to note that you’d mentioned the required static method in your 
email; I was asking about documentation for it.

Thanks,

— Ken

===
Sorry to resurface an ancient question, but is there a working example anywhere 
of setting a custom restart strategy?

Asking because I’ve been wandering through the Flink 1.9 code base for a while, 
and the restart strategy implementation is…pretty tangled.

From what I’ve been able to figure out, you have to provide a factory class, 
something like this:

Configuration config = new Configuration();
config.setString(ConfigConstants.RESTART_STRATEGY, 
MyRestartStrategyFactory.class.getCanonicalName());
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(4, config);

That factory class should extend RestartStrategyFactory, but it also needs to 
implement a static method that looks like:

public static MyRestartStrategyFactory createFactory(Configuration config) {
return new MyRestartStrategyFactory();
}

I wasn’t able to find any documentation that mentioned this particular method 
being a requirement.
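
Putting those pieces together, a skeleton factory looks roughly like this (a 
sketch only; the exact abstract method on RestartStrategyFactory, and the 
MyRestartStrategy implementation itself, should be double-checked against your 
Flink version):

public class MyRestartStrategyFactory extends RestartStrategyFactory {

    private static final long serialVersionUID = 1L;

    @Override
    public RestartStrategy createRestartStrategy() {
        // Return your custom RestartStrategy implementation here.
        return new MyRestartStrategy();
    }

    // This static hook appears to be what the config-driven loading looks for.
    public static MyRestartStrategyFactory createFactory(Configuration config) {
        return new MyRestartStrategyFactory();
    }
}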

And also the documentation at 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
 
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance>
 doesn’t mention you can set a custom class name for the restart-strategy.

Thanks,

— Ken


> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  <mailto:trohrm...@apache.org>> wrote:
> 
> Hi Kasif,
> 
> I think in this situation it is best if you defined your own custom 
> RestartStrategy by specifying a class which has a `RestartStrategyFactory 
> createFactory(Configuration configuration)` method as `restart-strategy: 
> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  <mailto:kasif@gs.com>> wrote:
> Hello,
> 
>  
> 
> Looking at existing restart strategies they are kind of generic. We have a 
> requirement to restart the job only in case of specific exception/issues.
> 
> What would be the best way to have a re start strategy which is based on few 
> rules like looking at particular type of exception or some extra condition 
> checks which are application specific.?
> 
>  
> 
> Just a background on one specific issue which invoked this requirement is 
> slots not getting released when the job finishes. In our applications, we 
> keep track of jobs submitted with the amount of parallelism allotted to it.  
> Once the job finishes we assume that the slots are free and try to submit 
> next set of jobs which at times fail with error  “not enough slots available”.
> 
>  
> 
> So we think a job re start can solve this issue but we only want to re start 
> only if this particular situation is encountered.
> 
>  
> 
> Please let us know If there are better ways to solve this problem other than 
> re start strategy.
> 
>  
> 
> Thanks,
> 
> Kasif
> 
>  
> 
> 
> 
> Your Personal Data: We may collect and process information about you that may 
> be subject to data protection laws. For more information about how we use and 
> disclose your personal data, how we protect your information, our legal basis 
> to use your information, your rights and who you can contact, please refer 
> to: www.gs.com/privacy-notices <http://www.gs.com/privacy-notices>

--
Ken Krugler
http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Flink restart strategy on specific exception

2020-05-12 Thread Ken Krugler
Hi Till,

Sorry to resurface an ancient question, but is there a working example anywhere 
of setting a custom restart strategy?

Asking because I’ve been wandering through the Flink 1.9 code base for a while, 
and the restart strategy implementation is…pretty tangled.

From what I’ve been able to figure out, you have to provide a factory class, 
something like this:

Configuration config = new Configuration();
config.setString(ConfigConstants.RESTART_STRATEGY, 
MyRestartStrategyFactory.class.getCanonicalName());
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(4, config);

That factory class should extend RestartStrategyFactory, but it also needs to 
implement a static method that looks like:

public static MyRestartStrategyFactory createFactory(Configuration config) {
return new MyRestartStrategyFactory();
}

I wasn’t able to find any documentation that mentioned this particular method 
being a requirement.

And also the documentation at 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
 
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance>
 doesn’t mention you can set a custom class name for the restart-strategy.

Thanks,

— Ken


> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
> 
> Hi Kasif,
> 
> I think in this situation it is best if you defined your own custom 
> RestartStrategy by specifying a class which has a `RestartStrategyFactory 
> createFactory(Configuration configuration)` method as `restart-strategy: 
> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  <mailto:kasif@gs.com>> wrote:
> Hello,
> 
>  
> 
> Looking at existing restart strategies they are kind of generic. We have a 
> requirement to restart the job only in case of specific exception/issues.
> 
> What would be the best way to have a re start strategy which is based on few 
> rules like looking at particular type of exception or some extra condition 
> checks which are application specific.?
> 
>  
> 
> Just a background on one specific issue which invoked this requirement is 
> slots not getting released when the job finishes. In our applications, we 
> keep track of jobs submitted with the amount of parallelism allotted to it.  
> Once the job finishes we assume that the slots are free and try to submit 
> next set of jobs which at times fail with error  “not enough slots available”.
> 
>  
> 
> So we think a job re start can solve this issue but we only want to re start 
> only if this particular situation is encountered.
> 
>  
> 
> Please let us know If there are better ways to solve this problem other than 
> re start strategy.
> 
>  
> 
> Thanks,
> 
> Kasif
> 
>  
> 
> 
> 
> Your Personal Data: We may collect and process information about you that may 
> be subject to data protection laws. For more information about how we use and 
> disclose your personal data, how we protect your information, our legal basis 
> to use your information, your rights and who you can contact, please refer 
> to: www.gs.com/privacy-notices <http://www.gs.com/privacy-notices>

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Restore from savepoint with Iterations

2020-05-04 Thread Ken Krugler
Hi Ashish,

The workaround we used was to throttle the data flowing in the iteration (in code), 
though I’m not sure if that’s possible for your situation.

You could remove the iteration by writing to a Kafka topic at the end of the 
part of your workflow that is currently an iteration, and then consuming from 
that same topic as your “iteration" source.
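
Roughly like this (a sketch only; the topic name, event type, schema and 
runIterationBody() are placeholders, and the exact connector class names depend 
on which Kafka connector version you're using):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");

// Consume the feedback topic instead of using an IterativeStream as the loop head.
DataStream<MyEvent> feedback = env.addSource(
    new FlinkKafkaConsumer<>("iteration-feedback", new MyEventSchema(), props));

// Run the body of what used to be the iteration.
DataStream<MyEvent> needsAnotherPass = runIterationBody(feedback);

// Write records that need another pass back to the same topic.
needsAnotherPass.addSink(
    new FlinkKafkaProducer<>("iteration-feedback", new MyEventSchema(), props));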

— Ken


> On May 4, 2020, at 7:32 PM, Ashish Pokharel  wrote:
> 
> Hi Ken,
> 
> Thanks for the quick response!
> 
> I came across FLIP-15 on my next google search after I sent email :) It 
> DEFINITELY looks that way. As I was watching logs and nature of how job gets 
> stuck it does look like buffer is blocked. But FLIP-15 has not moved further 
> though. So there are no workarounds at all at this point? Perhaps a technique 
> to block Kafka Consumer for some time? Even that may get me going but looks 
> like there is probability of this happening during the normal processing as 
> your use case demonstrates. I am using the iteration with no timeouts for the prod 
> job, using timeouts only in unit testing. The theory was that in prod the input 
> stream will be indefinite, and sometimes a long lull with no events might happen 
> during maintenance, backlog etc. I really would like to avoid a bloat in the DAG by 
> repeating same functions with filters and side outputs. Other than obvious 
> repetition, it will increase the site of states by a factor. Even those 
> slowly moving dimensions are not light (around half billion every day) :) 
> 
>> On May 4, 2020, at 10:13 PM, Ken Krugler > <mailto:kkrugler_li...@transpac.com>> wrote:
>> 
>> Hi Ashish,
>> 
>> Wondering if you’re running into the gridlock problem I mention on slide #25 
>> here: 
>> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink
>>  
>> <https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink>
>> 
>> If the iteration path has too much data in it, then the network buffer at 
>> the head of the iteration can fill up, and it never clears out because the 
>> operator consuming those buffers is blocked writing to the next operator in 
>> the iteration, and so on back to the head.
>> 
>> We ran into this when outlinks from web pages caused fan-out/amplification 
>> of the data being iterated, but maybe you hit it with restoring from state.
>> 
>> — Ken
>> 
>> 
>>> On May 4, 2020, at 6:41 PM, Ashish Pokharel >> <mailto:ashish...@yahoo.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> Hope everyone is doing well!
>>> 
>>> I am running into what seems like a deadlock (application stalled) 
>>> situation with a Flink streaming job upon restore from savepoint. Job has a 
>>> slowly moving stream (S1) that needs to be “stateful” and a continuous 
>>> stream (S2) which is “joined” with slow moving stream (S1). Some level of 
>>> loss/repetition is acceptable in continuous stream (S2) and hence can rely 
>>> on something like Kafka consumer states upon restarts etc. Continuous 
>>> stream (S2) however needs to be iterated through states from slowly moving 
>>> streams (S1) a few times (mostly 2). States are fair sized (ends up being 
>>> 15GB on HDFS). When job is restarted with no continuous data (S2) on topic 
>>> job starts up, restores states and does it’s initial checkpoint within 3 
>>> minutes. However, when app is started from savepoint and continuous stream 
>>> (S2) is actually present in Kafka it seems like application comes to a 
>>> halt. Looking at progress of checkpoints, it seems like every attempt is 
>>> stuck after until some timeouts happen at around 10 mins. If iteration on 
>>> stream is removed app can successfully start and checkpoint even when 
>>> continuous stream (S2) is flowing in as well. Unfortunately we are working 
>>> on a hosted environment for both data and platform, hence debugging with 
>>> thread dumps etc will be challenging. 
>>> 
>>> I couldn’t find a known issue on this but was wondering if anyone has seen 
>>> such behavior or know of any issues in past. It does look like 
>>> checkpointing has to be set to forced to get an iterative job to checkpoint 
>>> in the first place (an option that is marked deprecated already - working 
>>> on 1.8.2 version as of now). I do understand challenges around consistent 
>>> checkpointing of iterative stream. As I mentioned earlier, what I really 
>>> want to maintain for the

Re: Restore from savepoint with Iterations

2020-05-04 Thread Ken Krugler
Hi Ashish,

Wondering if you’re running into the gridlock problem I mention on slide #25 
here: 
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink
 
<https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink>

If the iteration path has too much data in it, then the network buffer at the 
head of the iteration can fill up, and it never clears out because the operator 
consuming those buffers is blocked writing to the next operator in the 
iteration, and so on back to the head.

We ran into this when outlinks from web pages caused fan-out/amplification of 
the data being iterated, but maybe you hit it with restoring from state.

— Ken


> On May 4, 2020, at 6:41 PM, Ashish Pokharel  wrote:
> 
> Hi all,
> 
> Hope everyone is doing well!
> 
> I am running into what seems like a deadlock (application stalled) situation 
> with a Flink streaming job upon restore from savepoint. Job has a slowly 
> moving stream (S1) that needs to be “stateful” and a continuous stream (S2) 
> which is “joined” with slow moving stream (S1). Some level of loss/repetition 
> is acceptable in continuous stream (S2) and hence can rely on something like 
> Kafka consumer states upon restarts etc. Continuous stream (S2) however needs 
> to be iterated through states from slowly moving streams (S1) a few times 
> (mostly 2). States are fair sized (ends up being 15GB on HDFS). When job is 
> restarted with no continuous data (S2) on topic job starts up, restores 
> states and does it’s initial checkpoint within 3 minutes. However, when app 
> is started from savepoint and continuous stream (S2) is actually present in 
> Kafka it seems like application comes to a halt. Looking at progress of 
> checkpoints, it seems like every attempt is stuck after until some timeouts 
> happen at around 10 mins. If iteration on stream is removed app can 
> successfully start and checkpoint even when continuous stream (S2) is flowing 
> in as well. Unfortunately we are working on a hosted environment for both 
> data and platform, hence debugging with thread dumps etc will be challenging. 
> 
> I couldn’t find a known issue on this but was wondering if anyone has seen 
> such behavior or know of any issues in past. It does look like checkpointing 
> has to be set to forced to get an iterative job to checkpoint in the first 
> place (an option that is marked deprecated already - working on 1.8.2 version 
> as of now). I do understand challenges around consistent checkpointing of 
> iterative stream. As I mentioned earlier, what I really want to maintain for 
> the most part are states of slowly moving dimensions. Iterations does solve 
> the problem at hand (multiple loops of logic) pretty gracefully but not being 
> able to restore from savepoint will be a show stopper. 
> 
> Will appreciate any pointer / suggestions.
> 
> Thanks in advance, 
> 
> Ashish

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Status of FLINK-12692 (Support disk spilling in HeapKeyedStateBackend)

2020-01-29 Thread Ken Krugler
Hi Yu Li,

It looks like this stalled out a bit, from May of last year, and won’t make it 
into 1.10.

I’m wondering if there’s a version in Blink (as a completely separate state 
backend?) that could be tried out?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: StreamingFileSink doesn't close multipart uploads to s3?

2020-01-10 Thread Ken Krugler
Hi Kostas,

I didn’t see a follow-up to this, and have also run into this same issue of 
winding up with a bunch of .inprogress files when a bounded input stream ends 
and the job terminates.

When StreamingFileSink.close() is called, shouldn’t all buckets get 
auto-rolled, so that the .inprogress files become part-xxx files?
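
For anyone else hitting this: per the thread below, checkpointing has to be 
enabled for both the row and bulk formats, since part files are only finalized 
on checkpoints. A minimal sketch (the bucket path, interval and encoder are just 
examples):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // required for StreamingFileSink to finalize part files

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

stream.addSink(sink); // 'stream' is whatever DataStream<String> you're writing out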

Thanks,

— Ken


> On Dec 9, 2019, at 6:56 PM, Jingsong Li  wrote:
> 
> Hi Kostas,
> 
> I  took a look to StreamingFileSink.close, it just delete all temporary 
> files. I know it is for failover. When Job fail, it should just delete temp 
> files for next restart.
> But for testing purposes, we just want to run a bounded streaming job. If 
> there is no checkpoint trigger, no one will move the final temp files to 
> output path, so the result of this job is wrong.
> Do you have any idea about this? Can we distinguish "fail close" from 
> "success finish close" in StreamingFileSink?
> 
> Best,
> Jingsong Lee
> 
> On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas  <mailto:kklou...@gmail.com>> wrote:
> Hi Li,
> 
> This is the expected behavior. All the "exactly-once" sinks in Flink
> require checkpointing to be enabled.
> We will update the documentation to be clearer in the upcoming release.
> 
> Thanks a lot,
> Kostas
> 
> On Sat, Dec 7, 2019 at 3:47 AM Li Peng  <mailto:li.p...@doordash.com>> wrote:
> >
> > Ok I seem to have solved the issue by enabling checkpointing. Based on the 
> > docs (I'm using 1.9.0), it seemed like only 
> > StreamingFileSink.forBulkFormat() should've required checkpointing, but 
> > based on this experience, StreamingFileSink.forRowFormat() requires it too! 
> > Is this the intended behavior? If so, the docs should probably be updated.
> >
> > Thanks,
> > Li
> >
> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng  > <mailto:li.p...@doordash.com>> wrote:
> >>
> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every 
> >> minute, with flink-s3-fs-hadoop, and based on the default rolling policy, 
> >> which is configured to "roll" every 60 seconds, I thought that would be 
> >> automatic (I interpreted rolling to mean actually close a multipart upload 
> >> to s3).
> >>
> >> But I'm not actually seeing files written to s3 at all, instead I see a 
> >> bunch of open multipart uploads when I check the AWS s3 console, for 
> >> example:
> >>
> >>  "Uploads": [
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-0-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-1-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:03:12.000Z",
> >> "Key": "2019-12-06--21/part-0-1"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:04:15.000Z",
> >> "Key": "2019-12-06--21/part-0-2"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:22:23.000Z"
> >> "Key": "2019-12-06--21/part-0-3"
> >> }
> >> ]
> >>
> >> And these uploads are being open for a long time. So far after an hour, 
> >> none of the uploads have been closed. Is this the expected behavior? If I 
> >> wanted to get these uploads to actually write to s3 quickly, do I need to 
> >> configure the hadoop stuff to get that done, like setting a smaller 
> >> buffer/partition size to force it to upload?
> >>
> >> Thanks,
> >> Li
> 
> 
> -- 
> Best, Jingsong Lee

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



How to assign a UID to a KeyedStream?

2020-01-09 Thread Ken Krugler
Hi all,

[Of course, right after hitting send I realized I could just do 
rides.getTransformation().setUid(“blah”), ditto for the fares stream. Might be 
something to add to the docs, or provide a .uid() method on KeyedStreams for 
syntactic sugar]
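
In code, the workaround is just this (the UID strings are arbitrary):

rides.getTransformation().setUid("keyBy: rides by rideId");
fares.getTransformation().setUid("keyBy: fares by rideId");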

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state 
example in the online tutorial. 

env.getConfig().disableAutoGeneratedUIDs();

I then added UIDs for all operators, sources & sinks. But I still get the 
following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no 
UID or hash has been assigned to operator Partition
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)

The simple workflow is:

DataStream<TaxiRide> rides = env
.addSource(new CheckpointedTaxiRideSource(ridesFile, 
servingSpeedFactor))
.uid("source: taxi rides")
.name("taxi rides")
.filter((TaxiRide ride) -> ride.isStart)
.uid("filter: only start rides")
.name("only start rides")
.keyBy((TaxiRide ride) -> ride.rideId);

DataStream<TaxiFare> fares = env
.addSource(new CheckpointedTaxiFareSource(faresFile, 
servingSpeedFactor))
.uid("source: taxi fares")
.name("taxi fares")
.keyBy((TaxiFare fare) -> fare.rideId);

DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
.connect(fares)
.flatMap(new EnrichmentFunction())
.uid("function: enrich rides with fares")
.name("enrich rides with fares")
.addSink(sink)
.uid("sink: enriched taxi rides")
.name("enriched taxi rides");

Internally the exception is thrown when the EnrichFunction (a 
RichCoFlatMapFunction) is being transformed by 
StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the 
Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream 
creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should 
StreamGraphGenerator.transform() skip the no-uid check for 
PartitionTransformation, since that’s not an operator with state?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



How to assign a UID to a KeyedStream?

2020-01-09 Thread Ken Krugler
Hi all,

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state 
example in the online tutorial. 

env.getConfig().disableAutoGeneratedUIDs();

I then added UIDs for all operators, sources & sinks. But I still get the 
following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no 
UID or hash has been assigned to operator Partition
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)

The simple workflow is:

DataStream<TaxiRide> rides = env
.addSource(new CheckpointedTaxiRideSource(ridesFile, 
servingSpeedFactor))
.uid("source: taxi rides")
.name("taxi rides")
.filter((TaxiRide ride) -> ride.isStart)
.uid("filter: only start rides")
.name("only start rides")
.keyBy((TaxiRide ride) -> ride.rideId);

DataStream<TaxiFare> fares = env
.addSource(new CheckpointedTaxiFareSource(faresFile, 
servingSpeedFactor))
.uid("source: taxi fares")
.name("taxi fares")
.keyBy((TaxiFare fare) -> fare.rideId);

DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
.connect(fares)
.flatMap(new EnrichmentFunction())
.uid("function: enrich rides with fares")
.name("enrich rides with fares")
.addSink(sink)
.uid("sink: enriched taxi rides")
.name("enriched taxi rides");

Internally the exception is thrown when the EnrichFunction (a 
RichCoFlatMapFunction) is being transformed by 
StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the 
Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream 
creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should 
StreamGraphGenerator.transform() skip the no-uid check for 
PartitionTransformation, since that’s not an operator with state?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Batch mode with Flink 1.8 unstable?

2019-09-18 Thread Ken Krugler
ou found:
> 
> (1) Input splits and oversized RPC
> 
> Your explanation seems correct, timeout due to dropping oversized RPC message.
> 
> I don't quite understand how that exactly happens, because the size limit is 
> 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe 
> accidentally, by having a large serialized closure in the splits?
> 
> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399 
> <https://issues.apache.org/jira/browse/FLINK-4399>  
> 
> (2) TM early release
> 
> The 1.8 version had a fix that should work for regular cases without 
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery
> 
> Are you trying to use the finer grained failover with the batch job?
> The finer-grained failover is not working in batch for 1.8, that is why it is 
> not an advertised feature (it only works for streaming so far).
> 
> The goal is that this works in the 1.9 release (aka the batch fixup release)
> 
> (3) Hang in Processing
> 
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
> There are known issues with the current batch shuffle implementation, which 
> is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
> 
> Best,
> Stephan
> 
> 
> 
> 
> 
> 
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi all,
> 
> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 
> 1.8.0, and it regularly fails, but for varying reasons.
> 
> Has anyone else had stability with 1.8.0 in batch mode and non-trivial 
> workflows?
> 
> Thanks,
> 
> — Ken
> 
> 1. TimeoutException getting input splits
> 
> The batch job starts by processing a lot of files that live in S3. During 
> this phase, I sometimes see:
> 
> 2019-06-20 01:20:22,659 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at createInput(ExecutionEnvironment.java:549) 
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad 
> dailies) -> Filter (Filter at createWorkflow(MyWorkflow.java:34)) -> Filter 
> (Filter at createWorkflow(MyWorkflow.java:36)) -> Filter (Filter at 
> createWorkflow(MyWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce 
> at createWorkflow(MyWorkflow.java:41)) (31/32) 
> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>   ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>   ... 4 more
> 2019-06-20 01:20:22,664 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink 
> Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) 
> switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>   ... 3 more
> Caused by: java.util.concurren

Potential block size issue with S3 binary files

2019-08-28 Thread Ken Krugler
Hi all,

Wondering if anyone else has run into this.

We write files to S3 using the SerializedOutputFormat. When we 
read them back, sometimes we get deserialization errors where the data seems to 
be corrupt.

After a lot of logging, the weathervane of blame pointed towards the block size 
somehow not being the same between the write (where it’s 64MB) and the read 
(unknown).

When I added a call to SerializedInputFormat.setBlockSize(64MB), the problems 
went away.
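
Concretely, the read-side fix looks something like this (MyRecord is a 
placeholder for our IOReadableWritable type, and the path is made up):

SerializedInputFormat<MyRecord> inputFormat = new SerializedInputFormat<>();
inputFormat.setFilePath(new Path("s3://my-bucket/records/"));
inputFormat.setBlockSize(64 * 1024 * 1024L); // match the block size used when writing

DataSet<MyRecord> records = env.createInput(inputFormat, TypeInformation.of(MyRecord.class));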

It looks like both input and output formats use fs.getDefaultBlockSize() to set 
this value by default, so maybe the root issue is S3 somehow reporting 
different values.

But it does feel a bit odd that we’re relying on this default setting, versus 
it being recorded in the file during the write phase.

And it’s awkward to try to set the block size on the write, as you need to set 
it in the environment conf, which means it applies to all output files in the 
job.

— Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Best way to compute the difference between 2 datasets

2019-07-22 Thread Ken Krugler
Hi Juan,

If you want to deduplicate, then you could group by the record, and use a (very 
simple) reduce function to only emit a record if the group contains one element.

There will be performance issues, though - Flink will have to generate all 
groups first, which typically means spilling to disk if the data set has any 
significant size.
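
A rough Java sketch of what I mean (MyRecord is a placeholder, and it has to be 
a valid Flink key type for the groupBy to work):

DataSet<MyRecord> unique = records
    .groupBy(new KeySelector<MyRecord, MyRecord>() {
        @Override
        public MyRecord getKey(MyRecord value) {
            // Use the whole record as the grouping key.
            return value;
        }
    })
    .reduceGroup(new GroupReduceFunction<MyRecord, MyRecord>() {
        @Override
        public void reduce(Iterable<MyRecord> group, Collector<MyRecord> out) {
            Iterator<MyRecord> iter = group.iterator();
            MyRecord first = iter.next();
            // Only emit records that occur exactly once in the group.
            if (!iter.hasNext()) {
                out.collect(first);
            }
        }
    });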

— Ken

PS - I assume that you’ve implemented a valid hashCode()/equals() for the 
record.


> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá 
>  wrote:
> 
> Hi, 
> 
> I've been trying to write a function to compute the difference between 2 
> datasets. With that I mean computing a dataset that has all the elements of a 
> dataset that are not present in another dataset. I first tried using 
> coCogroup, but it was very slow in a local execution environment, and often 
> was crashing with OOM. Then I tried with leftOuterJoin and got similar 
> results. I then tried the following:
> 
> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
> 
>   val all = selfMarked.union(otherMarked)
> .partitionByHash(0) // so occurrences of the same value in both datasets 
> go to the same partition
> .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
> Collector[T]) =>
> var latestOtherOpt: Option[T] = None
> partitionIter.foreach {
>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>   case (selfElem, true) =>
> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
> }
>   }
> }
> 
> This is basically the idea of removing duplicates in a collection by first 
> sorting it, and then traversing it from beginning to end, removing the 
> elements that are consecutive to an element we just saw. That is extended 
> here to mark whether an element is coming from `self` or from `other`, 
> keeping only elements from `self` that are not following another occurrence 
> of the same element in `other`. That code is also really slow on a local 
> execution environment, and crashes a lot. But when I replace `sortPartition` 
> by sorting each partition in memory inside a mapPartition, it works ok with 
> the local execution environment.
> 
> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = 
> {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>   val all = selfMarked.union(otherMarked)
> .partitionByHash(0) // so occurrences of the same value in both datasets 
> go to the same partition
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
> Collector[T]) =>
> val sortedPartition = {
>   val partition = partitionIter.toArray
>   util.Sorting.quickSort(partition)
>   partition
> }
> var latestOtherOpt: Option[T] = None
> sortedPartition.foreach {
>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>   case (selfElem, true) =>
> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
> }
>   }
> }
> 
> I'm surprised by such a big difference. This is my code 
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
>  and a test 
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
>  I use for running this. I'm very surprised with these performance issues 
> with such small DataSet sizes, with less than 20 elements. Is this because 
> I'm running the program with a local execution environment?, are operations 
> like coGroup, leftOuterJoin or sorPartition implemented inefficiently in the 
> local environment? If that is the case, is there any other alternative 
> environment recommended for development in a single machine, where I won't be 
> experiencing these issues with those operations? Should I expect the function 
> `minussWithSortPartition` above to run efficiently on a cluster? Or maybe 
> there is something wrong with my code? Are there any plans to provide a 
> built-in minus operator in future versions of Flink?
> 
> Thanks, 
> 
> Juan 
> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Disk full problem faced due to the Flink tmp directory contents

2019-07-10 Thread Ken Krugler
Hi Konstantinos,

Typically the data that you are seeing is from records being spilled to disk 
during groupBy/join operations, where the size of one (or multiple, for the 
join case) data sets exceeds what will fit in memory.

And yes, these files can get big, e.g. as big as the sum of your input data 
sizes.

If you split your data stream (one data set being processed by multiple 
operators) then the summed temp size can be multiplied.

You can specify multiple disks to use as temp directories (comma-separated list 
in Flink config), so that’s one way to avoid a single disk becoming too full.
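
E.g. in flink-conf.yaml (the paths are just examples):

io.tmp.dirs: /mnt/disk1/flink-tmp,/mnt/disk2/flink-tmp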

You can take a single workflow and break it into multiple pieces that you run 
sequentially, as that can reduce the high water mark for total spilled files.

You can write intermediate results to a file, versus relying on spills. Though 
if you use HDFS, and HDFS is using the same disks in your cluster, that 
obviously won’t help, and in fact can be worse due to replication of data.

As far as auto-deletion goes, I don’t think Flink supports this. In our case, 
after a job has run we run a shell script (via ssh) on the slaves to remove temp 
files.

— Ken

PS - note that logging can also chew up a lot of space, if you set the log 
level to DEBUG, due to HTTP wire traffic. 

> On Jul 10, 2019, at 3:51 AM, Papadopoulos, Konstantinos 
>  wrote:
> 
> Hi all,
>  
> We are developing several batch processing applications using the DataSet API 
> of the Apache Flink.
> For the time being, we are facing an issue with one of our production 
> environments since its disk usage increase enormously. After a quick 
> investigation, we concluded that the /tmp/flink-io-{} directory (under the 
> parent directory of the Apache Flink deployment) contains files of more than 
> 1TB and we need to regularly delete them in order to return our system to its 
> proper functionality. On the first sight, there is no significant impact when 
> deleting these temp files. So, I need your help to answer the following 
> questions:
> What kind of data is stored in the aforementioned directory?
> Why do the respective files have such an enormous size?
> How can we limit the size of the data written to the respective directory?
> Is there any way to delete such files automatically when they are no longer needed?
>  
> Thanks in advance for your help,
> Konstantinos

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Job tasks are not balance among taskmanagers

2019-07-02 Thread Ken Krugler
Hi Ever,

As Haibo noted, that’s a known regression.

If you fall back to the older approach of having multiple TMs per slave, each 
with one slot, then Flink (as of 1.7/1.8) does a better job of distributing 
work.
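
That is, in flink-conf.yaml for each TaskManager (and then start several 
TaskManager processes per machine instead of one big one):

taskmanager.numberOfTaskSlots: 1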

— Ken

> On Jul 1, 2019, at 9:23 PM, Haibo Sun  wrote:
> 
> Hi, Ever
> 
> This is a regression wrt the pre Flip-6 code, and the following JIRA 
> dedicated to this issue.
> 
> https://issues.apache.org/jira/browse/FLINK-12122 
> <https://issues.apache.org/jira/browse/FLINK-12122> 
>  
> Best,
> Haibo
> 
> 
> 
> At 2019-07-02 11:42:27, "Ever" <439674...@qq.com> wrote:
> Hi, there are 3 taskManager nodes within our testing Flink cluster, whose 
> version is 1.8, and each one has 10 task slots.
> 
> Now I have a job with parallelism 3. 
> I expected the 3 tasks would be located at 3 different taskManagers, just as 
> Example 2 below:
> 
> 
> But it came out that all 3 tasks are all located at the same taskmanager.
> <3503f...@0bbe000a.a3d21a5d.jpg>
> 
> <2608f...@99c54575.a3d21a5d.jpg>
> 
> Why?

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Batch mode with Flink 1.8 unstable?

2019-07-01 Thread Ken Krugler
Hi Stephan,

Thanks for responding, comments inline below…

Regards,

— Ken

> On Jun 26, 2019, at 7:50 AM, Stephan Ewen  wrote:
> 
> Hi Ken!
> 
> Sorry to hear you are going through this experience. The major focus on 
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in current Flink version can be somewhat tricky.
> 
> It is a big focus of Flink 1.9 to fix the batch mode, finally, and by 
> addressing batch specific scheduling / recovery / and shuffle issues.
> 
> Let me go through the issues you found:
> 
> (1) Input splits and oversized RPC
> 
> Your explanation seems correct, timeout due to dropping oversized RPC message.
> 
> I don't quite understand how that exactly happens, because the size limit is 
> 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe 
> accidentally, by having a large serialized closure in the splits?

As per my email to Till, I don’t feel like I’m doing anything tricky, though I 
am reading Hadoop sequence files that contain Cascading Tuple/Tuple key/value 
data.

> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399 
> <https://issues.apache.org/jira/browse/FLINK-4399>  
> 
> (2) TM early release
> 
> The 1.8 version had a fix that should work for regular cases without 
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery
> 
> Are you trying to use the finer grained failover with the batch job?

No, or at least I’m not doing anything special to enable it.

Is there something I need to do to explicitly _disable_ it?

> The finer-grained failover is not working in batch for 1.8, that is why it is 
> not an advertised feature (it only works for streaming so far).
> 
> The goal is that this works in the 1.9 release (aka the batch fixup release)
> 
> (3) Hang in Processing
> 
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
> There are known issues with the current batch shuffle implementation, which 
> is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Next time it happens, I’ll dump the threads.

I should have done it this time, but was in a hurry to kill the EMR cluster as 
it had been costing money all night long :(



> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi all,
> 
> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 
> 1.8.0, and it regularly fails, but for varying reasons.
> 
> Has anyone else had stability with 1.8.0 in batch mode and non-trivial 
> workflows?
> 
> Thanks,
> 
> — Ken
> 
> 1. TimeoutException getting input splits
> 
> The batch job starts by processing a lot of files that live in S3. During 
> this phase, I sometimes see:
> 
> 2019-06-20 01:20:22,659 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at createInput(ExecutionEnvironment.java:549) 
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad 
> dailies) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) 
> -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) 
> (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>   ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
> 

Random errors reading binary files in batch workflow

2019-07-01 Thread Ken Krugler
Hi all,

My latest issue is that I regularly (but not always) get a 
java.io.UTFDataFormatException when trying to read in serialized records.

I can re-run the exact same workflow, on the same cluster, with the same input 
data, and sometimes it works.

It seems like the higher the parallelism, the more likely that an error happens.

The fact that it sometimes works suggests it’s not a problem with corrupted 
records (previously written out by an upstream workflow), as corruption should 
cause a consistent failure.

The error occurs when reading from both S3 and HDFS.

When the error occurs, it looks like this (fails on deserializing the first 
field in the POJO):

2019-07-01 22:12:02,542 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- DataSource 
(feature vector source) (36/64) (577f1375e15df4a5352a405fb8b21204) switched 
from RUNNING to FAILED.
java.io.UTFDataFormatException: malformed input around byte 2
at java.io.DataInputStream.readUTF(DataInputStream.java:634)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at 
com.adbeat.similarity.FeatureVectorWithCountry.read(FeatureVectorWithCountry.java:47)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at 
org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

FeatureVectorWithCountry is a POJO that implements the IOReadableWritable 
interface.
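
For context, its read/write pair follows the usual IOReadableWritable pattern, 
roughly like this (the field is a guess based on the class name; the real class 
has more fields):

public class FeatureVectorWithCountry implements IOReadableWritable {

    private String country;
    // ... more fields ...

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeUTF(country);
        // ... write the remaining fields ...
    }

    @Override
    public void read(DataInputView in) throws IOException {
        // This readUTF() is where the malformed-input exception is thrown.
        country = in.readUTF();
        // ... read the remaining fields ...
    }
}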

It also sometimes fails while reading a different POJO, which is in a different 
input DataSet in the same workflow:

2019-07-01 00:39:05,829 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- DataSource (at 
createWorkflow(AdvertiserSimilarityWorkflow.java:88) 
(org.apache.flink.api.common.io.SerializedInputFormat)) (17/48) 
(021bc0011dd523a4314d4e52f97a2486) switched from RUNNING to FAILED.
java.io.UTFDataFormatException: malformed input around byte 50
at java.io.DataInputStream.readUTF(DataInputStream.java:656)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.adbeat.similarity.advertiser.AdText.read(AdText.java:170)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at 
org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

I don’t see any preceding errors in the logs.

It seems like the calculation of valid starting offsets in a split is 
sometimes wrong, and thus it starts trying to read a record from an incorrect 
location.

Has anyone else run into this?

Thanks,

— Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Batch mode with Flink 1.8 unstable?

2019-07-01 Thread Ken Krugler
INK-4399>  
> 
> (2) TM early release
> 
> The 1.8 version had a fix that should work for regular cases without 
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery
> 
> Are you trying to use the finer grained failover with the batch job?
> The finer-grained failover is not working in batch for 1.8, that is why it is 
> not an advertised feature (it only works for streaming so far).
> 
> The goal is that this works in the 1.9 release (aka the batch fixup release)
> 
> (3) Hang in Processing
> 
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
> There are known issues with the current batch shuffle implementation, which 
> is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
> 
> Best,
> Stephan
> 
> 
> 
> 
> 
> 
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi all,
> 
> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 
> 1.8.0, and it regularly fails, but for varying reasons.
> 
> Has anyone else had stability with 1.8.0 in batch mode and non-trivial 
> workflows?
> 
> Thanks,
> 
> — Ken
> 
> 1. TimeoutException getting input splits
> 
> The batch job starts by processing a lot of files that live in S3. During 
> this phase, I sometimes see:
> 
> 2019-06-20 01:20:22,659 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at createInput(ExecutionEnvironment.java:549) 
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad 
> dailies) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) 
> -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) 
> (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>   ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>   ... 4 more
> 2019-06-20 01:20:22,664 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink 
> Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) 
> switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>   ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>   ... 4 more
> 
> I saw bjb...@gmail.com <mailto:bjb...@gmail.com>’s email recently about a 
> similar issue:
> 
>> I figured this out my

Batch mode with Flink 1.8 unstable?

2019-06-23 Thread Ken Krugler
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, 
and it regularly fails, but for varying reasons.

Has anyone else had stability with 1.8.0 in batch mode and non-trivial 
workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this 
phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at createInput(ExecutionEnvironment.java:549) 
(org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad 
dailies) -> Filter (Filter at 
createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at 
createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at 
createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> 
Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) 
(31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at 
org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
Requesting the next input split failed.
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at 
org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at 
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched 
from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at 
org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
Requesting the next input split failed.
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at 
org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at 
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw bjb...@gmail.com’s email recently about a similar issue:

> I figured this out myself. In my yarn container logs I saw this warning/error,
> 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@HOST:43911/temp/$n]: 
>  max allowed size 10485760 bytes, 
> actual size of encoded class 
> org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
> 
> Looking into this there is a max frame size for Akka which in flink can be 
> set with akka.framesize and is set to 10MB by default. Increasing this past 
> the size of my side input fixed the issue. I'm guessing this is due to 
> creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log 
files.

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote 
task manager xxx has failed. This might indicate that the remote task manager 
has been lost”

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for 
https://issues.apache.org/jira/browse/FLINK-10941 
, and thus I’d avoid the 
problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the 
servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting 
two new containers. One of these requests gets filled, 

Re: Use Partitioner to forward messages to subtask by index

2019-06-21 Thread Ken Krugler
Hi Joshua,

It is possible, but fragile, as it depends on the internal code that Flink uses 
to calculate a hash for a key, and the max operator parallelism, etc.

See makeKeyForOperatorIndex 
<https://github.com/ScaleUnlimited/flink-crawler/blob/master/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L185>,
 which will generate a String that can be used for partitioning to a specific 
subtask (operator index)
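
The basic idea is a brute-force search for a key string that Flink's own
key-group assignment routine maps to the target subtask. A rough, untested
sketch (the method and key names here are mine, not from flink-crawler):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public static String makeKeyForSubtask(int targetSubtask, int maxParallelism, int parallelism) {
    for (int i = 0; ; i++) {
        String candidate = "key-" + i;
        // Same routine Flink uses internally to map a key to an operator (subtask) index
        int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                candidate, maxParallelism, parallelism);
        if (subtask == targetSubtask) {
            return candidate;
        }
    }
}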

— Ken

> On Jun 21, 2019, at 10:15 AM, Joshua Griffith  wrote:
> 
> Is it possible to use a custom Partitioner to forward messages to a 
> downstream subtask by the subtask's index? I realize that it would not be 
> safe to rely upon this across job restarts but this does not affect my 
> particular application. I attempted to return a partition number identical to 
> the subtask number but this did not work. For example, assigning a message to 
> partition 1 would send it to subtask 0.
> 
> Thanks,
> Josh

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Unable to set S3 like object storage for state backend.

2019-06-20 Thread Ken Krugler
Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Could not set up JobManager
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
> at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
> Caused by: java.lang.RuntimeException: Failed to start checkpoint ID
> counter: null uri host. This can be caused by unencoded / in the
> password string
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:255)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
> ... 10 more
> Caused by: java.io.IOException: null uri host. This can be caused by
> unencoded / in the password string
> at 
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:159)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
> at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:249)
> ... 17 more
> Caused by: java.lang.NullPointerException: null uri host. This can be
> caused by unencoded / in the password string
> at java.util.Objects.requireNonNull(Objects.java:228)
> at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:69)
> at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
> at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
> at 
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:124)
> ... 23 more
> 
> 
> It seems like the way I setup the state backed causes this exception ie.
> 
> env.setStateBackend(new 
> FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))
> 
> How can I resolve this issue, are S3 like object stores supported by 1.7.2 ?
> 
> Thanks,
> Vishwas

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: privacy preserving on streaming Kmeans in Flink

2019-06-14 Thread Ken Krugler
Hi Alaa,

You could look at https://github.com/ScaleUnlimited/flink-streaming-kmeans 
<https://github.com/ScaleUnlimited/flink-streaming-kmeans> for an example of 
this.

Though note that there are non-iterative versions of k-means clustering that 
are much more efficient.

This code was a way of exploring iterations in streaming data….

— Ken

> On Jun 14, 2019, at 6:06 AM, alaa  wrote:
> 
> *I am trying to do research on privacy-preserving data for streaming k-means
> clustering, and to develop an implementation on Flink.
> 
> My questions are:
> 
>    Is there a library for k-means in Flink? I didn't see one in
> FlinkML.
> 
>    How can I implement streaming k-means clustering on Flink?
> 
> If anybody knows, can you please explain some of the key steps that I have
> to follow?*

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: StreamingFileSink in version 1.8

2019-06-11 Thread Ken Krugler
The code in HadoopRecoverableWriter is:

if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || 
!HadoopUtils.isMinHadoopVersion(2, 7)) {
throw new UnsupportedOperationException(
"Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer");
}

So one possibility is that your sink path doesn’t have the explicit hdfs://xxx 
protocol. 
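
E.g. something like this, with the scheme spelled out (untested sketch; the
path is made up):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs://namenode:8020/data/output"),
        new SimpleStringEncoder<String>("UTF-8"))
    .build();

stream.addSink(sink);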

Another is that you’re in classpath hell, and your job jar contains an older 
version of Hadoop jars.

— Ken


> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman  
> wrote:
> 
> Hi.
> 
> I'm a bit confused:
> When launching my Flink streaming application on EMR release 5.24 (which has 
> Flink 1.8), which writes Kafka messages to S3 parquet files, I'm getting 
> the exception below. But when I install Flink 1.8 on EMR myself, it 
> works.
> What could explain the different behavior?
> 
> Thanks,
> Yitzchak.
> 
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on 
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Troubleshooting java.io.IOException: Connecting the channel failed: Connecting to remote task manager has failed. This might indicate that the remote task manager has been lost.

2019-06-10 Thread Ken Krugler
eadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
> ... 1 more
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: 
> flink2-0.high.ue1.pre.aws.cloud.arity.com/10.16.70.80:26266 
> <http://pre.aws.cloud.arity.com/10.16.70.80:26266>
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
> ... 7 more
>  
> It happens for all task manager. What seems to be the problem here and how 
> can I troubleshoot it?
>  
> The task managers all seem active and show up on the web ui.
>  
> Thanks,
> Harshith

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Getting java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError when stopping/canceling job.

2019-05-16 Thread Ken Krugler
> Could you share the full stack trace or better logs?
> It looks like something is trying to be executed in vertx.io 
> <http://vertx.io/> code after the local task has been stopped and the class 
> loader for the user code has been unloaded. Maybe from some daemon thread 
> pool. 
> 
> Best,
> Andrey
> 
> 
> On Wed, May 15, 2019 at 4:58 PM John Smith  <mailto:java.dev@gmail.com>> wrote:
> Hi, 
> 
> I'm using vertx.io <http://vertx.io/> as an async JDBC client for a 
> RichAsyncFunction it works fine but when I stop the job I get...
> 
> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: 
> io/vertx/core/impl/VertxImpl$SharedWorkerPool
> 
> Is there a way to avoid/fix this?

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Ken Krugler
Hi Manish,

It’s best to start a new thread if you have a new question - see 
https://home.apache.org/~hossman/#threadhijack 
<https://home.apache.org/~hossman/#threadhijack> for reasons why…

Regards,

— Ken


> On May 15, 2019, at 4:46 PM, Manish Bellani  wrote:
> 
> Hi Ken,
> 
> Thanks for the quick response, you are actually right, the job seems to be 
> running even after that error appears. It was crashing earlier (due to 
> fs.s3a.multipart.size being too high) and I confused it with this error since 
> that was the first one popping out and OOM wasn't apparent immediately.
> 
> I do have a subsequent question though if you don't mind me asking this 
> question in the same thread. So... if I'm reading the  BucketingSink code 
> correctly then if I supply the core-site.xml with following contents, would 
> it not pick the S3RecoverableWriter code path?:
> 
> 
> 
> 
> fs.s3.impl
> org.apache.hadoop.fs.s3a.S3AFileSystem
> 
> 
> 
> fs.s3a.fast.upload
> true
> 
> Use the incremental block upload mechanism with
> the buffering mechanism set in fs.s3a.fast.upload.buffer.
> The number of threads performing uploads in the filesystem is 
> defined
> by fs.s3a.threads.max; the queue of waiting uploads limited by
> fs.s3a.max.total.tasks.
> The size of each buffer is set by fs.s3a.multipart.size.
> 
> 
> 
> 
> fs.s3a.fast.upload.buffer
> array
> 
> The buffering mechanism to use when using S3A fast upload
> (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
> This configuration option has no effect if fs.s3a.fast.upload is 
> false.
> 
> "disk" will use the directories listed in fs.s3a.buffer.dir as
> the location(s) to save data prior to being uploaded.
> 
> "array" uses arrays in the JVM heap
> 
> "bytebuffer" uses off-heap memory within the JVM.
> 
> Both "array" and "bytebuffer" will consume memory in a single 
> stream up to the number
> of blocks set by:
> 
> fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
> 
> If using either of these mechanisms, keep this value low
> 
> The total number of threads performing work across all threads is 
> set by
> fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting 
> the number of queued
> work items.
> 
> 
> 
> 
> fs.s3a.multipart.size
> 10M
> How big (in bytes) to split upload or copy operations up 
> into.
> A suffix from the set {K,M,G,T,P} may be used to scale the 
> numeric value.
> 
> 
> 
> 
> fs.s3a.fast.upload.active.blocks
> 8
> 
> Maximum Number of blocks a single output stream can have
> active (uploading, or queued to the central FileSystem
> instance's pool of queued operations.
> 
> This stops a single stream overloading the shared thread pool.
> 
> 
> 
> 
>   fs.s3a.aws.credentials.provider
>   com.amazonaws.auth.EnvironmentVariableCredentialsProvider
> 
> 
> 
> 
> I say that because I don't see any files being written under the `/tmp` directory 
> with a pattern like ".tmp_UUID", which is what RefCountedTmpFileCreator is 
> supposed to create for staging writes to s3 (which is wired in by 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem): 
> 
> public RefCountedFile apply(File file) throws IOException {
>File directory = this.tempDirectories[this.nextIndex()];
> 
> while(true) {
> try {
> if (file == null) {
> File newFile = new File(directory, ".tmp_" + 
> UUID.randomUUID());
> OutputStream out = 
> Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
> return RefCountedFile.newFile(newFile, out);
>     }
> 
> OutputStream out = Files.newOutputStream(file.toPath(), 
> StandardOpenOption.APPEND);
> return RefCountedFile.restoredFile(file, out, file.length());
> } catch (FileAlreadyExistsException var5) {
> }
> }
> }
> 
> 
> Is S3RecoverableWriter path even supported for BucketingSink?
> 
> Manish
> 
> 
> On Wed, May 15, 2019 at 6:05 PM Ken Krugler  <mailto:kkrugler

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Ken Krugler
til.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
>  
> 
> 
>  
> Some googling about "bad write method arg count" reveals that it could 
> potentially be related to a beanutils issue, but I'm not entirely sure. I've 
> hunted through all the jars that are on the classpath:
> 
> /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
> -Djavax.net.ssl.trustStore=/etc/ssl/java-certs/cacerts -XX:+UseG1GC -Xms5530M 
> -Xmx5530M -XX:MaxDirectMemorySize=8388607T 
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
> -classpath 
> /opt/flink/lib/aws-java-sdk-core-1.10.6.jar:/opt/flink/lib/aws-java-sdk-kms-1.10.6.jar:/opt/flink/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink/lib/flink-metrics-datadog-1.6.2.jar:/opt/flink/lib/flink-python_2.11-1.7.2.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.2.jar:/opt/flink/lib/hadoop-aws-2.8.5.jar:/opt/flink/lib/httpclient-4.3.6.jar:/opt/flink/lib/httpcore-4.3.3.jar:/opt/flink/lib/jackson-annotations-2.5.3.jar:/opt/flink/lib/jackson-core-2.5.3.jar:/opt/flink/lib/jackson-databind-2.5.3.jar:/opt/flink/lib/joda-time-2.8.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar::/opt/flink/conf:
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir 
> /opt/flink/conf
>  
> and i see that `FluentPropertyBeanIntrospector` is contained within the 
> following two jars:
> 
> flink-s3-fs-hadoop-1.7.2.jar:org/apache/flink/fs/shaded/hadoop3/org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
> flink-shaded-hadoop2-uber-1.7.2.jar:2019 
> org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
> 
>  both of those jars are packaged as part of the flink distribution I'm using. 
> I can't think of any other options atm other than thinking that this could 
> potentially be some incompatible transitive dependency issue. I would love to 
> get some advice from y'all to see if this is a packaging bug or something 
> else on my side.
> 
> 
> 
> Thanks
> 
> Manish
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Write simple text into hdfs

2019-04-29 Thread Ken Krugler
DataSet.writeAsText(hdfs://) should work.
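
E.g. a minimal sketch (the path is made up):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.fromElements("line 1", "line 2");

// WriteMode is org.apache.flink.core.fs.FileSystem.WriteMode
lines.writeAsText("hdfs://namenode:8020/tmp/output", FileSystem.WriteMode.OVERWRITE);

env.execute("write text to HDFS");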

— Ken

> On Apr 29, 2019, at 8:00 AM, Hai  wrote:
> 
> Hi, 
> 
> Could anyone give a simple way to write a DataSet into hdfs using a 
> simple way?
> 
> I look up the official document, and didn’t find that, am I missing some 
> thing ?
> 
> Many thanks.

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Partitioning key range

2019-04-08 Thread Ken Krugler
Hi Davood,

We have done some explicit partitioning in the past, but it’s pretty fragile.

See FlinkUtils#makeKeyForOperatorIndex 
<https://github.com/ScaleUnlimited/flink-crawler/blob/master/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>

Though I haven’t tried this with Flink 1.7/1.8, and I’m guessing Fabian would 
notice some issues if he reviewed it :)

— Ken


> On Apr 8, 2019, at 1:01 AM, Fabian Hueske  wrote:
> 
> Hi Davood,
> 
> Flink uses hash partitioning to assign keys to key groups. Each key group is 
> then assigned to a task for processing (a task might process multiple key 
> groups).
> There is no way to directly assign a key to a particular key group or task.
> All you can do is to experiment with different custom KeySelectors which 
> return keys that are hashed into different key groups.
> 
> Best, Fabian
> 
> Am Sa., 6. Apr. 2019 um 11:43 Uhr schrieb Congxian Qiu 
> mailto:qcx978132...@gmail.com>>:
> Hi Davood
> Maybe a custom KeySelector can be helpful, you can define the key used to 
> partition the stream. You can ref the code[1] for detail.
> 
> [1] 
> https://github.com/apache/flink/blob/8d05e91945c6c8d83f9924c00890ccf350f1f36f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L58
>  
> <https://github.com/apache/flink/blob/8d05e91945c6c8d83f9924c00890ccf350f1f36f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L58>
> 
> Best, Congxian
> On Apr 5, 2019, 06:35 +0800, Davood Rafiei  <mailto:rafieidavo...@gmail.com>>, wrote:
>> Hi all,
>> 
>> I partition DataStream (say dsA) with parallelism 2 and get KeyedStream (say 
>> ksA) with parallelism 2.
>> Depending on my keys in dsA, one partition remains empty in ksA.
>> For example when my keys are 10 and 20 in dsA, then both partitions in ksA 
>> are full.
>> However, with keys 1000 and 1001, only one partition receives all of the 
>> upstream data in ksA.
>> Is there any way to get information about key ranges for each downstream 
>> partitions?
>> Or is there any way to overcome this issue?
>> We can assume that I know all possible keys (in this case 2 different keys) 
>> in dsA and therefore I want all partitions in ksA to be fully utilized.
>> 
>> Thanks,
>> Davood
>> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Use different functions for different signal values

2019-04-02 Thread Ken Krugler
Hi Marke,

You can use DataStream.split() to create a SplitStream, and then call 
SplitStream.select() to create the three different paths to the three functions.

See 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#datastream-transformations
 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#datastream-transformations>
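
A rough, untested sketch (Signal, getId(), and the two mapper classes are
placeholders for whatever your types look like; OutputSelector is
org.apache.flink.streaming.api.collector.selector.OutputSelector):

SplitStream<Signal> splits = signals.split(new OutputSelector<Signal>() {
    @Override
    public Iterable<String> select(Signal value) {
        // Route each record to a named output based on its signal id
        if (value.getId() == 1 || value.getId() == 2) {
            return Collections.singletonList("f1");
        } else {
            return Collections.singletonList("f2");
        }
    }
});

splits.select("f1").map(new Function1Mapper()).print();
splits.select("f2").map(new Function2Mapper()).print();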

— Ken

> On Apr 2, 2019, at 8:41 AM, Marke Builder  wrote:
> 
> Hi,
> 
> I want to implement the following behavior:
> 
> 
> There are a lot of ingest signals with unique IDs, and I would like to use a 
> specific function for each set of signals. E.g. Signal1, Signal2 ==> function1; 
> Signal3, Signal4 ==> function2.
> What is the recommended way to implement this pattern?
> 
> Thanks!
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Async Function Not Generating Backpressure

2019-03-21 Thread Ken Krugler

> On Mar 20, 2019, at 6:49 PM, Seed Zeng  wrote:
> 
> Hey Andrey and Ken,
> Sorry about the late reply. I might not have been clear in my question
> The performance of writing to Cassandra is the same in both cases, only that 
> the source rate was higher in the case of the async function is present. 

OK, I was confused by what you’d originally written...

>>> Job 1 is backpressured because Cassandra cannot handle all the writes and 
>>> eventually slows down the source rate to 6.5k/s. 
>>> Job 2 is slightly backpressured but was able to run at 14k/s.

If the source rate is _temporarily_ higher, then that maybe makes sense, as the 
async function will be able to buffer up to the configured capacity.

E.g. in the documentation example 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api>

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);

The capacity is 100 (which is also the default, if you don’t specify it)

> Something is "buffering" and not propagating backpressure to slow down the 
> source speed from Kafka.
> 
> In our use case, we prefer the backpressure to slow down the source so that 
> the write to Cassandra is not delayed while the source is consuming fast.

You can use a smaller capacity to reduce the impact, but that could obviously 
impact the performance of whatever you're using the async function to parallelize.
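
E.g. dropping the capacity parameter in the snippet above from 100 to 10
(same placeholder names as the documentation example):

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 10);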

Regards,

— Ken

> On Wed, Mar 20, 2019 at 9:38 AM Andrey Zagrebin  <mailto:and...@ververica.com>> wrote:
> Hi Seed,
> 
> Sorry for confusion, I see now it is separate. Back pressure should still be 
> created because internal async queue has capacity 
> but not sure about reporting problem, Ken and Till probably have better idea.
> 
> As for consumption speed up, async operator creates another thread to collect 
> the result and Cassandra sink probably uses that thread to write data.
> This might parallelize and pipeline previous steps like Kafka fetching and 
> Cassandra IO but I am also not sure about this explanation.
> 
> Best,
> Andrey
> 
> 
> On Tue, Mar 19, 2019 at 8:05 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Seed,
> 
> I was assuming the Cassandra sink was separate from and after your async 
> function.
> 
> I was trying to come up for an explanation as to why adding the async 
> function would improve your performance.
> 
> The only very unlikely reason I thought of was that the async function 
> somehow caused data arriving at the sink to be more “batchy”, which (if the 
> Cassandra sink had an “every x seconds do a write” batch mode) could improve 
> performance.
> 
> — Ken
> 
>> On Mar 19, 2019, at 11:35 AM, Seed Zeng > <mailto:seed.z...@klaviyo.com>> wrote:
>> 
>> Hi Ken and Andrey,
>> 
>> Thanks for the response. I think there is a confusion that the writes to 
>> Cassandra are happening within the Async function. 
>> In my test, the async function is just a pass-through without doing any work.
>> 
>> So any Cassandra related batching or buffering should not be the cause for 
>> this.
>> 
>> Thanks,
>> 
>> Seed
>> 
>> On Tue, Mar 19, 2019 at 12:35 PM Ken Krugler > <mailto:kkrugler_li...@transpac.com>> wrote:
>> Hi Seed,
>> 
>> It’s a known issue that Flink doesn’t report back pressure properly for 
>> AsyncFunctions, due to how it monitors the output collector to gather back 
>> pressure statistics.
>> 
>> But that wouldn’t explain how you get a faster processing with the 
>> AsyncFunction inserted into your workflow.
>> 
>> I haven’t looked at how the Cassandra sink handles batching, but if the 
>> AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then 
>> that’s one (serious hand waving) explanation.
>> 
>> — Ken
>> 
>>> On Mar 18, 2019, at 7:48 PM, Seed Zeng >> <mailto:seed.z...@klaviyo.com>> wrote:
>>> 
>>> Flink Version - 1.6.1
>>> 
>>> In our application, we consume from Kafka and sink to Cassandra in the end. 
>>> We are trying to introduce a custom async function in front of the Sink to 
>>> carry out some customized operations. In our testing, it appears that the 
>>> Async function is not generating backpressure to slow down our Kafka Source 
>>> when Cassandra becomes unhappy. Essentially compared to an almost identical 
>>> job where the only difference is the lack of the Async function, Kafka 
>>> source consumption speed is much higher under the same settings and 
>>> identical Cassandra cluster.

Re: Async Function Not Generating Backpressure

2019-03-19 Thread Ken Krugler
Hi Seed,

I was assuming the Cassandra sink was separate from and after your async 
function.

I was trying to come up for an explanation as to why adding the async function 
would improve your performance.

The only very unlikely reason I thought of was that the async function somehow 
caused data arriving at the sink to be more “batchy”, which (if the Cassandra 
sink had an “every x seconds do a write” batch mode) could improve performance.

— Ken

> On Mar 19, 2019, at 11:35 AM, Seed Zeng  wrote:
> 
> Hi Ken and Andrey,
> 
> Thanks for the response. I think there is a confusion that the writes to 
> Cassandra are happening within the Async function. 
> In my test, the async function is just a pass-through without doing any work.
> 
> So any Cassandra related batching or buffering should not be the cause for 
> this.
> 
> Thanks,
> 
> Seed
> 
> On Tue, Mar 19, 2019 at 12:35 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Seed,
> 
> It’s a known issue that Flink doesn’t report back pressure properly for 
> AsyncFunctions, due to how it monitors the output collector to gather back 
> pressure statistics.
> 
> But that wouldn’t explain how you get a faster processing with the 
> AsyncFunction inserted into your workflow.
> 
> I haven’t looked at how the Cassandra sink handles batching, but if the 
> AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then 
> that’s one (serious hand waving) explanation.
> 
> — Ken
> 
>> On Mar 18, 2019, at 7:48 PM, Seed Zeng > <mailto:seed.z...@klaviyo.com>> wrote:
>> 
>> Flink Version - 1.6.1
>> 
>> In our application, we consume from Kafka and sink to Cassandra in the end. 
>> We are trying to introduce a custom async function in front of the Sink to 
>> carry out some customized operations. In our testing, it appears that the 
>> Async function is not generating backpressure to slow down our Kafka Source 
>> when Cassandra becomes unhappy. Essentially compared to an almost identical 
>> job where the only difference is the lack of the Async function, Kafka 
>> source consumption speed is much higher under the same settings and 
>> identical Cassandra cluster. The experiment is like this.
>> 
>> Job 1 - without async function in front of Cassandra
>> Job 2 - with async function in front of Cassandra
>> 
>> Job 1 is backpressured because Cassandra cannot handle all the writes and 
>> eventually slows down the source rate to 6.5k/s. 
>> Job 2 is slightly backpressured but was able to run at 14k/s.
>> 
>> Is the AsyncFunction somehow not reporting the backpressure correctly?
>> 
>> Thanks,
>> Seed
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Async Function Not Generating Backpressure

2019-03-19 Thread Ken Krugler
Hi Seed,

It’s a known issue that Flink doesn’t report back pressure properly for 
AsyncFunctions, due to how it monitors the output collector to gather back 
pressure statistics.

But that wouldn’t explain how you get a faster processing with the 
AsyncFunction inserted into your workflow.

I haven’t looked at how the Cassandra sink handles batching, but if the 
AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then 
that’s one (serious hand waving) explanation.

— Ken

> On Mar 18, 2019, at 7:48 PM, Seed Zeng  wrote:
> 
> Flink Version - 1.6.1
> 
> In our application, we consume from Kafka and sink to Cassandra in the end. 
> We are trying to introduce a custom async function in front of the Sink to 
> carry out some customized operations. In our testing, it appears that the 
> Async function is not generating backpressure to slow down our Kafka Source 
> when Cassandra becomes unhappy. Essentially compared to an almost identical 
> job where the only difference is the lack of the Async function, Kafka source 
> consumption speed is much higher under the same settings and identical 
> Cassandra cluster. The experiment is like this.
> 
> Job 1 - without async function in front of Cassandra
> Job 2 - with async function in front of Cassandra
> 
> Job 1 is backpressured because Cassandra cannot handle all the writes and 
> eventually slows down the source rate to 6.5k/s. 
> Job 2 is slightly backpressured but was able to run at 14k/s.
> 
> Is the AsyncFunction somehow not reporting the backpressure correctly?
> 
> Thanks,
> Seed

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Set partition number of Flink DataSet

2019-03-14 Thread Ken Krugler
Hi Qi,

See https://github.com/ScaleUnlimited/flink-utils/ 
<https://github.com/ScaleUnlimited/flink-utils/>, for a rough but working 
version of a bucketing sink.

— Ken


> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
> 
> Hi Ken,
> 
> Agree. I will try partitonBy() to reducer the number of parallel sinks, and 
> may also try sortPartition() so each sink could write files one by one. 
> Looking forward to your solution. :)
> 
> Thanks,
> Qi
> 
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler > <mailto:kkrugler_li...@transpac.com>> wrote:
>> 
>> Hi Qi,
>> 
>>> On Mar 13, 2019, at 1:26 AM, qi luo >> <mailto:luoqi...@gmail.com>> wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Do you mean that I can create a batch sink which writes to N files? 
>> 
>> Correct.
>> 
>>> That sounds viable, but since our data size is huge (billions of records & 
>>> thousands of files), the performance may be unacceptable. 
>> 
>> The main issue with performance (actually memory usage) is how many 
>> OutputFormats do you need to have open at the same time.
>> 
>> If you partition by the same key that’s used to define buckets, then the max 
>> number is less, as each parallel instance of the sink only gets a unique 
>> subset of all possible bucket values.
>> 
>> I’m actually dealing with something similar now, so I might have a solution 
>> to share soon.
>> 
>> — Ken
>> 
>> 
>>> I will check Blink and give it a try anyway.
>>> 
>>> Thank you,
>>> Qi
>>> 
>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler >>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>> 
>>>> Hi Qi,
>>>> 
>>>> If I understand what you’re trying to do, then this sounds like a 
>>>> variation of a bucketing sink.
>>>> 
>>>> That typically uses a field value to create a directory path or a file 
>>>> name (though the filename case is only viable when the field is also 
>>>> what’s used to partition the data)
>>>> 
>>>> But I don’t believe Flink has built-in support for that, in batch mode 
>>>> (see BucketingSink 
>>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>>>  for streaming).
>>>> 
>>>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>>>> chime in here.
>>>> 
>>>> Otherwise you’ll need to create a custom sink to implement the desired 
>>>> behavior - though abusing a MapPartitionFunction 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>>>  would be easiest, I think.
>>>> 
>>>> — Ken
>>>> 
>>>> 
>>>> 
>>>>> On Mar 12, 2019, at 2:28 AM, qi luo >>>> <mailto:luoqi...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Ken,
>>>>> 
>>>>> Thanks for your reply. I may not make myself clear: our problem is not 
>>>>> about reading but rather writing. 
>>>>> 
>>>>> We need to write to N files based on key partitioning. We have to use 
>>>>> setParallelism() to set the output partition/file number, but when the 
>>>>> partition number is too large (~100K), the parallelism would be too high. 
>>>>> Is there any other way to achieve this?
>>>>> 
>>>>> Thanks,
>>>>> Qi
>>>>> 
>>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler >>>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>>>> 
>>>>>> Hi Qi,
>>>>>> 
>>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>>> 
>>>>>> If so, then instead you want to do something like:
>>>>>> 
>>>>>>  Job job = Job.getInstance();
>>>>>> 
>>>>>>  for each file…
>>>>>>  FileInputFormat.addInputPath(job, new 
>>>>>> org.apache.hadoop.fs.Path(file path));
>>>>>> 
>>>>>>  env.createInput(HadoopInputs.createHadoopInput(…, job)
>>>>>> 
>>>>>> Flink/Hadoop will take care of parallelizing the 

Re: Batch jobs stalling after initial progress

2019-03-13 Thread Ken Krugler
Hi Marko,

Some things that have caused my jobs to run very slowly (though not completely 
stall)

1. Cross-joins generating huge result sets.

2. Joins causing very large spills to disk. 

3. Slow external API access

With streaming, iterations can cause stalls, but I don’t think that’s true for 
batch (haven’t tried, though)

— Ken


> On Mar 13, 2019, at 6:54 AM, Marko Mušnjak  wrote:
> 
> Hi,
> 
> I'm running flink batch jobs on EMR 5.21, and I'm seeing many (>50%) jobs 
> stall and make no progress after some initial period. I've seen the behaviour 
> earlier (5.17), but not nearly as much as now.
> 
> The job is a fairly simple enrichment job, loading an avro metadata file, 
> creating several datasets from the file and broadcasting them. Later they are 
> used in joins with the dataset of input events, also avro files. There are no 
> shuffles or keyBy operations. 
> 
> I see nothing in the logs at INFO level, and the UI for the stalled jobs 
> shows the following:
> * metadata loading tasks are finished.
> * all other tasks are running, except the parquet output which is in state 
> "created"
> * the task earlier in the DAG from the parquet output task shows the back 
> pressure status as "OK", the one earlier is shown with back pressure status 
> "High"
> 
> Are there any specific logs I should enable to get more information on this? 
> Has anyone else seen this behaviour?
> 
> Kind regards,
> Marko 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Set partition number of Flink DataSet

2019-03-13 Thread Ken Krugler
Hi Qi,

> On Mar 13, 2019, at 1:26 AM, qi luo  wrote:
> 
> Hi Ken,
> 
> Do you mean that I can create a batch sink which writes to N files?

Correct.

> That sounds viable, but since our data size is huge (billions of records & 
> thousands of files), the performance may be unacceptable.

The main issue with performance (actually memory usage) is how many 
OutputFormats do you need to have open at the same time.

If you partition by the same key that’s used to define buckets, then the max 
number is less, as each parallel instance of the sink only gets a unique subset 
of all possible bucket values.

I’m actually dealing with something similar now, so I might have a solution to 
share soon.

— Ken


> I will check Blink and give it a try anyway.
> 
> Thank you,
> Qi
> 
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler > <mailto:kkrugler_li...@transpac.com>> wrote:
>> 
>> Hi Qi,
>> 
>> If I understand what you’re trying to do, then this sounds like a variation 
>> of a bucketing sink.
>> 
>> That typically uses a field value to create a directory path or a file name 
>> (though the filename case is only viable when the field is also what’s used 
>> to partition the data)
>> 
>> But I don’t believe Flink has built-in support for that, in batch mode (see 
>> BucketingSink 
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>  for streaming).
>> 
>> Maybe Blink has added that? Hoping someone who knows that codebase can chime 
>> in here.
>> 
>> Otherwise you’ll need to create a custom sink to implement the desired 
>> behavior - though abusing a MapPartitionFunction 
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>  would be easiest, I think.
>> 
>> — Ken
>> 
>> 
>> 
>>> On Mar 12, 2019, at 2:28 AM, qi luo >> <mailto:luoqi...@gmail.com>> wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Thanks for your reply. I may not make myself clear: our problem is not 
>>> about reading but rather writing. 
>>> 
>>> We need to write to N files based on key partitioning. We have to use 
>>> setParallelism() to set the output partition/file number, but when the 
>>> partition number is too large (~100K), the parallelism would be too high. 
>>> Is there any other way to achieve this?
>>> 
>>> Thanks,
>>> Qi
>>> 
>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler >>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>> 
>>>> Hi Qi,
>>>> 
>>>> I’m guessing you’re calling createInput() for each input file.
>>>> 
>>>> If so, then instead you want to do something like:
>>>> 
>>>>Job job = Job.getInstance();
>>>> 
>>>>for each file…
>>>>FileInputFormat.addInputPath(job, new 
>>>> org.apache.hadoop.fs.Path(file path));
>>>> 
>>>>env.createInput(HadoopInputs.createHadoopInput(…, job)
>>>> 
>>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>>> given the parallelism that you’re specifying.
>>>> 
>>>> — Ken
>>>> 
>>>> 
>>>>> On Mar 11, 2019, at 5:42 AM, qi luo >>>> <mailto:luoqi...@gmail.com>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> We’re trying to distribute batch input data to (N) HDFS files 
>>>>> partitioning by hash using DataSet API. What I’m doing is like:
>>>>> 
>>>>> env.createInput(…)
>>>>>   .partitionByHash(0)
>>>>>   .setParallelism(N)
>>>>>   .output(…)
>>>>> 
>>>>> This works well for small number of files. But when we need to distribute 
>>>>> to large number of files (say 100K), the parallelism becomes too large 
>>>>> and we could not afford that many TMs.
>>>>> 
>>>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>>>>> parallelism separately (using dynamic allocation). Is there anything 
>>>>> similar in Flink or other way we can achieve similar result? Thank you!
>>>>> 
>>>>> Qi
>>>> 
>>>> --
>>>> Ken Krugler
>>>> +1 530-210-6378
>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>> Custom big data solutions & training
>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>> 
>>> 
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Set partition number of Flink DataSet

2019-03-12 Thread Ken Krugler
Hi Qi,

If I understand what you’re trying to do, then this sounds like a variation of 
a bucketing sink.

That typically uses a field value to create a directory path or a file name 
(though the filename case is only viable when the field is also what’s used to 
partition the data)

But I don’t believe Flink has built-in support for that, in batch mode (see 
BucketingSink 
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
 for streaming).

Maybe Blink has added that? Hoping someone who knows that codebase can chime in 
here.

Otherwise you’ll need to create a custom sink to implement the desired behavior 
- though abusing a MapPartitionFunction 
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
 would be easiest, I think.
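
To give a flavor of that approach, a very rough, untested sketch (imports
omitted; Record, bucketFor(), and the output path layout are all placeholders,
and FileSystem/Path/Configuration are the Hadoop classes). Each parallel
instance keeps one writer per bucket value it actually sees:

public class BucketingWriter extends RichMapPartitionFunction<Record, Void> {

    @Override
    public void mapPartition(Iterable<Record> records, Collector<Void> out) throws Exception {
        // Nothing is emitted downstream; this function just writes files as a side effect.
        Map<String, BufferedWriter> writers = new HashMap<>();
        FileSystem fs = FileSystem.get(new Configuration());
        int subtask = getRuntimeContext().getIndexOfThisSubtask();

        try {
            for (Record r : records) {
                String bucket = bucketFor(r);
                BufferedWriter writer = writers.get(bucket);
                if (writer == null) {
                    Path path = new Path("/output/" + bucket + "/part-" + subtask);
                    writer = new BufferedWriter(new OutputStreamWriter(fs.create(path, true)));
                    writers.put(bucket, writer);
                }
                writer.write(r.toString());
                writer.newLine();
            }
        } finally {
            for (BufferedWriter writer : writers.values()) {
                writer.close();
            }
        }
    }

    private String bucketFor(Record r) {
        return r.getKey();   // placeholder for however you derive the bucket
    }
}

// Usage: partition by the same bucket key first, so each subtask only sees a
// subset of buckets, then "sink" via mapPartition:
// data.partitionByHash("key").mapPartition(new BucketingWriter()).output(new DiscardingOutputFormat<>());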

— Ken



> On Mar 12, 2019, at 2:28 AM, qi luo  wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not make myself clear: our problem is not about 
> reading but rather writing. 
> 
> We need to write to N files based on key partitioning. We have to use 
> setParallelism() to set the output partition/file number, but when the 
> partition number is too large (~100K), the parallelism would be too high. Is 
> there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler > <mailto:kkrugler_li...@transpac.com>> wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>  Job job = Job.getInstance();
>> 
>>  for each file…
>>  FileInputFormat.addInputPath(job, new 
>> org.apache.hadoop.fs.Path(file path));
>> 
>>  env.createInput(HadoopInputs.createHadoopInput(…, job)
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, given 
>> the parallelism that you’re specifying.
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo >> <mailto:luoqi...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>>> by hash using DataSet API. What I’m doing is like:
>>> 
>>> env.createInput(…)
>>>   .partitionByHash(0)
>>>   .setParallelism(N)
>>>   .output(…)
>>> 
>>> This works well for small number of files. But when we need to distribute 
>>> to large number of files (say 100K), the parallelism becomes too large and 
>>> we could not afford that many TMs.
>>> 
>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>>> parallelism separately (using dynamic allocation). Is there anything 
>>> similar in Flink or other way we can achieve similar result? Thank you!
>>> 
>>> Qi
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Side Output from AsyncFunction

2019-03-11 Thread Ken Krugler
Hi Mike,

1. Depending on what you need the side output for, you can use metrics to track 
some things.
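
E.g. in a RichAsyncFunction you could register a counter in open() (sketch;
the metric name is arbitrary):

private transient Counter sideEvents;   // org.apache.flink.metrics.Counter

@Override
public void open(Configuration parameters) throws Exception {
    sideEvents = getRuntimeContext().getMetricGroup().counter("sideEvents");
}

// ...then call sideEvents.inc() wherever you would otherwise emit a side output record.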

But yes, that’s a very limited subset of all use cases.

2. As you mentioned, you could output a combo record.

Using an Either 
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/types/Either.html> is a common approach.

But you don’t need a process function, you can use the .split() operator to 
create a SplitStream, and then .select() either the regular or the “side 
output” stream.
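
A rough, untested sketch of that combination (MainResult, SideEvent, and
MyAsyncFunction are placeholders; Either is org.apache.flink.types.Either):

DataStream<Either<MainResult, SideEvent>> results =
    AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 1000, TimeUnit.MILLISECONDS);

SplitStream<Either<MainResult, SideEvent>> splits =
    results.split(new OutputSelector<Either<MainResult, SideEvent>>() {
        @Override
        public Iterable<String> select(Either<MainResult, SideEvent> value) {
            return Collections.singletonList(value.isLeft() ? "main" : "side");
        }
    });

DataStream<Either<MainResult, SideEvent>> mainOnly = splits.select("main");
DataStream<Either<MainResult, SideEvent>> sideOnly = splits.select("side");
// ...then a simple MapFunction on each stream unwraps left()/right().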

— Ken



> On Mar 11, 2019, at 9:40 AM, Mikhail Pryakhin  wrote:
> 
> Hello Flink experts!
> My streaming pipeline makes async IO calls via the recommended AsyncFunction. 
> The pipeline evolves and I've encountered a requirement to side output 
> additional events from the function.
> As it turned out the side output feature is only available in the following 
> functions:
> ProcessFunction 
> CoProcessFunction
> ProcessWindowFunction
> ProcessAllWindowFunction
> 
> Just curious whether there is any known approach of utilizing "side output" 
> feature from the AsyncFunction?
> 
> From my perspective, the only feasible way of solving this task is to output 
> a processing event and an optional "side output" as a tuple from the 
> AsyncFunction and then process the resulting stream with a ProcessFunction to 
> handle the required logic. But this approach seems to be overcomplicated. 
> 
> Am I missing something? Any help/ideas are much appreciated!
> 
> Cheers,
> Mike Pryakhin
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Set partition number of Flink DataSet

2019-03-11 Thread Ken Krugler
Hi Qi,

I’m guessing you’re calling createInput() for each input file.

If so, then instead you want to do something like:

Job job = Job.getInstance();

for each file…
FileInputFormat.addInputPath(job, new 
org.apache.hadoop.fs.Path(file path));

env.createInput(HadoopInputs.createHadoopInput(…, job)

Flink/Hadoop will take care of parallelizing the reads from the files, given 
the parallelism that you’re specifying.
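
For completeness, a concrete (untested) version of the sketch above, reading
text files (FileInputFormat/TextInputFormat are the
org.apache.hadoop.mapreduce.lib.input versions, and HadoopInputs is
org.apache.flink.hadoopcompatibility.HadoopInputs):

Job job = Job.getInstance();
for (String p : inputPaths) {   // inputPaths = however you build up your list of files
    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(p));
}

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.createHadoopInput(new TextInputFormat(), LongWritable.class, Text.class, job));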

— Ken


> On Mar 11, 2019, at 5:42 AM, qi luo  wrote:
> 
> Hi,
> 
> We’re trying to distribute batch input data to (N) HDFS files partitioning by 
> hash using DataSet API. What I’m doing is like:
> 
> env.createInput(…)
>   .partitionByHash(0)
>   .setParallelism(N)
>   .output(…)
> 
> This works well for small number of files. But when we need to distribute to 
> large number of files (say 100K), the parallelism becomes too large and we 
> could not afford that many TMs.
> 
> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
> parallelism separately (using dynamic allocation). Is there anything similar 
> in Flink or other way we can achieve similar result? Thank you!
> 
> Qi

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-04 Thread Ken Krugler
The amount of data you’re checkpointing (if it’s really limited to file lists) 
still seems too small to cause timeouts, unless there’s some other issue with 
either your configuration or where data is being written (thus my previous 
question #1).
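
For reference, what I meant below by only checkpointing which files have been
processed is roughly this in the (non-parallel) source - untested sketch,
imports omitted, with listNewFiles() as a placeholder:

public class FileNameSource extends RichSourceFunction<String>
        implements ListCheckpointed<String> {

    private volatile boolean running = true;
    private final Set<String> processedFiles = new HashSet<>();

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            for (String path : listNewFiles(processedFiles)) {
                // Emit and record under the checkpoint lock, so state and output stay consistent
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(path);
                    processedFiles.add(path);
                }
            }
            Thread.sleep(10_000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        return new ArrayList<>(processedFiles);
    }

    @Override
    public void restoreState(List<String> state) {
        processedFiles.addAll(state);
    }

    private List<String> listNewFiles(Set<String> alreadyProcessed) {
        // placeholder: list the input directory and filter out already-processed names
        return Collections.emptyList();
    }
}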

— Ken


> On Mar 3, 2019, at 10:56 PM, LINZ, Arnaud  wrote:
> 
> Hi,
> 
> My source checkpoint is actually the file list. But it's not trivially small 
> as I may have hundreds of thousands of files, with long filenames.
> My sink checkpoint is a smaller hdfs file list with current size.
> 
>  Message d'origine 
> De : Ken Krugler 
> Date : ven., mars 01, 2019 7:05 PM +0100
> A : "LINZ, Arnaud" 
> CC : zhijiang , user 
> Objet : Re: Checkpoints and catch-up burst (heavy back pressure)
> 
> Hi Arnaud,
> 
> 1. What’s your checkpoint configuration? Wondering if you’re writing to HDFS, 
> and thus the load you’re putting on it while catching up & checkpointing is 
> too high.
> 
> If so, then you could monitor the TotalLoad metric (FSNamesystem) in your 
> source, and throttle back the emitting of file paths when this (empirically) 
> gets too high.
> 
> 2. I’m wondering what all you are checkpointing, and why.
> 
> E.g. if this is just an ETL-ish workflow to pull files, parse them, and write 
> out (transformed) results, then you could in theory just checkpoint which 
> files have been processed.
> 
> This means catching up after a failure could take more time, but your 
> checkpoint size will be trivially small.
> 
> — Ken
> 
> 
>> On Mar 1, 2019, at 5:04 AM, LINZ, Arnaud > <mailto:al...@bouyguestelecom.fr>> wrote:
>> 
>> Hi,
>>  
>> I think I should go into more details to explain my use case.
>> I have one non parallel source (parallelism = 1) that list binary files in a 
>> HDFS directory. DataSet emitted by the source is a data set of file names, 
>> not file content. These filenames are rebalanced, and sent to workers 
>> (parallelism = 15) that will use a flatmapper that open the file, read it, 
>> decode it, and send records (forward mode) to the sinks (with a few 1-to-1 
>> mapping in-between). So the flatmap operation is a time-consuming one as the 
>> files are more than 200Mb large each; the flatmapper will emit millions of 
>> record to the sink given one source record (filename).
>>  
>> The rebalancing, occurring at the file name level, does not use much I/O and 
>> I cannot use one-to-one mode at that point if I want some parallelims since 
>> I have only one source.
>>  
>> I did not put file decoding directly in the sources because I have no good 
>> way to distribute files to sources without a controller (input directory is 
>> unique, filenames are random and cannot be “attributed” to one particular 
>> source instance easily). 
>> Alternatively, I could have used a dispatcher daemon separated from the 
>> streaming app that distribute files to various directories, each directory 
>> being associated with a flink source instance, and put the file reading & 
>> decoding directly in the source, but that seemed more complex to code and 
>> exploit than the filename source. Would it have been better from the 
>> checkpointing perspective?
>>  
>> About the ungraceful source sleep(), is there a way, programmatically, to 
>> know the “load” of the app, or to determine if checkpointing takes too much 
>> time, so that I can do it only on purpose?
>>  
>> Thanks,
>> Arnaud
>>  
>> De : zhijiang > <mailto:wangzhijiang...@aliyun.com>> 
>> Envoyé : vendredi 1 mars 2019 04:59
>> À : user mailto:user@flink.apache.org>>; LINZ, 
>> Arnaud mailto:al...@bouyguestelecom.fr>>
>> Objet : Re: Checkpoints and catch-up burst (heavy back pressure)
>>  
>> Hi Arnaud,
>>  
>> Thanks for the further feedbacks!
>>  
>> For option1: 40min still does not makes sense, which indicates it might take 
>> more time to finish checkpoint in your case. I also experienced some 
>> scenarios of catching up data to take several hours to finish one 
>> checkpoint. If the current checkpoint expires because of timeout, the next 
>> new triggered checkpoint might still be failed for timeout. So it seems 
>> better to wait the current checkpoint until finishes, not expires it, unless 
>> we can not bear this long time for some reasons such as wondering failover 
>> to restore more data during this time.
>>  
>> For option2: The default network setting should be make sense. The lower 
>> values might cause performance regression and the higher values would 
>> increase

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-01 Thread Ken Krugler
; To:zhijiang mailto:wangzhijiang...@aliyun.com>>; 
> user mailto:user@flink.apache.org>>
> Subject:RE: Checkpoints and catch-up burst (heavy back pressure)
>  
> Update :
> Option  1 does not work. It still fails at the end of the timeout, no matter 
> its value.
> Should I implement a “bandwidth” management system by using artificial 
> Thread.sleep in the source depending on the back pressure ?
>  
> De : LINZ, Arnaud 
> Envoyé : jeudi 28 février 2019 15:47
> À : 'zhijiang'  <mailto:wangzhijiang...@aliyun.com>>; user  <mailto:user@flink.apache.org>>
> Objet : RE: Checkpoints and catch-up burst (heavy back pressure)
>  
> Hi Zhihiang,
>  
> Thanks for your feedback.
> I’ll try option 1 ; time out is 4min for now, I’ll switch it to 40min and 
> will let you know. Setting it higher than 40 min does not make much sense 
> since after 40 min the pending output is already quite large.
> Option 3 won’t work ; I already take too many ressources, and as my source is 
> more or less a hdfs directory listing, it will always be far faster than any 
> mapper that reads the file and emits records based on its content or sink 
> that store the transformed data, unless I put “sleeps” in it (but is this 
> really a good idea?)
> Option 2: taskmanager.network.memory.buffers-per-channel and 
> taskmanager.network.memory.buffers-per-gate are currently unset in my 
> configuration (so to their default of 2 and 8), but for this streaming app I 
> have very few exchanges between nodes (just a rebalance after the source that 
> emit file names, everything else is local to the node). Should I adjust their 
> values nonetheless ? To higher or lower values ?
> Best,
> Arnaud
> De : zhijiang  <mailto:wangzhijiang...@aliyun.com>> 
> Envoyé : jeudi 28 février 2019 10:58
> À : user mailto:user@flink.apache.org>>; LINZ, Arnaud 
> mailto:al...@bouyguestelecom.fr>>
> Objet : Re: Checkpoints and catch-up burst (heavy back pressure)
>  
> Hi Arnaud,
>  
> I think there are two key points. First the checkpoint barrier might be 
> emitted delay from source under high backpressure for synchronizing lock.
> Second the barrier has to be queued in flighting data buffers, so the 
> downstream task has to process all the buffers before barriers to trigger 
> checkpoint and this would take some time under back pressure.
>  
> There has three ways to work around:
> 1. Increase the checkpoint timeout avoid expire in short time.
> 2. Decrease the setting of network buffers to decrease the amount of 
> flighting buffers before barrier, you can check the config of  
> "taskmanager.network.memory.buffers-per-channel" and 
> "taskmanager.network.memory.buffers-per-gate".
> 3. Adjust the parallelism such as increasing it for sink vertex in order to 
> process source data faster, to avoid backpressure in some extent.
>  
> You could check which way is suitable for your scenario and may have a try.
>  
> Best,
> Zhijiang
> --
> From:LINZ, Arnaud mailto:al...@bouyguestelecom.fr>>
> Send Time:2019年2月28日(星期四) 17:28
> To:user mailto:user@flink.apache.org>>
> Subject:Checkpoints and catch-up burst (heavy back pressure)
>  
> Hello,
>  
> I have a simple streaming app that get data from a source and store it to 
> HDFS using a sink similar to the bucketing file sink. Checkpointing mode is 
> “exactly once”.
> Everything is fine on a “normal” course as the sink is faster than the 
> source; but when we stop the application for a while and then restart it, we 
> have a catch-up burst to get all the messages emitted in the meanwhile.
> During this burst, the source is faster than the sink, and all checkpoints 
> fail (time out) until the source has been totally caught up. This is annoying 
> because the sink does not “commit” the data before a successful checkpoint is 
> made, and so the app release all the “catch up” data as a atomic block that 
> can be huge if the streaming app was stopped for a while, adding an unwanted 
> stress to all the following hive treatments that use the data provided in 
> micro batches and to the Hadoop cluster.
>  
> How should I handle the situation? Is there something special to do to get 
> checkpoints even during heavy load?
>  
> The problem does not seem to be new, but I was unable to find any practical 
> solution in the documentation.
>  
> Best regards,
> Arnaud
>  
>  
>  
>  
>  
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Starting Flink cluster and running a job

2019-02-19 Thread Ken Krugler
lp)"
>> exit 0
>> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>> if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>> 
>> TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep
>>  -c ^processor /proc/cpuinfo)}
>> 
>> sed -i -e "s/jobmanager.rpc.address: 
>> localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" 
>> "$FLINK_HOME/conf/flink-conf.yaml"
>> sed -i -e "s/taskmanager.numberOfTaskSlots: 
>> 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" 
>> "$FLINK_HOME/conf/flink-conf.yaml"
>> echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>> echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>> 
>> echo "Starting Task Manager"
>> echo "config file: " && grep '^[^\n#]' 
>> "$FLINK_HOME/conf/flink-conf.yaml"
>> exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" 
>> start-foreground
>> else
>>     sed -i -e "s/jobmanager.rpc.address: 
>> localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" 
>> "$FLINK_HOME/conf/flink-conf.yaml"
>> echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>> echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>> echo "config file: " && grep '^[^\n#]' 
>> "$FLINK_HOME/conf/flink-conf.yaml"
>> 
>> if [ -z "$1" ]; then
>>exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" 
>> start-foreground "$@"
>> else
>> exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>> fi
>> fi
>> fi
>> 
>> exec "$@"
>> It does work for all the cases, except running a standalone job.
>> The problem, the way I understand it, is a race condition.
>> In Kubernetes it takes several attempts to establish the connection between the 
>> Job and Task Managers, while standalone-job.sh
>> tries to start the job immediately once the cluster is created (before the 
>> connection is established).
>> Is there a better way to start a job on container startup?
>>  
>> 
>> 
>> -- 
>> Konstantin Knauf | Solutions Architect
>> +49 160 91394525

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Ken Krugler
Hi Averell,

https://github.com/apache/flink/commit/35af99391dac431c85e30bcc98b89cba79bccfea#diff-51a12ea54593424e195dd5874309a08d
 
<https://github.com/apache/flink/commit/35af99391dac431c85e30bcc98b89cba79bccfea#diff-51a12ea54593424e195dd5874309a08d>

…is the commit where Gordon made his changes for FLINK-11046 
<https://issues.apache.org/jira/browse/FLINK-11046>.

The ElasticsearchFailureHandlerIndexer class was removed as part of the commit.

— Ken


> On Feb 13, 2019, at 4:46 PM, Averell  wrote:
> 
> Hi Ken,
> 
> Thanks for that. But I could not find the changes included in Gordon's
> mentioned pull request in the repository you gave me (e.g., the new class
> ElasticsearchFailureHandlerIndexer). 
> I have found this folder
> https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it also
> doesn't have that new class.
> Maybe Gordon meant 1.7.2 rc2?
> 
> Thanks and regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Ken Krugler
Hi Averell,

You can get release candidates from the Apache release candidate maven repo. 
For 1.7.2, I think it’s in:

https://repository.apache.org/content/repositories/orgapacheflink-1206/ 
<https://repository.apache.org/content/repositories/orgapacheflink-1206/>

So just edit your pom.xml to add this repo to the <repositories> section.

— Ken

> On Feb 13, 2019, at 4:20 PM, Averell  wrote:
> 
> Hi Gordon,
> 
> Sorry for a noob question: How can I get the RC 1.7.2 build / code to build?
> I could not find any branch like that in Github.
> 
> Thanks and regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Per-workflow configurations for an S3-related property

2019-02-08 Thread Ken Krugler
Hi all,

When running in EMR, we’re encountering the oh-so-common HTTP timeout that’s 
caused by the connection pool being too small (see below).

I’d found one SO answer that said to bump fs.s3.maxConnections for the EMR S3 
filesystem implementation.

I tried to change this for my failing job, via adding to the CLI command line:

-yD fs.s3.maxConnections=2000

But it didn’t seem to have any impact.

I haven’t dug into the Flink CLI code to figure out if -yD can actually be used 
to alter values like this (which come from emrfs-site.xml)

What’s the right way (on a per-job basis) to tweak something like this?

Thanks!

— Ken

==
2019-02-08 21:18:29,669 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Fri Feb 08 21:16:58 UTC 2019 (c361522b13121e263460364b1e38db9d) switched 
from state RUNNING to FAILING.
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to 
execute HTTP request: Timeout waiting for connection from pool
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1134)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1080)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:745)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:719)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:701)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:669)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:651)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:515)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4443)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4390)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1280)
at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
at 
com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
at 
com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
at 
com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:96)
at 
com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:38)
at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:214)
at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:758)
at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:548)
at 
org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1842)
at 
org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1865)
at 
org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.initialize(SequenceFileRecordReader.java:54)
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:187)
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:59)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
 Timeout waiting for connection from pool
at 
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
at 
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
at sun.reflect.GeneratedMethodAccessor68.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 

Re: Regarding json/xml/csv file splitting

2019-02-04 Thread Ken Krugler
Normally parallel processing of text input files is handled via Hadoop 
TextInputFormat, which supports splitting files on line boundaries at 
(roughly) HDFS block boundaries.

There are various XML Hadoop InputFormats available, which try to sync up with 
splittable locations. The one I’ve used in the past 
<https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java>
 is part of the Mahout project.

If each JSON record is on its own line, then you can just use a regular source 
and parse it in a subsequent map function.
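
For example, a minimal sketch with the DataSet API, assuming a JSON parser such as 
Jackson is on the classpath (the path and class names are placeholders):

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LineDelimitedJsonJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // readTextFile() splits the input on line boundaries, so the parsing below
        // runs in parallel across the source's subtasks.
        DataSet<Map<String, Object>> records = env.readTextFile("hdfs:///data/records.jsonl")
            .map(new RichMapFunction<String, Map<String, Object>>() {
                private transient ObjectMapper mapper;

                @Override
                public void open(Configuration parameters) {
                    mapper = new ObjectMapper();
                }

                @Override
                @SuppressWarnings("unchecked")
                public Map<String, Object> map(String line) throws Exception {
                    // one JSON record per line; pull out whatever fields you need here
                    return mapper.readValue(line, Map.class);
                }
            });

        records.print();
    }
}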

Otherwise you can still create a custom input format, as long as there’s some 
unique JSON that identifies the beginning/end of each record.

See 
https://stackoverflow.com/questions/18593595/custom-inputformat-for-reading-json-in-hadoop

And failing that, you can always build a list of file paths as your input, and 
then in your map function explicitly open/read each file and process it as you 
would any JSON file. In a past project where we had a similar requirement, the 
only interesting challenge was building N lists of files (for N mappers) where 
the sum of file sizes was roughly equal for each parallel map parser, as there 
was significant skew in the file sizes.
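
A rough sketch of that last approach (paths are placeholders, and the balancing of 
total bytes per mapper is left out):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

public class PathsAsInputJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> paths = env.fromElements(
            "hdfs:///data/part-0001.json",
            "hdfs:///data/part-0002.json");

        DataSet<String> lines = paths
            .rebalance() // spread the paths across the parallel mappers
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String pathString, Collector<String> out) throws Exception {
                    Path path = new Path(pathString);
                    FileSystem fs = path.getFileSystem();
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            // parse each record here instead, if that's more convenient
                            out.collect(line);
                        }
                    }
                }
            });

        lines.print();
    }
}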

— Ken


> On Feb 4, 2019, at 9:12 AM, madan  wrote:
> 
> Hi,
> 
> Can someone please tell me how to split a json/xml data file. Since they are 
> structured formats (i.e., parent/child hierarchy), is it possible to split the 
> file and process it in parallel with 2 or more instances of the source operator?
> Also please confirm if my understanding of csv splitting is correct, as 
> described below.
> 
> When the parallelism used is greater than 1, the file will be split into more 
> or less equal parts, and each operator instance will get the start position of 
> its own file partition. It is possible that the start position of a file 
> partition falls in the middle of a delimited line, as shown below. When 
> file reading starts, the initial partial record is ignored by the respective 
> operator instance, which then reads the full records that come afterwards, i.e.,
> # Operator1 reads the emp1 and emp2 records (it reads emp2 since that record's 
> starting char position fell in its reading range)
> # Operator2 ignores the partial emp2 record and reads emp3 and emp4
> # Operator3 ignores the partial emp4 record and reads emp5
> The record delimiter is used to skip the partial record and to identify new records.
> 
> 
> 
> 
> 
> -- 
> Thank you,
> Madan.

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: How to load Avro file in a Dataset

2019-01-27 Thread Ken Krugler
Hi Soheil,

I’ve used Avro in the past, but I’m no expert - so I could be missing something 
obvious here…

But if you don’t know any of the fields in the schema, then what processing 
would you do with the data in your Flink workflow?
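
That said, if the goal is just to read the records without a generated class, I 
believe AvroInputFormat can also be parameterized with Avro's GenericRecord, so 
the schema is taken from the file itself. An untested sketch (the path is a 
placeholder):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

public class GenericAvroRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        AvroInputFormat<GenericRecord> format =
            new AvroInputFormat<>(new Path("hdfs:///data/users.avro"), GenericRecord.class);
        DataSet<GenericRecord> records = env.createInput(format);

        // fields are then accessed by name, e.g. record.get("favoriteColor")
        records.print();
    }
}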

— Ken

> On Jan 27, 2019, at 5:50 AM, Soheil Pourbafrani  wrote:
> 
> According to the Flink documentation, it's possible to load an Avro file like 
> the following:
> AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
> DataSet<User> usersDS = env.createInput(users);
> It's a bit confusing for me. I guess that User is a predefined class. My 
> question is: can Flink detect the Avro file schema automatically? How can I 
> load an Avro file without any predefined class?

----------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Best pattern for achieving stream enrichment (side-input) from a large static source

2019-01-26 Thread Ken Krugler

> On Jan 26, 2019, at 1:08 PM, Nimrod Hauser  
> wrote:
> 
> Hey Ken, 
> 
> Thank you for your quick response! That definitely sounds like something 
> worth exploring.
> Just a few more small questions, if that's ok.
> 
> 1. You referred to the parquet source as a "stream", but what we have is a 
> static data-source which we will always want to "query" against .
> What we thought about doing is to stream the entire parquet dataset and 
> load it into our state. 
> Does that sound right, or is that "hacky”?

Not sure what you mean by “stream the entire parquet dataset”. Do you mean 
you’d load it into memory yourself, and then distribute it?

If so, then yes you can do that, but you’d have to obviously re-load it 
yourself, and partition it (since it’s also keyed, right?) yourself, etc.

> 2. Can the continuousFileMonitoringFunction be used to track an entire 
> directory of parquet files?

Yes.

> Also, we'd like it to refresh its state (= its internal data structures) 
> every time the parquet folder is updated, but only after all new files have 
> been written (meaning, we'll need it to run once an update has been detected, 
> but not right away)
> Is that a reasonable use-case?

It’s a reasonable use case, but it precludes using the 
ContinuousFileMonitoringFunction.

You can write a custom SourceFunction, but where it gets tricky is handling 
failure recovery (checkpointing).

But I should also have mentioned the fundamental issue with this kind of 
enrichment in Flink - you can’t control the ordering of the two streams 
(easily), so you have to be prepared to buffer data from Kafka until you’ve got 
a complete set of data from Parquet.

We’d worked around a similar issue with a UnionedSources 
<https://github.com/ScaleUnlimited/flink-streaming-kmeans/blob/master/src/main/java/com/scaleunlimited/flinksources/UnionedSources.java>
 source function, but I haven’t validated that it handles checkpointing 
correctly.

— Ken

 
> 
> And thank you once again.
> 
> Nimrod.
> 
> On Sat, Jan 26, 2019 at 7:10 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Nimrod,
> 
> One approach is as follows…
> 
> 1. For the Parquet data, you could use a ContinuousFileMonitoringFunction 
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.7_api_java_org_apache_flink_streaming_api_functions_source_ContinuousFileMonitoringFunction.html=DwMFaQ=euGZstcaTDllvimEN8b7jXrwqOf-v5A_CdpgnVfiiMM=ZiJRxaWlmZ09uE1VLnEG3ryBI3b9mAkVojy2QaG8EaA=5NC-xiUI1cQNX_73Zvaja-CGYF-QhEQTh-Z7XZrbE6U=9kusX9KP5vzip4W2BZxGO0-yceK2XsdSh4n8p3xpGW8=>
>  to generate a stream of enrichment records.
> 
> 2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and 
> Parquet), and output something like Tuple2>
> 
> 3. In your enrichment function, based on what’s in the Either<> you’re either 
> updating your enrichment state, or processing a record from Kafka.
> 
> But I think you might want to add a stateful function in the Parquet stream, 
> so that you can separately track Kafka record state in the enrichment 
> function.
> 
> There’s also the potential issue of wanting to buffer Kafka data in the 
> enrichment function, if you need to coordinate with the enrichment data (e.g. 
> you need to get a complete set of updated enrichment data before applying any 
> of it to the incoming Kafka data).
> 
> — Ken
> 
> 
> 
>> On Jan 26, 2019, at 8:04 AM, Nimrod Hauser > <mailto:nimrod.hau...@bluevoyant.com>> wrote:
>> 
>> Hello,
>> 
>> We're using Flink on a high velocity data-stream, and we're looking for the 
>> best way to enrich our stream using a large static source (originating from 
>> Parquet files, which are rarely updated).
>> 
>> The source for the enrichment weights a few GBs, which is why we want to 
>> avoid using techniques such as broadcast streams, which cannot be keyed and 
>> need to be duplicated for every Flink operator that is used.
>> 
>> We started looking into the possibility of merging streams with datasets, or 
>> using the Table API, but any best-practice that's been done before will be 
>> greatly appreciated.
>> 
>> I'm attaching a simple chart for convenience,
>> 
>> Thanks you very much,
>> 
>> Nimrod.
>> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Best pattern for achieving stream enrichment (side-input) from a large static source

2019-01-26 Thread Ken Krugler
Hi Nimrod,

One approach is as follows…

1. For the Parquet data, you could use a ContinuousFileMonitoringFunction 
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.html>
 to generate a stream of enrichment records.

2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and 
Parquet), and output something like Tuple2>

3. In your enrichment function, based on what’s in the Either<> you’re either 
updating your enrichment state, or processing a record from Kafka (see the 
sketch below).
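
A minimal sketch that folds steps 2 and 3 into a single keyed CoFlatMapFunction, 
buffering Kafka records until the enrichment data for their key has arrived (the 
Tuple2<key, payload> record types and all names are placeholders):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BufferingEnricher extends
        RichCoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<String> enrichment;          // latest enrichment value for this key
    private transient ListState<String> pendingKafkaRecords;  // Kafka payloads waiting for enrichment

    @Override
    public void open(Configuration parameters) {
        enrichment = getRuntimeContext().getState(
            new ValueStateDescriptor<>("enrichment", String.class));
        pendingKafkaRecords = getRuntimeContext().getListState(
            new ListStateDescriptor<>("pending-kafka", String.class));
    }

    // flatMap1 gets the Parquet-derived enrichment stream
    @Override
    public void flatMap1(Tuple2<String, String> enrichmentRecord,
            Collector<Tuple2<String, String>> out) throws Exception {
        enrichment.update(enrichmentRecord.f1);
        // flush anything that was buffered while waiting for this key's enrichment data
        for (String pending : pendingKafkaRecords.get()) {
            out.collect(Tuple2.of(pending, enrichmentRecord.f1));
        }
        pendingKafkaRecords.clear();
    }

    // flatMap2 gets the Kafka stream
    @Override
    public void flatMap2(Tuple2<String, String> kafkaRecord,
            Collector<Tuple2<String, String>> out) throws Exception {
        String value = enrichment.value();
        if (value == null) {
            pendingKafkaRecords.add(kafkaRecord.f1);
        } else {
            out.collect(Tuple2.of(kafkaRecord.f1, value));
        }
    }
}

Wiring it up would then be something like 
parquetStream.connect(kafkaStream).keyBy(r -> r.f0, r -> r.f0).flatMap(new BufferingEnricher()).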

But I think you might want to add a stateful function in the Parquet stream, so 
that you can separately track Kafka record state in the enrichment function.

There’s also the potential issue of wanting to buffer Kafka data in the 
enrichment function, if you need to coordinate with the enrichment data (e.g. 
you need to get a complete set of updated enrichment data before applying any 
of it to the incoming Kafka data).

— Ken



> On Jan 26, 2019, at 8:04 AM, Nimrod Hauser  
> wrote:
> 
> Hello,
> 
> We're using Flink on a high velocity data-stream, and we're looking for the 
> best way to enrich our stream using a large static source (originating from 
> Parquet files, which are rarely updated).
> 
> The source for the enrichment weights a few GBs, which is why we want to 
> avoid using techniques such as broadcast streams, which cannot be keyed and 
> need to be duplicated for every Flink operator that is used.
> 
> We started looking into the possibility of merging streams with datasets, or 
> using the Table API, but any best-practice that's been done before will be 
> greatly appreciated.
> 
> I'm attaching a simple chart for convenience,
> 
> Thanks you very much,
> 
> Nimrod.
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Issue with counter metrics for large number of keys

2019-01-16 Thread Ken Krugler
Hi Gaurav,

I’ve used a few hundred counters before without problems. My concern about > 
100K unique counters is that you wind up generating load (and maybe memory 
issues) for the JobManager.

E.g. with Hadoop’s metric system trying to go much beyond 1000 counters could 
cause significant problems. IIRC it was due to the JobTracker getting bogged 
down processing too many counter updates, and/or running out of memory. It’s 
possible more recent versions of Hadoop no longer have that problem.

But someone on the Flink dev team should weigh in here…
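
For reference, a rough sketch of the MapState approach mentioned in the earlier 
reply quoted below; the hash-bucket keying and all names are placeholders:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts occurrences per myKey in managed state instead of registering one metric
// per key. The stream is keyed into a bounded number of hash buckets, so each
// parallel subtask tracks its share of the keys in its MapState.
public class KeyCountFunction extends KeyedProcessFunction<Integer, String, String> {

    private transient MapState<String, Long> counts;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("key-counts", String.class, Long.class);
        // expose the counts to external lookups via the queryable state API
        descriptor.setQueryable("key-counts");
        counts = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(String myKey, Context ctx, Collector<String> out) throws Exception {
        Long current = counts.get(myKey);
        counts.put(myKey, current == null ? 1L : current + 1L);
        out.collect(myKey);
    }
}

It would be applied as something like 
keys.keyBy(k -> Math.floorMod(k.hashCode(), 128)).process(new KeyCountFunction()).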

— Ken


> On Jan 16, 2019, at 7:45 PM, Gaurav Luthra  wrote:
> 
> Thanks a lot Ken for your inputs.
> 
> I will look into your suggested solution and will update you about this.
> Moreover I want to know: what is the approximate number of counter metrics 
> for which I can safely keep references?
> Or what is the maximum number of counter metrics you have heard of anyone 
> using?
> 
> Thanks & Regards
> Gaurav Luthra
> Mob:- +91-9901945206
> 
> 
> On Thu, Jan 17, 2019 at 9:04 AM Ken Krugler  <mailto:kkrugler_li...@transpac.com>> wrote:
> I think trying to use counters to track counts for 100K+ keys is not going to 
> be a good idea.
> 
> An alternative is to have a small function with managed MapState 
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>,
>  and make that state queryable 
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#queryable-state-beta>
> 
> Though maybe under the hood that’s what metrics is doing anyway :)
> 
> — Ken
> 
> 
>> On Jan 16, 2019, at 7:25 PM, Gaurav Luthra > <mailto:gauravluthra6...@gmail.com>> wrote:
>> 
>> I want a new counter for every key of my windowed stream, and I want the same 
>> counter to get incremented when the same key comes multiple times in the 
>> incoming events.
>> 
>> So, I will write the below code for every incoming event.
>> getRuntimeContext().getMetricGroup().counter(myKey).inc();
>> 
>> But the above code fails when the same value of myKey comes again, as this is 
>> a limitation of Flink metrics. It throws an exception, "Name collision: Adding 
>> a metric with the same name as a metric subgroup: "
>> 
>> Note: myKey is of String type and can have different value for every 
>> incoming event.
>> In my case I expect 100 thousands different values of myKey.
>> 
>> Now, to solve the issue, I have to keep references for the 100 thousand 
>> values of myKey in some data structure, e.g.
>> Map<String, Counter> myMetricMap; 
>> and for every myKey I have to do the following.
>> 
>> Counter counter = myMetricMap.get(myKey);
>> if (null == counter)
>> {
>>     counter = getRuntimeContext().getMetricGroup().counter(myKey);
>>     counter.inc();
>>     myMetricMap.put(myKey, counter);
>> }
>> else
>> {
>>     counter.inc();
>> }
>>  
>> The above code serves my purpose. But I do not want to maintain a map of 100 
>> thousand values of myKey.
>> 
>> Is there any alternative solution? I am looking for a way to achieve the 
>> above functionality and maintain approx. 100 thousand counter metrics 
>> without keeping their references in a map (or any other data structure).
>> 
>> 
>> Thanks & Regards
>> Gaurav Luthra
>> Mob:- +91-9901945206
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: windowAll and AggregateFunction

2019-01-09 Thread Ken Krugler

> On Jan 9, 2019, at 3:10 PM, CPC  wrote:
> 
> Hi Ken,
> 
> From regular time-based windows do you mean keyed windows?

Correct. Without doing a keyBy() you would have a parallelism of 1.

I think you want to key on whatever you’re counting for unique values, so that 
each window operator gets a slice of the unique values.

— Ken

> On Wed, Jan 9, 2019, 10:22 PM Ken Krugler  <mailto:kkrugler_li...@transpac.com> wrote:
> Hi there,
> 
> You should be able to use a regular time-based window(), and emit the 
> HyperLogLog binary data as your result, which then would get merged in your 
> custom function (which you set a parallelism of 1 on).
> 
> Note that if you are generating unique counts per non-overlapping time 
> window, you’ll need to keep N HLL structures in each operator.
> 
> — Ken
> 
> 
>> On Jan 9, 2019, at 10:26 AM, CPC > <mailto:acha...@gmail.com>> wrote:
>> 
>> Hi Stefan,
>> 
>> Could i use "Reinterpreting a pre-partitioned data stream as keyed stream" 
>> feature for this? 
>> 
>> On Wed, 9 Jan 2019 at 17:50, Stefan Richter > <mailto:s.rich...@da-platform.com>> wrote:
>> Hi,
>> 
>> I think your expectation about windowAll is wrong, from the method 
>> documentation: “Note: This operation is inherently non-parallel since all 
>> elements have to pass through the same operator instance” and I also cannot 
>> think of a way in which the windowing API would support your use case 
>> without a shuffle. You could probably build the functionality by hand 
>> through, but I guess this is not quite what you want.
>> 
>> Best,
>> Stefan
>> 
>> > On 9. Jan 2019, at 13:43, CPC > > <mailto:acha...@gmail.com>> wrote:
>> > 
>> > Hi all,
>> > 
>> > In our implementation,we are consuming from kafka and calculating distinct 
>> > with hyperloglog. We are using windowAll function with a custom 
>> > AggregateFunction but flink runtime shows a little bit unexpected behavior 
>> > at runtime. Our sources running with parallelism 4 and i expect add 
>> > function to run after source calculate partial results and at the end of 
>> > the window i expect it to send 4 hll object to single operator to merge 
>> > there(merge function). Instead, it sends all data to single instance and 
>> > call add function there. 
>> > 
>> > Is here any way to make flink behave like this? I mean calculate partial 
>> > results after consuming from kafka with paralelism of sources without 
>> > shuffling(so some part of the calculation can be calculated in parallel) 
>> > and merge those partial results with a merge function?
>> > 
>> > Thank you in advance...
>> 
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: windowAll and AggregateFunction

2019-01-09 Thread Ken Krugler
Hi there,

You should be able to use a regular time-based window(), and emit the 
HyperLogLog binary data as your result, which then would get merged in your 
custom function (which you set a parallelism of 1 on).

Note that if you are generating unique counts per non-overlapping time window, 
you’ll need to keep N HLL structures in each operator.
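
As a rough sketch of that pattern, with a plain HashSet standing in for the 
HyperLogLog sketch (swap in a real HLL implementation, and merge sketches instead 
of sets, for production):

import java.util.HashSet;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DistinctCountSketch {

    // Builds a partial "sketch" of the values seen by one key group in one window.
    public static class PartialDistinct
            implements AggregateFunction<String, HashSet<String>, HashSet<String>> {
        @Override
        public HashSet<String> createAccumulator() { return new HashSet<>(); }
        @Override
        public HashSet<String> add(String value, HashSet<String> acc) { acc.add(value); return acc; }
        @Override
        public HashSet<String> getResult(HashSet<String> acc) { return acc; }
        @Override
        public HashSet<String> merge(HashSet<String> a, HashSet<String> b) { a.addAll(b); return a; }
    }

    public static DataStream<Long> distinctPerWindow(DataStream<String> values) {
        return values
            // shard the values so each parallel window operator builds a partial result
            .keyBy(new KeySelector<String, Integer>() {
                @Override
                public Integer getKey(String value) {
                    return Math.floorMod(value.hashCode(), 16);
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
            .aggregate(new PartialDistinct())
            // merge the partial results in a single (parallelism 1) operator;
            // with event time windows the two stages line up exactly
            .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
            .reduce((a, b) -> { a.addAll(b); return a; })
            .map(merged -> (long) merged.size()).returns(Types.LONG);
    }
}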

— Ken


> On Jan 9, 2019, at 10:26 AM, CPC  wrote:
> 
> Hi Stefan,
> 
> Could i use "Reinterpreting a pre-partitioned data stream as keyed stream" 
> feature for this? 
> 
> On Wed, 9 Jan 2019 at 17:50, Stefan Richter  <mailto:s.rich...@da-platform.com>> wrote:
> Hi,
> 
> I think your expectation about windowAll is wrong, from the method 
> documentation: “Note: This operation is inherently non-parallel since all 
> elements have to pass through the same operator instance” and I also cannot 
> think of a way in which the windowing API would support your use case without 
> a shuffle. You could probably build the functionality by hand through, but I 
> guess this is not quite what you want.
> 
> Best,
> Stefan
> 
> > On 9. Jan 2019, at 13:43, CPC  > <mailto:acha...@gmail.com>> wrote:
> > 
> > Hi all,
> > 
> > In our implementation,we are consuming from kafka and calculating distinct 
> > with hyperloglog. We are using windowAll function with a custom 
> > AggregateFunction but flink runtime shows a little bit unexpected behavior 
> > at runtime. Our sources running with parallelism 4 and i expect add 
> > function to run after source calculate partial results and at the end of 
> > the window i expect it to send 4 hll object to single operator to merge 
> > there(merge function). Instead, it sends all data to single instance and 
> > call add function there. 
> > 
> > Is here any way to make flink behave like this? I mean calculate partial 
> > results after consuming from kafka with paralelism of sources without 
> > shuffling(so some part of the calculation can be calculated in parallel) 
> > and merge those partial results with a merge function?
> > 
> > Thank you in advance...
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Iterations and back pressure problem

2018-12-24 Thread Ken Krugler
Hi Sergey,

As Andrey noted, it’s a known issue with (currently) no good solution.

I talk a bit about how we worked around it on slide 26 of my Flink Forward talk 
<https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink>
 on a Flink-based web crawler.

Basically we do some cheesy approximate monitoring of in-flight data, and 
throttle the key producer so that (hopefully) network buffers don’t fill up to 
the point of deadlock.
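
As a crude stand-in for that throttling (this is not the actual crawler code), a 
simple rate-limiting map placed in front of the operator that feeds the iteration 
could look like the following; the per-second limit is something you'd have to tune:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final long maxPerSecondPerSubtask;
    private transient long windowStartMillis;
    private transient long emittedInWindow;

    public ThrottlingMap(long maxPerSecondPerSubtask) {
        this.maxPerSecondPerSubtask = maxPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        windowStartMillis = System.currentTimeMillis();
        emittedInWindow = 0;
    }

    @Override
    public T map(T value) throws Exception {
        long now = System.currentTimeMillis();
        if (now - windowStartMillis >= 1000) {
            windowStartMillis = now;
            emittedInWindow = 0;
        }
        if (++emittedInWindow > maxPerSecondPerSubtask) {
            // Block until the next one-second window; blocking here applies back
            // pressure upstream instead of flooding the iteration's network buffers.
            Thread.sleep(1000 - (now - windowStartMillis));
            windowStartMillis = System.currentTimeMillis();
            emittedInWindow = 1;
        }
        return value;
    }
}

You'd apply it right before the operator that generates the records entering the 
iteration, e.g. stream.map(new ThrottlingMap<String>(1000)).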

— Ken


> On Dec 24, 2018, at 8:46 AM, Andrey Zagrebin  wrote:
> 
> Hi Sergey,
> 
> It seems to be a known issue. The community will hopefully work on this, but I 
> do not see any more updates since the last answer to a similar question [1]; 
> see also [2] and [3].
> 
> Best,
> Andrey
> 
> [1] 
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CBFD8C506-5B41-47D8-B735-488D03842051%40data-artisans.com%3E
>  
> <http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CBFD8C506-5B41-47D8-B735-488D03842051%40data-artisans.com%3E>
> [2] 
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CBFD8C506-5B41-47D8-B735-488D03842051%40data-artisans.com%3E
>  
> <http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CBFD8C506-5B41-47D8-B735-488D03842051%40data-artisans.com%3E>
> [3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132>
> On Mon, Dec 24, 2018 at 7:16 PM Sergei Poganshev  <mailto:s.pogans...@slice.com>> wrote:
> We've tried using the iterations feature, and under significant load the job 
> sometimes stalls and stops processing events due to high back pressure, both 
> in the task that produces records for the iteration and in all the other inputs 
> to this task. It looks like a back pressure loop: the task can't handle all the 
> incoming records, the iteration sink loops back into this task and also gets back 
> pressured. This is basically a "back pressure loop" which causes a complete 
> job stoppage.
> 
> Is there a way to mitigate this (to guarantee that such an issue does not occur)?

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


