Re: Kafka Per-Partition Watermarks

2018-10-04 Thread Taher Koitawala
Hey Andrew,
  We face the same problem in our application where some of the
kafka partitions are empty. In this case what we do is use the rebalance()
method on the source streams.

Ex: DataStream srcStream = Env.addSource(new
FlinkKafkaConsumer09<>(topic, Ser, props));
DataStream rebalanced = srcStream.rebalance();

Since rebalance() sends records to all tasks in a round-robin fashion, we do not
have to worry about empty partitions or about matching our parallelism
to the number of Kafka partitions. After rebalance() you can set your parallelism
higher than the number of Kafka partitions and windowing will work just fine.
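
To illustrate why an empty partition holds things up, here is a toy model in
plain Java (not Flink code; the class and method names are made up for this
example). It assumes that a task's event-time clock is the minimum of the
watermarks of the inputs it reads, and that timestamps/watermarks are assigned
after rebalance(), so that every task actually sees records:

```java
import java.util.Arrays;

/** Toy model (not Flink code): a task's watermark is the minimum over its inputs. */
final class WatermarkSketch {
    /** Marker for an input that has not reported any watermark yet. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Combined watermark of one task = minimum over the watermarks of its inputs. */
    static long combined(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // Two active partitions and one empty one: the empty partition never
        // reports a watermark, so the minimum stays stuck at NO_WATERMARK.
        System.out.println(combined(1_000L, 1_200L, NO_WATERMARK) == NO_WATERMARK); // true

        // Once every input reports progress, the combined watermark advances
        // to the slowest of them.
        System.out.println(combined(1_000L, 1_200L, 1_100L)); // 1000
    }
}
```

The point of the sketch is only that a single silent input is enough to keep
the minimum from moving, regardless of how the parallelism is set.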

Thanks,
Taher Koitawala

On Fri 5 Oct, 2018, 1:28 AM Andrew Kowpak wrote:

> Hi all,
>
> I apologize if this has been discussed to death in the past, but, I'm
> finding myself very confused, and google is not proving helpful.
>
> Based on the documentation, I understand that if there are idle partitions
> in a kafka stream, watermarks will not advance for the entire application.
> I was hoping that by setting parallelism = the number of partitions that I
> would be able to work around the issue, but, this didn't work.  I'm totally
> willing to accept the fact that if I have idle partitions, my windowed
> partitions won't work, however, I would really like to understand why
> setting the parallelism didn't work.  If someone can explain, or perhaps
> point me to documentation or code, it would be very much appreciated.
>
> Thanks.
>
> --
> *Andrew Kowpak P.Eng* *Sr. Software Engineer*
> (519)  489 2688 | SSIMWAVE Inc.
> 402-140 Columbia Street West, Waterloo ON
>


Re: Streaming to Parquet Files in HDFS

2018-10-04 Thread Averell
Hi Fabian, Kostas,

From the description of this ticket
https://issues.apache.org/jira/browse/FLINK-9753, I understand that my
output Parquet files with StreamingFileSink can now span multiple checkpoints.
However, when I tried it (as in the code snippet below), I still see that
one "part-X-X" file is created after each checkpoint. Is there any other
configuration that I'm missing?

BTW, I have another question regarding
StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the
notes at the end of the StreamingFileSink documentation page, bulk-encoding
can only be combined with OnCheckpointRollingPolicy, which rolls on every
checkpoint, so setting that check interval makes no difference. Why, then,
is the withBucketCheckInterval method exposed?

Thanks and best regards,
Averell

def buildSink[T <: MyBaseRecord](outputPath: String)(implicit ct: ClassTag[T]): StreamingFileSink[T] = {
  StreamingFileSink.forBulkFormat(new Path(outputPath),
      ParquetAvroWriters.forReflectRecord(ct.runtimeClass))
    .asInstanceOf[StreamingFileSink.BulkFormatBuilder[T, String]]
    .withBucketCheckInterval(5L * 60L * 1000L)
    .withBucketAssigner(new DateTimeBucketAssigner[T]("yyyy-MM-dd--HH"))
    .build()
}




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka Per-Partition Watermarks

2018-10-04 Thread Elias Levy
Does your job perform a keyBy or broadcast that would result in data from
different partitions being distributed among tasks?  If so, then that would
be the cause.

On Thu, Oct 4, 2018 at 12:58 PM Andrew Kowpak wrote:

> Hi all,
>
> I apologize if this has been discussed to death in the past, but, I'm
> finding myself very confused, and google is not proving helpful.
>
> Based on the documentation, I understand that if there are idle partitions
> in a kafka stream, watermarks will not advance for the entire application.
> I was hoping that by setting parallelism = the number of partitions that I
> would be able to work around the issue, but, this didn't work.  I'm totally
> willing to accept the fact that if I have idle partitions, my windowed
> partitions won't work, however, I would really like to understand why
> setting the parallelism didn't work.  If someone can explain, or perhaps
> point me to documentation or code, it would be very much appreciated.
>
> Thanks.
>
> --
> *Andrew Kowpak P.Eng* *Sr. Software Engineer*
> (519)  489 2688 | SSIMWAVE Inc.
> 402-140 Columbia Street West, Waterloo ON
>


Kafka Per-Partition Watermarks

2018-10-04 Thread Andrew Kowpak
Hi all,

I apologize if this has been discussed to death in the past, but, I'm
finding myself very confused, and google is not proving helpful.

Based on the documentation, I understand that if there are idle partitions
in a kafka stream, watermarks will not advance for the entire application.
I was hoping that by setting parallelism = the number of partitions that I
would be able to work around the issue, but, this didn't work.  I'm totally
willing to accept the fact that if I have idle partitions, my windowed
partitions won't work, however, I would really like to understand why
setting the parallelism didn't work.  If someone can explain, or perhaps
point me to documentation or code, it would be very much appreciated.

Thanks.

-- 
*Andrew Kowpak P.Eng* *Sr. Software Engineer*
(519)  489 2688 | SSIMWAVE Inc.
402-140 Columbia Street West, Waterloo ON


Unable to start session cluster using Docker

2018-10-04 Thread Vinay Patil
Hi,

I have used the docker-compose file for creating the cluster as shown in
the documentation. The web ui is started successfully, however, the task
managers are unable to join.

Job Manager container logs:

2018-10-04 18:13:13,907 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at cluster:8081

2018-10-04 18:13:13,907 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://cluster:8081 was granted leadership with leaderSessionID=----

2018-10-04 18:13:13,907 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://cluster:8081

2018-10-04 18:13:14,012 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - ResourceManager akka.tcp://flink@cluster:6123/user/resourcemanager was granted leadership with fencing token 

2018-10-04 18:13:14,013 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Starting the SlotManager.

2018-10-04 18:13:14,026 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher akka.tcp://flink@cluster:6123/user/dispatcher was granted leadership with fencing token ----

I am not sure why it says "Web frontend listening at http://cluster:8081" when
the job manager RPC address is set to jobmanager.

Task Manager Container Logs:

2018-10-04 18:19:18,818 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/resourcemanager().

2018-10-04 18:19:18,818 INFO  org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-1bd95c51-3031-42ab-b782-14a0023921e5

2018-10-04 18:19:28,850 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@jobmanager:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@jobmanager:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify".


I have even tried setting JOB_MANAGER_RPC_ADDRESS=cluster in the
docker-compose file, but it does not work.
Both "cluster" and "jobmanager" point to localhost in the /etc/hosts file.

Can you please let me know what the issue is here?

Regards,
Vinay Patil


Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

2018-10-04 Thread Yan Yan
Hi Aljoscha,

Thanks for looking into this! Yes, we switched from Flink 1.4 back to Flink 1.3.2
and it works. So it really does seem to be a behavior difference between 1.3.2 and 1.4.

Best,
Yan


On Thu, Oct 4, 2018 at 6:36 AM Aljoscha Krettek  wrote:

> Another thing: when you retry this again with Flink 1.3.2 it works? I'm
> trying to rule out another problem in the setup.
>
>
> On 4. Oct 2018, at 15:17, Aljoscha Krettek  wrote:
>
> Hi Yan,
>
> This seems to be a bug in the FileSystems and how they're initialized. I'm
> looking into this myself but I'm also looping in Stephan and Stefan, who
> have worked on this the most in the past. Maybe they have some valuable
> input.
>
> Best,
> Aljoscha
>
>
> On 4. Oct 2018, at 01:18, Yan Yan  wrote:
>
> Hi,
>
> We recently bumped to Flink 1.4 from 1.3.2, and found out an issue on HDFS
> configuration.
>
> We are using *FlinkYarnSessionCli* to start the cluster and submit job.
>
> In 1.3.2, we set below Flink properties when using checkpoints:
> state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
> state.checkpoints.dir = hdfs://nameservice0/../..
>
> The mapping between logical nameservice (nameservice0) and actual
> namenodes hostports are passed to Flink via *yarnship/core-site.xml *(by
> providing the -yt option), and set fs.hdfs.hadoopconf=yarnship/
>
> However, we encountered below error after bumping to 1.4, which caused the
> checkpointing to fail.
>
> 2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN  org.apache.flink.runtime.checkpoint.OperatorSubtaskState  - Error while discarding operator states.
> java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
>     at org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>     at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
>     at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice0
>     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>     ... 15 common frames omitted
> Caused by: java.net.UnknownHostException: nameservice0
>     ... 22 common frames omitted
>
>
> It does not recognize nameservice0 because the *core-site.xml* on the
> actual machine (read in by Flink via $HADOOP_CONF_DIR) does not use
> nameservice0 but something else for *fs.defaultFS*.
>
> Digging a little bit, I found that the *hadoopConfig* does not have the
> properties we set via *yarnship/core-site.xml*. In particular, I suspect
> this is because the cached *HadoopFsFactory* is initialized with a dummy
> Configuration, which prevents future 

[deserialization schema] skip data, that couldn't be properly deserialized

2018-10-04 Thread Rinat
Hi mates, according to the contract of 
org.apache.flink.formats.avro.DeserializationSchema, it should return a null 
value when the content can't be deserialized.
But in most cases (for example, 
org.apache.flink.formats.avro.AvroDeserializationSchema) the method fails if the 
data is corrupted. 

We’ve implemented our own SerDe class that returns null if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality when 
migrating to the latest Flink version.
What do you think: would it be useful to support optionally skipping 
failed records in the Avro and other deserializers in the source code?
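
To make the idea concrete, here is a minimal sketch in plain Java (not the
Flink interface; `SkipOnErrorDecoder` and the stand-in integer decoder are
made up for this example). It wraps any byte[]-to-object decoder and returns
null instead of throwing when a record can't be decoded:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Sketch only: wraps any byte[] -> T decoder so that failures yield null. */
final class SkipOnErrorDecoder<T> implements Function<byte[], T> {
    private final Function<byte[], T> delegate;

    SkipOnErrorDecoder(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T apply(byte[] message) {
        try {
            return delegate.apply(message);
        } catch (RuntimeException e) {
            // Corrupted record: signal "skip me" with null instead of failing.
            return null;
        }
    }

    public static void main(String[] args) {
        // Stand-in decoder (an assumption for the demo): parses an integer
        // and throws NumberFormatException on bad input.
        Function<byte[], Integer> strict =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        SkipOnErrorDecoder<Integer> lenient = new SkipOnErrorDecoder<>(strict);

        System.out.println(lenient.apply("42".getBytes(StandardCharsets.UTF_8)));   // 42
        System.out.println(lenient.apply("oops".getBytes(StandardCharsets.UTF_8))); // null
    }
}
```

An optional flag in the built-in deserializers could presumably follow the
same wrap-and-catch shape, but that part is a guess on my side.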

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Rowtime for Table from DataStream without explicit fieldnames

2018-10-04 Thread Johannes Schulte
Thanks for the answer Dawid and the helper function, Timo

it's not too bad for my use case (small number of fields), I just wanted to
make sure I am not missing something here.

Cheers,

Johannes

On Thu, Oct 4, 2018 at 5:07 PM Timo Walther  wrote:

> Hi Johannes,
>
> this is not supported so far. You could write a little helper method like
> the following:
>
> val s: Seq[Expression] = Types.of[WC].asInstanceOf[CaseClassTypeInfo[WC]].fieldNames.map(Symbol(_).toExpr)
> val s2: Seq[Expression] = s :+ 'rowtime.rowtime
>
> tEnv.fromDataSet(input, s2: _*)
>
>
> Not a very nice solution, but it should work.
>
> Regards,
> Timo
>
> On 04.10.18 at 15:40, Dawid Wysakowicz wrote:
>
> Hi Johannes,
>
> I am afraid that this is currently not possible and indeed you have to
> pass all fields again, but Timo (cc'ed) might want to correct me if I am wrong.
>
> Best,
>
> Dawid
>
>
> On 04/10/18 15:08, Johannes Schulte wrote:
>
> Hi,
>
> when converting a DataStream (with watermarks) to a table as
> described here
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time
>
> I wonder how to use the rowtime in a subsequent window operation
> _without_ explicitly specifying all field names, relying instead on case
> class type inference.
>
> Currently, when operating on a stream of events
>
> case class Event(field1: String, ts: Long)
>
> val ds: DataStream[Event] = ...
>
> I have to do
>
> tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)
>
> to do
>
> .window(Tumble over 1.hours on 'myRowtime as 'w)
>
> afterwards. Is there a way to create the TimeAttribute column without
> specifying all fields again?
>
> Thanks for your help,
>
> Johannes
>
>
>
>


Re: Data loss when restoring from savepoint

2018-10-04 Thread Stefan Richter
Hi,

> On 04.10.2018 at 16:08, Juho Autio wrote:
> 
> > you could take a look at Bravo [1] to query your savepoints and to check if 
> > the state in the savepoint is complete w.r.t. your expectations
> 
> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the missed 
> ids were being logged by the reducer soon after the job had started (after 
> restoring a savepoint). But on the other hand, after that I also made another 
> savepoint & restored that, so what I could check is: does that next savepoint 
> have the missed ids that were logged (a couple of minutes before the 
> savepoint was created, so there should've been more than enough time to add 
> them to the state before the savepoint was triggered) or not. Anyway, if I 
> were able to verify with Bravo that the ids are missing from the 
> savepoint (even though the reducer logged that it saw them), would that help in 
> figuring out where they are lost? Is there some major difference compared to 
> just looking at the final output after the window has been triggered?


I think that makes a difference. For example, you can investigate whether there is 
a state loss or a problem with the windowing. In the savepoint you could see 
which keys exist and to which windows they are assigned. Also, just to make 
sure there is no misunderstanding: only elements that are in the state at the 
start of a savepoint are expected to be part of the savepoint; all elements 
between start and completion of the savepoint are not expected to be part of 
the savepoint.

> 
> > I also doubt that the problem is about backpressure after restore, because 
> > the job will only continue running after the state restore is already 
> > completed.
> 
> Yes, I'm not suspecting that the state restoring would be the problem either. 
> My concern was about backpressure possibly messing with the updates of 
> reducing state? I would tend to suspect that updating the state consistently 
> is what fails, where heavy load / backpressure might be a factor.


How do you think backpressure would influence your updates? Updates to 
each local state still happen event-by-event, in a single reader/writer thread.

> 
> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter wrote:
> Hi,
> 
> you could take a look at Bravo [1] to query your savepoints and to check if 
> the state in the savepoint is complete w.r.t. your expectations. I somewhat doubt 
> that there is a general problem with the state/savepoints because many users 
> are successfully running it on a large state and I am not aware of any data 
> loss problems, but nothing is impossible. What the savepoint does is also 
> straightforward: iterate a db snapshot and write all key/value pairs to 
> disk, so all data that was in the db at the time of the savepoint should 
> show up. I also doubt that the problem is about backpressure after restore, 
> because the job will only continue running after the state restore is already 
> completed. Did you check if you are using exactly-once semantics or 
> at-least-once semantics? Also, did you check that the Kafka consumer start 
> position is configured properly [2]? Are watermarks generated as expected 
> after restore?
> 
> One more unrelated high-level comment that I have: for a granularity of 24h 
> windows, I wonder if it would not make sense to use a batch job instead?
> 
> Best,
> Stefan
> 
> [1] https://github.com/king/bravo 
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
> 
> 
>> On 04.10.2018 at 14:53, Juho Autio wrote:
>> 
>> Thanks for the suggestions!
>> 
>> > In general, it would be tremendously helpful to have a minimal working 
>> > example which allows to reproduce the problem.
>> 
>> Definitely. The problem with reproducing has been that this only seems to 
>> happen in the bigger production data volumes.
>> 
>> That's why I'm hoping to find a way to debug this with the production data. 
>> With that it seems to consistently cause some misses every time the job is 
>> killed/restored.
>> 
>> > check if it happens for shorter windows, like 1h etc
>> 
>> What would be the benefit of that compared to 24h window?
>> 
>> > simplify the job to not use a reduce window but simply a time window which 
>> > outputs the window events. Then counting the input and output events 
>> > should allow you to verify the results. If you are not seeing missing 
>> > events, then it could have something to do with the reducing state used in 
>> > the reduce function.
>> 
>> Hm, maybe, but not sure how useful that would be, because it wouldn't yet 
>> prove that it's related to reducing, because not having a reduce function 
>> could also mean smaller load on the job, which 

[ANNOUNCE] Weekly community update #40

2018-10-04 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #40. Please post any news and
updates you want to share with the community to this thread.

# Discussing feature freeze for Flink 1.7

The community is currently discussing the feature freeze for Flink 1.7 [1].
The 22nd of October is currently proposed.

# Discussing dropping flink-storm

The community is currently discussing to drop Flink's Storm compatibility
because it heavily relies on Flink's legacy mode [2].

# Flip-23: Model serving

There is again some activity for Flip-23 (model serving) [3]. If you want
to help with this effort, then please join the mailing list discussion.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Feature-freeze-for-Flink-1-7-tp24378.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Dropping-flink-storm-tp24376.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-23-Model-Serving-tp20260.html

Cheers,
Till


Identifying missing events in keyed streams

2018-10-04 Thread Averell
Hi everyone, 

I have a keyed stream which expects an event at a fixed interval (let's
say every 1 minute). I want to raise an alarm for any key which has received no
events in n periods. What would be the cheapest way (in terms of performance)
to do this?
I thought of some solutions, but don't know which one is the best:
1. Sliding window, then count the number of events in each window. This
seems quite expensive when n is big.
2. Register a timer for every single event, record the last event timestamp,
and check that timestamp when the timer expires. (This would be the best if
there were an option to cancel/modify a timer, but it seems that feature is not
available yet.)
3. Session window: I haven't implemented this to verify its feasibility.
I'm thinking of firing the alarm on every window clear event.
4. CEP: I don't know whether it's possible or not. I haven't found a guide for
defining patterns of missing events.
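
For reference, option 2 can be modelled in plain Java as follows. This is a
toy sketch, not Flink code: `MissingEventDetector`, `onEvent`, `onTimer` and
the key "sensor-a" are made up for the example, and in a real job the map
would presumably be keyed state and the timers would be registered with the
timer service. The last-seen timestamp is what makes stale timers harmless,
so no timer ever has to be cancelled:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of option 2: one timer per event plus a last-seen timestamp per key. */
final class MissingEventDetector {
    private final long timeoutMillis; // n periods * interval
    private final Map<String, Long> lastSeen = new HashMap<>();

    MissingEventDetector(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records the event and returns the time at which its timer should fire. */
    long onEvent(String key, long timestamp) {
        lastSeen.merge(key, timestamp, Math::max);
        return timestamp + timeoutMillis;
    }

    /** Called when a timer fires; true means "raise an alarm for this key". */
    boolean onTimer(String key, long timerTimestamp) {
        Long last = lastSeen.get(key);
        // If a newer event arrived after this timer was set, the timer is stale.
        return last != null && last + timeoutMillis <= timerTimestamp;
    }

    public static void main(String[] args) {
        MissingEventDetector detector = new MissingEventDetector(3 * 60_000L); // n = 3 periods of 1 minute
        long t1 = detector.onEvent("sensor-a", 0L);      // timer at 180000
        long t2 = detector.onEvent("sensor-a", 60_000L); // timer at 240000

        System.out.println(detector.onTimer("sensor-a", t1)); // false: superseded by the second event
        System.out.println(detector.onTimer("sensor-a", t2)); // true: nothing seen since 60000
    }
}
```

The cost is one timer per event and one timestamp per key, which is why being
able to replace a timer instead of adding a new one would make it cheaper.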

Could you please give some advice?

Thanks and best regards, 
Averell





Re: Data loss when restoring from savepoint

2018-10-04 Thread Juho Autio
> you could take a look at Bravo [1] to query your savepoints and to check
if the state in the savepoint is complete w.r.t. your expectations

Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
missed ids were being logged by the reducer soon after the job had started
(after restoring a savepoint). But on the other hand, after that I also
made another savepoint & restored that, so what I could check is: does that
next savepoint have the missed ids that were logged (a couple of minutes
before the savepoint was created, so there should've been more than enough
time to add them to the state before the savepoint was triggered) or not.
Anyway, if I were able to verify with Bravo that the ids are missing
from the savepoint (even though the reducer logged that it saw them), would
that help in figuring out where they are lost? Is there some major
difference compared to just looking at the final output after the window has
been triggered?

> I also doubt that the problem is about backpressure after restore,
because the job will only continue running after the state restore is
already completed.

Yes, I'm not suspecting that the state restoring would be the problem
either. My concern was about backpressure possibly messing with the updates
of reducing state? I would tend to suspect that updating the state
consistently is what fails, where heavy load / backpressure might be a
factor.

> One more unrelated high-level comment that I have: for a granularity of
24h windows, I wonder if it would not make sense to use a batch job instead?

We already have a batch version that's in use currently. The reason for
trying to have this as a stream job is to minimize the delay of the
results. But we can't afford missing anything in this case, and Flink
should be able to handle that – please correct me if this is a wrong
assumption.

> Did you check if you are using exactly-once semantics or at-least-once
semantics?

I couldn't even find how at-least-once would be set in Flink. We must be
using exactly-once, as it's the default. At least once would be ok for this
particular job, but for some other jobs maybe not, so I'd rather get to the
bottom of this problem without switching to at-least-once configuration.

> Also did you check that the kafka consumer start position is configured
properly [2]?

I wonder what you mean by configuring properly? Because when restoring we
don't set it at all. Well, the default is "no reset", obviously. We just
create FlinkKafkaConsumer and let it restore offsets from the savepoint
state. Originally the job has been started with
consumer.setStartFromLatest(), but that shouldn't really matter now.

> Are watermarks generated as expected after restore?

Watermarks are generated normally as far as I can see. If they would be
stuck, for example, the window wouldn't be triggered eventually. Maybe I'm
missing what the question is about(?).

Thank you once more for helping with this!

On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter 
wrote:

> Hi,
>
> you could take a look at Bravo [1] to query your savepoints and to check
> if the state in the savepoint is complete w.r.t. your expectations. I somewhat
> doubt that there is a general problem with the state/savepoints because
> many users are successfully running it on a large state and I am not aware
> of any data loss problems, but nothing is impossible. What the savepoint
> does is also straightforward: iterate a db snapshot and write all
> key/value pairs to disk, so all data that was in the db at the time of the
> savepoint should show up. I also doubt that the problem is about
> backpressure after restore, because the job will only continue running
> after the state restore is already completed. Did you check if you are
> using exactly-once semantics or at-least-once semantics? Also, did you check
> that the Kafka consumer start position is configured properly [2]? Are
> watermarks generated as expected after restore?
>
> One more unrelated high-level comment that I have: for a granularity of
> 24h windows, I wonder if it would not make sense to use a batch job instead?
>
> Best,
> Stefan
>
> [1] https://github.com/king/bravo
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> On 04.10.2018 at 14:53, Juho Autio wrote:
>
> Thanks for the suggestions!
>
> > In general, it would be tremendously helpful to have a minimal working
> example which allows to reproduce the problem.
>
> Definitely. The problem with reproducing has been that this only seems to
> happen in the bigger production data volumes.
>
> That's why I'm hoping to find a way to debug this with the production
> data. With that it seems to consistently cause some misses every time the
> job is killed/restored.
>
> > check if it happens for shorter windows, like 1h etc
>
> What would be the benefit of that compared to 24h window?
>
> > simplify the job to not use a reduce window but simply a time 

Re: Rowtime for Table from DataStream without explicit fieldnames

2018-10-04 Thread Dawid Wysakowicz
Hi Johannes,

I am afraid that this is currently not possible and indeed you have to
pass all fields again, but Timo (cc'ed) might want to correct me if I am wrong.

Best,

Dawid


On 04/10/18 15:08, Johannes Schulte wrote:
> Hi,
>
> when converting a DataStream (with watermarks) to a table as
> described here
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time
>
> I wonder how to use the rowtime in a subsequent window operation
> _without_ explicitly specifying all field names, relying instead on case
> class type inference.
>
> Currently, when operating on a stream of events
>
> case class Event(field1: String, ts: Long)
>
> val ds: DataStream[Event] = ...
>
> I have to do
>
> tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)
>
> to do
>
> .window(Tumble over 1.hours on 'myRowtime as 'w)
>
> afterwards. Is there a way to create the TimeAttribute column without
> specifying all fields again?
>
> Thanks for your help,
>
> Johannes






Re: Using FlinkKinesisConsumer through a proxy

2018-10-04 Thread Tzu-Li (Gordon) Tai
Hi,

Since Flink 1.5, you should be able to set all available configurations on
the ClientConfiguration through the consumer Properties (see FLINK-9188
[1]).

The way to do that would be to prefix the configuration you want to set
with "aws.clientconfig" and add that to the properties, as such:

```
Properties kinesisConsumerProps = new Properties();
kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsername", ...);
...
```

Could you try that out and see if it works for you?
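
As a rough illustration of the prefixing scheme, the following plain-Java
helper copies an existing Properties object and adds the proxy settings under
the "aws.clientconfig." prefix. It is only a sketch: `ClientConfigProps`,
`withProxy` and the host name `proxy.example.com` are made up for this
example, and the port 911 is the one from the command line quoted further
down. It only builds the Properties; nothing here talks to Kinesis.

```java
import java.util.Properties;

/** Sketch: builds consumer Properties with proxy settings under the client-config prefix. */
final class ClientConfigProps {
    static final String PREFIX = "aws.clientconfig.";

    /** Returns a copy of {@code base} with proxyHost/proxyPort added under PREFIX. */
    static Properties withProxy(Properties base, String host, int port) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(PREFIX + "proxyHost", host);
        props.setProperty(PREFIX + "proxyPort", Integer.toString(port));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical proxy host, used only for this demo.
        Properties props = withProxy(new Properties(), "proxy.example.com", 911);
        System.out.println(props.getProperty("aws.clientconfig.proxyHost")); // proxy.example.com
        System.out.println(props.getProperty("aws.clientconfig.proxyPort")); // 911
    }
}
```

The resulting Properties object would then be passed to the
FlinkKinesisConsumer constructor in place of the plain one.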

I've also realized that this feature isn't documented very well, and have
opened a ticket for that [2].

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-9188
[2] https://issues.apache.org/jira/browse/FLINK-10492

On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek  wrote:

> Hi,
>
> I'm looping in Gordon and Thomas, they might have some idea about how to
> resolve this.
>
> Best,
> Aljoscha
>
> On 3. Oct 2018, at 17:29, Vijay Balakrishnan  wrote:
>
> I have been trying with all variations  to no avail of java
> -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://...
> -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=..
> -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=...
> -Dhttp.proxyPassword=... -jar .. after looking at the code in
> com.amazonaws.ClientConfiguration
>
> On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan 
> wrote:
>
>> HI,
>> How do I use FlinkKinesisConsumer using the Properties through a proxy ?
>> Getting a Connection issue through the proxy.
>> Works outside the proxy.
>>
>> Properties kinesisConsumerConfig = new Properties();
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
>> region);
>>
>> if (local) {
>>
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>> accessKey);
>>
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>> secretKey);
>> } else {
>>
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>> "AUTO");
>> }
>>
>> //only for Consumer
>>
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>> "1");
>>
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>> "2000");
>> FlinkKinesisConsumer>
>> kinesisConsumer = new FlinkKinesisConsumer<>(
>> "kinesisTopicRead", new Tuple2KinesisSchema(),
>> kinesisConsumerConfig);
>> TIA
>>
>
>


Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

2018-10-04 Thread Aljoscha Krettek
Another thing: when you retry this again with Flink 1.3.2 it works? I'm trying 
to rule out another problem in the setup.

> On 4. Oct 2018, at 15:17, Aljoscha Krettek  wrote:
> 
> Hi Yan,
> 
> This seems to be a bug in the FileSystems and how they're initialized. I'm 
> looking into this myself but I'm also looping in Stephan and Stefan, who have 
> worked on this the most in the past. Maybe they have some valuable input.
> 
> Best,
> Aljoscha
> 
> 
>> On 4. Oct 2018, at 01:18, Yan Yan wrote:
>> 
>> Hi,
>> 
>> We recently bumped to Flink 1.4 from 1.3.2, and found out an issue on HDFS 
>> configuration.
>> 
>> We are using FlinkYarnSessionCli to start the cluster and submit job.
>> 
>> In 1.3.2, we set below Flink properties when using checkpoints:
>> state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
>> state.checkpoints.dir = hdfs://nameservice0/../..
>> 
>> The mapping between the logical nameservice (nameservice0) and the actual 
>> namenode host:ports is passed to Flink via yarnship/core-site.xml (by 
>> providing the -yt option), with fs.hdfs.hadoopconf=yarnship/ set.
>> 
>> However, we encountered the error below after bumping to 1.4, which caused 
>> checkpointing to fail.
>> 
>> 2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN  org.apache.flink.runtime.checkpoint.OperatorSubtaskState  - Error while discarding operator states.
>> java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
>>     at org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
>>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>>     at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
>>     at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
>>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice0
>>     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>     ... 15 common frames omitted
>> Caused by: java.net.UnknownHostException: nameservice0
>>     ... 22 common frames omitted
>> 
>> It does not recognize nameservice0 because the core-site.xml on the actual 
>> machine (read in by Flink via $HADOOP_CONF_DIR) does not use nameservice0 
>> but something else for fs.defaultFS.
>> 
>> Digging a little bit, I found that the hadoopConfig (code) does not have 
>> the properties we set via yarnship/core-site.xml. In particular, I suspect 
>> this is because the cached HadoopFsFactory is initialized with a dummy 
>> Configuration (code), which prevents a later flinkConfig from being passed 
>> in (code
>> 

Rowtime for Table from DataStream without explicit field names

2018-10-04 Thread Johannes Schulte
Hi,

when converting a DataStream (with watermarks) to a table as described
here

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time

I wonder how to use the rowtime in a subsequent window operation
_without_ explicitly specifying all field names, relying on case
class type inference instead.

Currently when operating on a stream of events

case class Event(field1: String, ts: Long)

val ds: DataStream[Event] = ...

I have to do

tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)

to do

.window(Tumble over 1.hours on 'myRowtime  as 'w)

afterwards. Is there a way to create the TimeAttribute column without
specifying all fields again?

Thanks for your help,

Johannes


Re: Data loss when restoring from savepoint

2018-10-04 Thread Stefan Richter
Hi,

you could take a look at Bravo [1] to query your savepoints and to check whether 
the state in the savepoint is complete w.r.t. your expectations. I somewhat doubt 
that there is a general problem with the state/savepoints, because many users 
are successfully running it with large state and I am not aware of any data 
loss problems, but nothing is impossible. What the savepoint does is also 
straightforward: iterate a db snapshot and write all key/value pairs to disk, 
so all data that was in the db at the time of the savepoint should show up. I 
also doubt that the problem is about backpressure after restore, because the 
job will only continue running after the state restore is already completed. 
Did you check whether you are using exactly-once or at-least-once semantics? 
Also, did you check that the Kafka consumer start position is configured 
properly [2]? Are watermarks generated as expected after restore?

One more unrelated high-level comment that I have: for a granularity of 24h 
windows, I wonder if it would not make sense to use a batch job instead?

Best,
Stefan

[1] https://github.com/king/bravo
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
 


> Am 04.10.2018 um 14:53 schrieb Juho Autio :
> 
> Thanks for the suggestions!
> 
> > In general, it would be tremendously helpful to have a minimal working 
> > example which allows to reproduce the problem.
> 
> Definitely. The problem with reproducing has been that this only seems to 
> happen in the bigger production data volumes.
> 
> That's why I'm hoping to find a way to debug this with the production data. 
> With that it seems to consistently cause some misses every time the job is 
> killed/restored.
> 
> > check if it happens for shorter windows, like 1h etc
> 
> What would be the benefit of that compared to 24h window?
> 
> > simplify the job to not use a reduce window but simply a time window which 
> > outputs the window events. Then counting the input and output events should 
> > allow you to verify the results. If you are not seeing missing events, then 
> > it could have something to do with the reducing state used in the reduce 
> > function.
> 
> Hm, maybe, but not sure how useful that would be, because it wouldn't yet 
> prove that it's related to reducing, because not having a reduce function 
> could also mean smaller load on the job, which might alone be enough to make 
> the problem not manifest.
> 
> Is there a way to debug what goes into the reducing state (including what 
> gets removed or overwritten and what restored), if that makes sense..? Maybe 
> some suitable logging could be used to prove that the lost data is written to 
> the reducing state (or at least asked to be written), but not found any more 
> when the window closes and state is flushed?
> 
> On configuration once more, we're using RocksDB state backend with 
> asynchronous incremental checkpointing. The state is restored from savepoints 
> though, we haven't been using those checkpoints in these tests (although they 
> could be used in case of crashes – but we haven't had those now).
> 
> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann  > wrote:
> Hi Juho,
> 
> another idea to further narrow down the problem could be to simplify the job 
> to not use a reduce window but simply a time window which outputs the window 
> events. Then counting the input and output events should allow you to verify 
> the results. If you are not seeing missing events, then it could have 
> something to do with the reducing state used in the reduce function.
> 
> In general, it would be tremendously helpful to have a minimal working 
> example which allows to reproduce the problem.
> 
> Cheers,
> Till
> 
> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> can you try to reduce the job to minimal reproducible example and share the 
> job and input?
> 
> For example:
> - some simple records as input, e.g. tuples of primitive types saved as csv
> - minimal deduplication job which processes them and misses records
> - check if it happens for shorter windows, like 1h etc
> - setup which you use for the job, ideally locally reproducible or cloud
> 
> Best,
> Andrey
> 
>> On 4 Oct 2018, at 11:13, Juho Autio > > wrote:
>> 
>> Sorry to insist, but we seem to be blocked for any serious usage of state in 
>> Flink if we can't rely on it to not miss data in case of restore.
>> 
>> Would anyone have suggestions for how to troubleshoot this? So far I have 
>> verified with DEBUG logs that our reduce function gets to process also the 
>> data that is missing from window output.
>> 
>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio > > wrote:
>> Hi 

Re: Avro serialization problem after updating to flink 1.6.0

2018-10-04 Thread Aljoscha Krettek
Hi,

can you check whether AlertEvent actually has a field called "SCHEMA$"? You can 
do that via
javap path/to/AlertEvent.class
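
The same check can also be done from code with reflection. Below is a minimal sketch in plain Java, not taken from Avro or Flink: the two nested classes are hypothetical stand-ins for a generated record class (in the real job you would pass AlertEvent.class), and hasStaticSchemaField is an illustrative helper name.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class SchemaFieldCheck {

    // Hypothetical stand-in for a generated class that carries a static SCHEMA$ field.
    static class WithSchema {
        public static final String SCHEMA$ = "{}";
    }

    // Hypothetical stand-in for a class that does not declare SCHEMA$ at all.
    static class WithoutSchema {
    }

    // Returns true only if the class itself declares a static field named SCHEMA$.
    static boolean hasStaticSchemaField(Class<?> clazz) {
        try {
            Field field = clazz.getDeclaredField("SCHEMA$");
            return Modifier.isStatic(field.getModifiers());
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("WithSchema: " + hasStaticSchemaField(WithSchema.class));
        System.out.println("WithoutSchema: " + hasStaticSchemaField(WithoutSchema.class));
    }
}
```

If this returns false for the real class, one thing worth checking (an assumption on my side, not verified against avrohugger output) is whether the schema lives on the Scala companion object instead of being a static member of the class itself.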

Best,
Aljoscha

> On 27. Sep 2018, at 10:03, Mark Harris  wrote:
> 
> Hi, 
> 
> I recently tried to update a flink job from 1.3.2 to 1.6.0. It deploys 
> successfully as usual, but logs the following exception shortly after 
> starting: 
> 
> Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.test.serde.AlertEvent
>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
>     at org.apache.flink.formats.avro.typeutils.AvroSerializer.initializeAvro(AvroSerializer.java:367)
>     at org.apache.flink.formats.avro.typeutils.AvroSerializer.checkAvroInitialized(AvroSerializer.java:357)
>     at org.apache.flink.formats.avro.typeutils.AvroSerializer.snapshotConfiguration(AvroSerializer.java:269)
>     at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.computeSnapshot(RegisteredKeyValueStateBackendMetaInfo.java:241)
>     at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.snapshot(RegisteredKeyValueStateBackendMetaInfo.java:226)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.getMetaInfoSnapshot(CopyOnWriteStateTableSnapshot.java:173)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates(HeapKeyedStateBackend.java:880)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.performSnapshot(HeapKeyedStateBackend.java:719)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:355)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383)
>     ... 13 more
> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.test.serde.AlertEvent
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>     at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>     at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>     at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>     ... 23 more
> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.AlertEvent
>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>     at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>     ... 27 more
> 
> 
> AlertEvent is a Scala case class generated using sbt-avrohugger 
> (https://github.com/julianpeeters/sbt-avrohugger) that definitely implements 
> SpecificRecordBase. 
> 
> There has been an Avro version jump between Flink 1.3.2 and 1.6.0, from 1.7.7 
> to 1.8.2, but I've rebuilt the Avro model against Avro 1.8.2 and had a brief 
> look at the code in SpecificData.createSchema - it seems like it would still 
> have tried the getDeclaredField("SCHEMA$") check that's throwing. 
> 
> Any advice on how to figure out what's causing the problem, or work around it 
> would be gratefully received. 
> 
> Best regards, 
> 
> Mark Harris
> 
> hivehome.com 



Re: Data loss when restoring from savepoint

2018-10-04 Thread Juho Autio
Thanks for the suggestions!

> In general, it would be tremendously helpful to have a minimal working
> example which allows to reproduce the problem.

Definitely. The problem with reproducing has been that this only seems to
happen in the bigger production data volumes.

That's why I'm hoping to find a way to debug this with the production data.
With that it seems to consistently cause some misses every time the job is
killed/restored.

> check if it happens for shorter windows, like 1h etc

What would be the benefit of that compared to 24h window?

> simplify the job to not use a reduce window but simply a time window
> which outputs the window events. Then counting the input and output events
> should allow you to verify the results. If you are not seeing missing
> events, then it could have something to do with the reducing state used in
> the reduce function.

Hm, maybe, but not sure how useful that would be, because it wouldn't yet
prove that it's related to reducing, because not having a reduce function
could also mean smaller load on the job, which might alone be enough to
make the problem not manifest.

Is there a way to debug what goes into the reducing state (including what
gets removed or overwritten and what restored), if that makes sense..?
Maybe some suitable logging could be used to prove that the lost data is
written to the reducing state (or at least asked to be written), but not
found any more when the window closes and state is flushed?

On configuration once more, we're using RocksDB state backend with
asynchronous incremental checkpointing. The state is restored from
savepoints though, we haven't been using those checkpoints in these tests
(although they could be used in case of crashes – but we haven't had those
now).

On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann  wrote:

> Hi Juho,
>
> another idea to further narrow down the problem could be to simplify the
> job to not use a reduce window but simply a time window which outputs the
> window events. Then counting the input and output events should allow you
> to verify the results. If you are not seeing missing events, then it could
> have something to do with the reducing state used in the reduce function.
>
> In general, it would be tremendously helpful to have a minimal working
> example which allows to reproduce the problem.
>
> Cheers,
> Till
>
> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> can you try to reduce the job to minimal reproducible example and share
>> the job and input?
>>
>> For example:
>> - some simple records as input, e.g. tuples of primitive types saved as csv
>> - minimal deduplication job which processes them and misses records
>> - check if it happens for shorter windows, like 1h etc
>> - setup which you use for the job, ideally locally reproducible or cloud
>>
>> Best,
>> Andrey
>>
>> On 4 Oct 2018, at 11:13, Juho Autio  wrote:
>>
>> Sorry to insist, but we seem to be blocked for any serious usage of state
>> in Flink if we can't rely on it to not miss data in case of restore.
>>
>> Would anyone have suggestions for how to troubleshoot this? So far I have
>> verified with DEBUG logs that our reduce function gets to process also the
>> data that is missing from window output.
>>
>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  wrote:
>>
>>> Hi Andrey,
>>>
>>> To rule out for good any questions about sink behaviour, the job was
>>> killed and started with an additional Kafka sink.
>>>
>>> The same number of ids were missed in both outputs: KafkaSink &
>>> BucketingSink.
>>>
>>> I wonder what would be the next steps in debugging?
>>>
>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:
>>>
>>>> Thanks, Andrey.
>>>>
>>>> > so it means that the savepoint does not lose at least some dropped
>>>> > records.
>>>>
>>>> I'm not sure what you mean by that? I mean, it was known from the
>>>> beginning, that not everything is lost before/after restoring a savepoint,
>>>> just some records around the time of restoration. It's not 100% clear
>>>> whether records are lost before making a savepoint or after restoring it.
>>>> Although, based on the new DEBUG logs it seems more like losing some
>>>> records that are seen ~soon after restoring. It seems like Flink would be
>>>> somehow confused either about the restored state vs. new inserts to state.
>>>> This could also be somehow linked to the high back pressure on the kafka
>>>> source while the stream is catching up.
>>>>
>>>> > If it is feasible for your setup, I suggest to insert one more map
>>>> > function after reduce and before sink.
>>>> > etc.
>>>>
>>>> Isn't that the same thing that we discussed before? Nothing is sent to
>>>> BucketingSink before the window closes, so I don't see how it would make
>>>> any difference if we replace the BucketingSink with a map function or
>>>> another sink type. We don't create or restore savepoints during the time
>>>> when BucketingSink gets input or has open buckets – that happens at a much
>>>> later 

Re: Flink support for multiple data centers

2018-10-04 Thread Andrey Zagrebin
Hi Olga,

At the moment Flink does not have any built-in support for multi-region 
deployment or failover.
You can try to automate it, for example, using savepoints [1] and backing them 
up in multiple data centres.
This way you could restore a failed job in another region from the latest 
savepoint.
Of course, the job's input/output should also be available in multiple data 
centres.

Flink does not have any built-in encryption mechanism for state.
To Flink, a state value is just a serialized byte array.
For example, you can implement a custom TypeSerializer [2] or wrap an existing 
one with encryption.
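
To make the wrapping idea a bit more concrete, here is a minimal sketch of only the byte-level part, using the JDK's javax.crypto with AES-GCM. It is not a Flink TypeSerializer: the class and method names are hypothetical, the key is generated in place purely for the demo, and key management is left out entirely. A wrapping serializer could call encrypt on the bytes produced by the inner serializer and decrypt before handing bytes back to it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public class StateBytesCrypto {
    private static final int IV_LENGTH = 12;   // bytes, the usual nonce size for GCM
    private static final int TAG_BITS = 128;   // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    // Demo-only key generation; a real setup would load the key from a key store.
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Encrypts serialized state bytes; the random IV is prepended to the ciphertext.
    static byte[] encrypt(SecretKey key, byte[] plain) {
        try {
            byte[] iv = new byte[IV_LENGTH];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] cipherText = cipher.doFinal(plain);
            return ByteBuffer.allocate(IV_LENGTH + cipherText.length).put(iv).put(cipherText).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the IV from the first bytes, then decrypts and authenticates the rest.
    static byte[] decrypt(SecretKey key, byte[] stored) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, stored, 0, IV_LENGTH));
            return cipher.doFinal(stored, IV_LENGTH, stored.length - IV_LENGTH);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] original = "sensitive state value".getBytes(StandardCharsets.UTF_8);
        byte[] stored = encrypt(key, original);
        System.out.println("round trip ok: " + Arrays.equals(original, decrypt(key, stored)));
    }
}
```

Note that encrypted values no longer compare or sort like their plaintext, so this is probably only suitable for state values, not for keys.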

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html

Best,
Andrey

> On 2 Oct 2018, at 02:57, Olga Luganska  wrote:
> 
> Hello,
> 
> Does Flink support multiple data center implementation and failover 
> procedures in case one of the data centers goes down?
> 
> Another question I have is about data encryption. If  application state which 
> needs to be checkpointed contains data elements which are considered to be a 
> personally identifiable information, or maybe some credit card information, 
> do you provide any encryption mechanisms to make sure that this data will be 
> secured?
> 
> Thank you very much,
> Olga 



Re: hadoopInputFormat and elasticsearch

2018-10-04 Thread Andrey Zagrebin
Hi,

At the moment if the processing of any data input split fails,
Flink will restart the batch job completely from scratch.

There is an ongoing effort to improve fine-grained recovery in FLINK-4256.

Best,
Andrey

> On 2 Oct 2018, at 13:52, aviad  wrote:
> 
> Hi,
> 
> I want to write batch job which reads data from *elasticsearch* using
> *elasticsearch-hadoop* (https://github.com/elastic/elasticsearch-hadoop/)
> and *hadoopInputFormat*
> 
> example code (from
> https://github.com/genged/flink-playground/blob/master/src/main/java/com/mic/flink/FlinkMain.java):
> 
> 
> 
> elasticsearch-hadoop creates one Hadoop InputSplit (tasks) per Elasticsearch
> shard.
> so if my index have 20 shards, it will be split to 20 InputSplit
> 
> 
> /My question is:/
> What will happen if my job restart (failover) after finishing half of the
> InputSplit's ?
> Does hadoopInputFormat remember which InputSplit are finished and knows how
> to continue from where it stopped? (maybe read from beginning of unfinished
> InputSplit? ) or it starts from the beginning?
> 
> thanks
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Data loss when restoring from savepoint

2018-10-04 Thread Till Rohrmann
Hi Juho,

another idea to further narrow down the problem could be to simplify the
job to not use a reduce window but simply a time window which outputs the
window events. Then counting the input and output events should allow you
to verify the results. If you are not seeing missing events, then it could
have something to do with the reducing state used in the reduce function.
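
The comparison itself can be as simple as a set difference over the ids seen on each side. A minimal sketch in plain Java, assuming the ids have already been exported from the input topic and from the window output (the class name, method name and sample ids are hypothetical):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MissingIdCheck {

    // Returns the distinct input ids that never show up in the output, in sorted order.
    static Set<String> missingFromOutput(Collection<String> inputIds, Collection<String> outputIds) {
        Set<String> missing = new TreeSet<>(inputIds);   // de-duplicates the input ids
        missing.removeAll(new HashSet<>(outputIds));     // drops every id that was emitted
        return missing;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("id-1", "id-2", "id-3", "id-2");
        List<String> output = Arrays.asList("id-1", "id-3");
        System.out.println("missing ids: " + missingFromOutput(input, output));
    }
}
```

Running the same comparison once with the plain time window and once with the reduce window would show whether the set of missing ids changes between the two variants.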

In general, it would be tremendously helpful to have a minimal working
example which allows to reproduce the problem.

Cheers,
Till

On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> can you try to reduce the job to minimal reproducible example and share
> the job and input?
>
> For example:
> - some simple records as input, e.g. tuples of primitive types saved as csv
> - minimal deduplication job which processes them and misses records
> - check if it happens for shorter windows, like 1h etc
> - setup which you use for the job, ideally locally reproducible or cloud
>
> Best,
> Andrey
>
> On 4 Oct 2018, at 11:13, Juho Autio  wrote:
>
> Sorry to insist, but we seem to be blocked for any serious usage of state
> in Flink if we can't rely on it to not miss data in case of restore.
>
> Would anyone have suggestions for how to troubleshoot this? So far I have
> verified with DEBUG logs that our reduce function gets to process also the
> data that is missing from window output.
>
> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  wrote:
>
>> Hi Andrey,
>>
>> To rule out for good any questions about sink behaviour, the job was
>> killed and started with an additional Kafka sink.
>>
>> The same number of ids were missed in both outputs: KafkaSink &
>> BucketingSink.
>>
>> I wonder what would be the next steps in debugging?
>>
>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:
>>
>>> Thanks, Andrey.
>>>
>>> > so it means that the savepoint does not lose at least some dropped
>>> > records.
>>>
>>> I'm not sure what you mean by that? I mean, it was known from the
>>> beginning, that not everything is lost before/after restoring a savepoint,
>>> just some records around the time of restoration. It's not 100% clear
>>> whether records are lost before making a savepoint or after restoring it.
>>> Although, based on the new DEBUG logs it seems more like losing some
>>> records that are seen ~soon after restoring. It seems like Flink would be
>>> somehow confused either about the restored state vs. new inserts to state.
>>> This could also be somehow linked to the high back pressure on the kafka
>>> source while the stream is catching up.
>>>
>>> > If it is feasible for your setup, I suggest to insert one more map
>>> function after reduce and before sink.
>>> > etc.
>>>
>>> Isn't that the same thing that we discussed before? Nothing is sent to
>>> BucketingSink before the window closes, so I don't see how it would make
>>> any difference if we replace the BucketingSink with a map function or
>>> another sink type. We don't create or restore savepoints during the time
>>> when BucketingSink gets input or has open buckets – that happens at a much
>>> later time of day. I would focus on figuring out why the records are lost
>>> while the window is open. But I don't know how to do that. Would you have
>>> any additional suggestions?
>>>
>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>> and...@data-artisans.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> so it means that the savepoint does not lose at least some dropped
>>>> records.
>>>>
>>>> If it is feasible for your setup, I suggest to insert one more map
>>>> function after reduce and before sink.
>>>> The map function should be called right after window is triggered but
>>>> before flushing to s3.
>>>> The result of reduce (deduped record) could be logged there.
>>>> This should allow to check whether the processed distinct records were
>>>> buffered in the state after the restoration from the savepoint or not. If
>>>> they were buffered we should see that there was an attempt to write them to
>>>> the sink from the state.
>>>>
>>>> Another suggestion is to try to write records to some other sink or to
>>>> both.
>>>> E.g. if you can access file system of workers, maybe just into local
>>>> files and check whether the records are also dropped there.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>>>>
>>>> Hi Andrey!
>>>>
>>>> I was finally able to gather the DEBUG logs that you suggested. In
>>>> short, the reducer logged that it processed at least some of the ids that
>>>> were missing from the output.
>>>>
>>>> "At least some", because I didn't have the job running with DEBUG logs
>>>> for the full 24-hour window period. So I was only able to look up if I can
>>>> find *some* of the missing ids in the DEBUG logs. Which I did indeed.
>>>>
>>>> I changed the DistinctFunction.java to do this:
>>>>
>>>> @Override
>>>> public Map reduce(Map value1,
>>>> Map value2) {
>>>> LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>>

Re: Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Aljoscha Krettek
No worries! :-) it's nice that you also posted the solution
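
For anyone finding this thread later: the bucket id such an event-time assigner computes is just the prefix plus the formatted record timestamp. Here is a minimal plain-Java sketch of only that computation, assuming UTC and an example pattern. The class and method names are hypothetical and not part of Flink's API; it uses java.time.DateTimeFormatter, which is immutable and thread-safe, instead of SimpleDateFormat.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeBucketId {

    // Builds a bucket id from a prefix and the event timestamp (epoch millis), formatted in UTC.
    static String bucketId(String prefix, String pattern, long epochMillis) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC);
        return prefix + formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // The pattern and prefix are example values only.
        System.out.println(bucketId("dt=", "yyyy-MM-dd--HH", 0L));
    }
}
```

In a real assigner the formatter would typically be created once rather than per record; it is built inside the method here only to keep the sketch short.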

> On 4. Oct 2018, at 13:23, Averell  wrote:
> 
> Hi,
> 
> Sorry for wasting your time. I found the solution for that question
> regarding event-time: a class that extends BucketAssigner would do the
> needful:
> 
> class SdcTimeBucketAssigner[T <: MyClass](prefix: String, formatString: String)
>     extends BucketAssigner[T, String] {
>   @transient
>   var dateFormatter = new SimpleDateFormat(formatString)
> 
>   override def getBucketId(in: T, context: BucketAssigner.Context): String = {
>     if (dateFormatter == null) dateFormatter = new SimpleDateFormat(formatString)
>     s"$prefix${dateFormatter.format(new java.util.Date(in.getTimestamp))}"
>   }
> 
>   override def getSerializer = SimpleVersionedStringSerializer.INSTANCE
> }
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Data loss when restoring from savepoint

2018-10-04 Thread Andrey Zagrebin
Hi Juho,

can you try to reduce the job to a minimal reproducible example and share the 
job and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as csv
- a minimal deduplication job which processes them and misses records
- a check whether it also happens for shorter windows, like 1h etc.
- the setup which you use for the job, ideally locally reproducible or cloud

Best,
Andrey

> On 4 Oct 2018, at 11:13, Juho Autio  wrote:
> 
> Sorry to insist, but we seem to be blocked for any serious usage of state in 
> Flink if we can't rely on it to not miss data in case of restore.
> 
> Would anyone have suggestions for how to troubleshoot this? So far I have 
> verified with DEBUG logs that our reduce function gets to process also the 
> data that is missing from window output.
> 
> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  > wrote:
> Hi Andrey,
> 
> To rule out for good any questions about sink behaviour, the job was killed 
> and started with an additional Kafka sink.
> 
> The same number of ids were missed in both outputs: KafkaSink & BucketingSink.
> 
> I wonder what would be the next steps in debugging?
> 
> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  > wrote:
> Thanks, Andrey.
> 
> > so it means that the savepoint does not lose at least some dropped records.
> 
> I'm not sure what you mean by that? I mean, it was known from the beginning, 
> that not everything is lost before/after restoring a savepoint, just some 
> records around the time of restoration. It's not 100% clear whether records 
> are lost before making a savepoint or after restoring it. Although, based on 
> the new DEBUG logs it seems more like losing some records that are seen ~soon 
> after restoring. It seems like Flink would be somehow confused either about 
> the restored state vs. new inserts to state. This could also be somehow 
> linked to the high back pressure on the kafka source while the stream is 
> catching up.
> 
> > If it is feasible for your setup, I suggest to insert one more map function 
> > after reduce and before sink.
> > etc.
> 
> Isn't that the same thing that we discussed before? Nothing is sent to 
> BucketingSink before the window closes, so I don't see how it would make any 
> difference if we replace the BucketingSink with a map function or another 
> sink type. We don't create or restore savepoints during the time when 
> BucketingSink gets input or has open buckets – that happens at a much later 
> time of day. I would focus on figuring out why the records are lost while the 
> window is open. But I don't know how to do that. Would you have any 
> additional suggestions?
> 
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> so it means that the savepoint does not lose at least some dropped records.
> 
> If it is feasible for your setup, I suggest to insert one more map function 
> after reduce and before sink. 
> The map function should be called right after window is triggered but before 
> flushing to s3.
> The result of reduce (deduped record) could be logged there.
> This should allow to check whether the processed distinct records were 
> buffered in the state after the restoration from the savepoint or not. If 
> they were buffered we should see that there was an attempt to write them to 
> the sink from the state.
> 
> Another suggestion is to try to write records to some other sink or to both. 
> E.g. if you can access file system of workers, maybe just into local files 
> and check whether the records are also dropped there.
> 
> Best,
> Andrey
> 
>> On 20 Sep 2018, at 15:37, Juho Autio > > wrote:
>> 
>> Hi Andrey!
>> 
>> I was finally able to gather the DEBUG logs that you suggested. In short, 
>> the reducer logged that it processed at least some of the ids that were 
>> missing from the output.
>> 
>> "At least some", because I didn't have the job running with DEBUG logs for 
>> the full 24-hour window period. So I was only able to look up if I can find 
>> some of the missing ids in the DEBUG logs. Which I did indeed.
>> 
>> I changed the DistinctFunction.java to do this:
>> 
>> @Override
>> public Map reduce(Map value1, Map value2) {
>>     LOG.debug("DistinctFunction.reduce returns: {}={}",
>>         value1.get("field"), value1.get("id"));
>>     return value1;
>> }
>> 
>> Then:
>> 
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>> 
>> Then I ran the following kind of test:
>> 
>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from 
>> that previous cluster's savepoint
>> - Ran until caught up offsets
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ 

Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

2018-10-04 Thread Aljoscha Krettek
Hi,

they are actually using different interfaces and dependencies. Checkpointing 
uses Flink's FileSystem abstraction, and the shaded Hadoop file system is a 
special implementation of it, based on the Hadoop S3 FileSystem, that has all 
dependencies bundled in. The BucketingSink uses the HDFS/Hadoop FileSystem, 
so it needs to have the correct dependency setup.

Flink 1.6 released the new StreamingFileSink, which is a replacement for 
BucketingSink. With Flink 1.7 it will also support the bundled S3 file 
systems.

Best,
Aljoscha

> On 3. Oct 2018, at 17:55, Julio Biason  wrote:
> 
> Hi Andrey,
> 
> Yes, we followed the guide. Our checkpoints/savepoints are already being 
> saved on S3/Ceph, using the ShadedHadoop/S3AFileSystem (because it's the one 
> for which we managed to completely override the AWS address so that it points 
> to our Ceph cluster).
> 
> I suppose I can add the package with the AmazonClientException to my project, 
> but I still wonder why it works fine for Flink but fails for my project; in 
> theory, both are using the same dependencies, right?
> 
> On Wed, Oct 3, 2018 at 7:51 AM, Andrey Zagrebin wrote:
> Hi Julio,
> 
> Looks like some problem with dependencies.
> Have you followed the recommended s3 configuration guide [1]?
> Is it correct that your job already created checkpoints/savepoints on s3 
> before?
> 
> I think if you manually create file system using FileSystem.get(path), it 
> should be configured the same way as for bucketing sink and checkpoints.
> 
> Best,
> Andrey
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/aws.html#s3-simple-storage-service
>  
> 
> 
>> On 2 Oct 2018, at 15:21, Julio Biason wrote:
>> 
>> Hey guys,
>> 
>> I've set up a BucketingSink as a dead letter queue into our Ceph cluster 
>> using S3, but when I start the job, I get this error:
>> 
>> java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Class.java:348)
>>  at 
>> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
>>  at 
>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
>>  at 
>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
>>  at 
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
>>  at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
>>  at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>>  at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>>  at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>  at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: 
>> com.amazonaws.AmazonClientException
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>  ... 17 more
>> I find it weird 'cause I've already set up checkpoints (and savepoints) to 
>> use S3 as the protocol, and I just assumed that, if it works for checkpoints, 
>> it should work here.
>> 
>> (I suppose I could add the aws client as a dependency of my build but, 
>> again, I assumed that once S3 works for checkpoints, it should work 
>> everywhere.)
>> 
>> And kinda related, can I assume that using the FileSystem class to create 
>> FSOutputStreams will follow Flink configuration? I have another type of dead 
>> letter queue that won't work with BucketingSink and I was thinking about 
>> using it directly to create files inside that Ceph/S3.
>> 
>> -- 
>> Julio Biason, Software Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
> 
> 
> 
> 
> -- 
> Julio Biason, Software Engineer
> 

Re: Using FlinkKinesisConsumer through a proxy

2018-10-04 Thread Aljoscha Krettek
Hi,

I'm looping in Gordon and Thomas, they might have some idea about how to 
resolve this.

Best,
Aljoscha

> On 3. Oct 2018, at 17:29, Vijay Balakrishnan  wrote:
> 
> I have tried every variation of the following, to no avail, after looking at 
> the code in com.amazonaws.ClientConfiguration:
> java -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://... -Dhttps.proxyPort=911 
> -Dhttps.proxyUser= -Dhttps.proxyPassword=.. -Dhttp.proxyHost=http://.. 
> -Dhttp.proxyPort=911 -Dhttp.proxyUser=... -Dhttp.proxyPassword=... -jar .. 
> 
> On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan wrote:
> Hi,
> How do I use FlinkKinesisConsumer, configured via Properties, through a proxy? 
> I am getting a connection issue when going through the proxy. 
> It works outside the proxy.
> 
> Properties kinesisConsumerConfig = new Properties();
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
> 
> if (local) {
>     kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
>     kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
> } else {
>     kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
> }
> 
> // only for Consumer
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
> FlinkKinesisConsumer> kinesisConsumer = new FlinkKinesisConsumer<>(
>     "kinesisTopicRead", new Tuple2KinesisSchema(), kinesisConsumerConfig);
>   
>   
> TIA
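
One detail in the command line above may be worth checking: the standard JVM networking properties http.proxyHost and https.proxyHost take a bare host name, not a URL, so a value starting with "http://" is unlikely to resolve. The sketch below shows one way to normalise the value before setting the properties. ProxySettings, hostOnly and apply are hypothetical names, and whether the AWS client inside the Kinesis connector honours these system properties on the TaskManager JVMs is an assumption that would need to be verified.

```java
import java.net.URI;

final class ProxySettings {
    // Hypothetical helper: accepts "http://proxy.example.com:911" or
    // "proxy.example.com[:port]" and returns only the host name.
    static String hostOnly(String proxy) {
        if (!proxy.contains("://")) {
            int colon = proxy.indexOf(':');
            return colon < 0 ? proxy : proxy.substring(0, colon);
        }
        return URI.create(proxy).getHost();
    }

    // Sets the standard JVM proxy properties for both http and https.
    static void apply(String proxy, int port) {
        String host = hostOnly(proxy);
        for (String scheme : new String[] {"http", "https"}) {
            System.setProperty(scheme + ".proxyHost", host);
            System.setProperty(scheme + ".proxyPort", Integer.toString(port));
        }
    }

    public static void main(String[] args) {
        // proxy.example.com is a placeholder, not a value from this thread.
        apply("http://proxy.example.com:911", 911);
        System.out.println(System.getProperty("https.proxyHost"));
    }
}
```

Properties set this way only affect the JVM in which they are set, so on a cluster the equivalent -D options would have to reach the JVMs that actually run the consumer.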



Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2018-10-04 Thread Aljoscha Krettek
Hi,

this looks like a potential Flink bug. Looping in Nico and Piotr who have 
looked into that in the past. Could you please comment on that?

Best,
Aljoscha

> On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna 
>  wrote:
> 
> Hi,
>  
> I am trying to run one large single job graph which has > 10k tasks. The form 
> of the graph is something like
> DataSource -> Filter -> Map [...multiple]
> Sink1
> Sink2
> I am using a parallelism of 10 with 1 slot per task manager and a memory 
> allocation of 32G per TM. The JM is running with 8G.
>  
> Everything starts up and runs fine with close to 6-7k tasks (this is variable 
> and is mostly the source /filter/map portions) completing and then the graph 
> just hangs.  I managed to connect to the task managers and get a thread dump 
> just in time and found the following deadlock on one of the TMs which 
> apparently seems to be holding up everything else.
> Please could someone take a look and advise if there is something I could do 
> or try out to fix this.
>  
> Marked below are the 2 isolated thread stacks marking the deadlock -
>  
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on 
> <0x2dfb> (a java.util.ArrayDeque)
>   at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
>   - locked <0x2dfd> (a java.util.ArrayDeque)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
>   - locked <0x2da5> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
>   at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>   at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>   at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:745)
>  
>  
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
> entry
>   java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to 
> release lock on <0x2dfd> (a java.util.ArrayDeque)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146)
>   at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110)
>   at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271)
>   at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117)
>   - locked <0x2dfb> (a java.util.ArrayDeque)
>   at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96)
>   - locked 
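
Read together, the two stacks show a lock-order inversion: the DataSink thread holds <0x2dfd> and waits for <0x2dfb>, while the Map thread holds <0x2dfb> and waits for <0x2dfd>. The sketch below is a generic plain-Java illustration of the usual remedy, acquiring both locks in one fixed order on every code path. It is not Flink's code and not the fix applied in Flink; the class and lock names are hypothetical stand-ins for the two monitors in the dump.

```java
import java.util.concurrent.locks.ReentrantLock;

final class LockOrderingDemo {
    // Hypothetical stand-ins for the two monitors in the thread dump.
    static final ReentrantLock bufferPoolLock = new ReentrantLock();   // <0x2dfd>
    static final ReentrantLock subpartitionLock = new ReentrantLock(); // <0x2dfb>
    static int completed = 0; // only modified while both locks are held

    // Every caller takes bufferPoolLock first and subpartitionLock second,
    // so no thread can hold the second lock while waiting for the first.
    static void withBothLocks(Runnable action) {
        bufferPoolLock.lock();
        try {
            subpartitionLock.lock();
            try {
                action.run();
            } finally {
                subpartitionLock.unlock();
            }
        } finally {
            bufferPoolLock.unlock();
        }
    }

    // Runs two threads that both need both locks and returns the work count.
    static int runDemo() {
        completed = 0;
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                withBothLocks(() -> completed++);
            }
        };
        Thread a = new Thread(work, "recycle-path");
        Thread b = new Thread(work, "release-memory-path");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```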

Re: How can I set the Flink Job Id to enable flink checkpoint

2018-10-04 Thread Aljoscha Krettek
Hi,

for starting a job from the state of a previous job you should look into 
savepoints: 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
 


Checkpoints are only meant for fault tolerance within one execution of a job.

Best,
Aljoscha

> On 3. Oct 2018, at 08:10, wpb <307558...@qq.com> wrote:
> 
> hello,
> When I enable checkpointing in a Flink job, I find that Flink saves 
> the checkpoint to a directory such as ${statebackend}/${jobId}. But after I 
> restart the Flink job, the job id has changed, so the state is not 
> restored correctly.
> How can I set the Flink job id in code?



Re: Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Averell
Hi,

Sorry for wasting your time. I found the solution for that question
regarding event-time: a class that extends BucketAssigner would do the
needful:

class SdcTimeBucketAssigner[T <: MyClass](prefix: String, formatString: String)
    extends BucketAssigner[T, String] {

  // Transient: the field is null after deserialization and is re-created below.
  @transient
  var dateFormatter = new SimpleDateFormat(formatString)

  // The bucket id is the prefix followed by the formatted event timestamp.
  override def getBucketId(in: T, context: BucketAssigner.Context): String = {
    if (dateFormatter == null) dateFormatter = new SimpleDateFormat(formatString)
    s"$prefix${dateFormatter.format(new java.util.Date(in.getTimestamp))}"
  }

  override def getSerializer = SimpleVersionedStringSerializer.INSTANCE
}
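
For readers who want to try the bucket-id logic outside a Flink job, here is a minimal plain-Java sketch of the same idea: turn an event timestamp into a bucket path. EventTimeBucketId and bucketId are hypothetical names that are not part of Flink, and the fixed UTC zone is an assumption (the class above uses the JVM default zone through SimpleDateFormat).

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class EventTimeBucketId {
    // Hypothetical helper: maps an event timestamp (epoch millis) to a bucket id.
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
    static String bucketId(String prefix, String pattern, long eventTimeMillis) {
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC);
        return prefix + formatter.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        // Prints the bucket for the very first millisecond of the epoch.
        System.out.println(bucketId("dt=", "yyyy-MM-dd--HH", 0L));
    }
}
```

Inside a BucketAssigner, the same call would sit in getBucketId, with the timestamp taken from the record as in the Scala class above.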

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


FlinkUserClassLoader in AggregateFunction

2018-10-04 Thread Chirag Dewan
Hi All,
Is there any other way to get hold of the FlinkUserClassLoader other than the 
RuntimeContext?
The problem is, AggregateFunction can't be a RichFunction. I understand that's 
because of the state merging issue (from an earlier thread here). Now, I need 
dynamic class loading in AggregateFunction. And apparently, the only way to get a 
UserClassLoader is through the RuntimeContext.

Is there any workaround for this? Or some way I can create my custom UserClassLoader 
in the AggregateFunction?

Thanks in advance!
Chirag
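
One option that may be worth testing is the thread context class loader. As far as I know, Flink sets the context class loader of its task threads to the user-code class loader, but that is an assumption to verify on the Flink version in use. The sketch below is plain Java with a hypothetical helper name (UserCodeClasses.load) and falls back to the helper's own class loader when no context class loader is set.

```java
final class UserCodeClasses {
    // Hypothetical helper: resolves a class through the current thread's
    // context class loader, falling back to this class's own loader.
    static Class<?> load(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = UserCodeClasses.class.getClassLoader();
        }
        try {
            return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load("java.util.ArrayList").getName());
    }
}
```

If this works in the job, the call would go inside the AggregateFunction methods themselves (for example add or getResult), since those run on the task thread.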

Re: In-Memory Lookup in Flink Operators

2018-10-04 Thread Chirag Dewan
 Thanks a lot, David and Fabian.
I will give this a try.
Cheers,
Chirag
   On Monday, 1 October, 2018, 3:48:42 PM IST, David Anderson 
 wrote: 
 
 Hi Chirag,
The community is also looking at an approach that involves using Bravo[1][2] to 
bootstrap state by loading the initial version of the state into a savepoint.
[1] https://github.com/king/bravo
[2] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Proposal-Utilities-for-reading-transforming-and-creating-Streaming-savepoints-td23843.html#a23854
On Mon, Oct 1, 2018 at 11:27 AM Fabian Hueske  wrote:

Hi Chirag,
Flink 1.5.0 added support for BroadcastState which should address your 
requirement of replicating the data.  [1]
The replicated data is stored in the configured state backend which can also be 
RocksDB.
Regarding the reload, I would recommend Lasse's approach of having a custom 
source that pushes data at regular intervals instead. One problem is that it is 
not possible to pause a stream until all data is loaded. Instead, you would 
need to buffer that data in state as well and work with start and end markers 
on the broadcast stream.

Best, Fabian

[1] 
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink


On Sun, 30 Sep 2018 at 10:48, Chirag Dewan wrote:

 Thanks Lasse, that is rightly put. That's the only solution I can think of too.
The only thing I can't get my head around is using the coMap and coFlatMap 
functions with such a stream. Since they don't support side outputs, is there a 
way my lookup map/flatmap function can simply consume a stream? 
Ken, that's an interesting solution actually. Is there any chance you need to 
update the memory-loaded data too? 
Thanks,
Chirag
   On Sunday, 30 September, 2018, 5:17:51 AM IST, Ken Krugler 
 wrote: 
 
 Hi Lasse,
One approach I’ve used in a similar situation is to have a “UnionedSource” 
wrapper that first emits the (bounded) data that will be loaded in-memory, and 
then starts running the source that emits the continuous stream of data.
This outputs an Either<A, B>, which I then split, and broadcast the A, and 
key/partition the B.
You could do something similar, but occasionally keep checking if there’s more 
data vs. assuming it’s bounded.
The main issue I ran into is that it doesn’t seem possible to do checkpointing, 
or at least I couldn’t think of a way to make this work properly.
— Ken


On Sep 27, 2018, at 9:50 PM, Lasse Nedergaard  wrote:

Hi. 
We have created our own database source that polls the data at a configured 
interval. We then use a CoProcessFunction. It takes two inputs: one from our 
database and one from our data input. It requires that you keyBy the 
attributes you use for the lookup in your map function. Delaying your data input 
until the first database lookup is done is not trivial, but a simple solution 
could be to implement a delay operation, or to keep the data in your process 
function until data arrives from your database stream. 
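
The "keep the data until the database stream arrives" idea can be sketched in plain Java as follows. This is only an illustration of the buffering logic: BufferUntilLoaded and its methods are hypothetical names, no Flink types are used, and in a real job the buffer and the lookup table would presumably have to live in Flink state so that they survive a restore.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BufferUntilLoaded {
    private final Map<String, String> lookup = new HashMap<>();
    private final List<String> pending = new ArrayList<>();
    private boolean loaded = false;

    // Database side: replaces the lookup table and flushes buffered events.
    List<String> onLookupData(Map<String, String> snapshot) {
        lookup.clear();
        lookup.putAll(snapshot);
        loaded = true;
        List<String> out = new ArrayList<>();
        for (String key : pending) {
            out.add(enrich(key));
        }
        pending.clear();
        return out;
    }

    // Event side: buffers until the first snapshot has arrived, then enriches.
    List<String> onEvent(String key) {
        if (!loaded) {
            pending.add(key);
            return Collections.emptyList();
        }
        return Collections.singletonList(enrich(key));
    }

    private String enrich(String key) {
        return key + "=" + lookup.getOrDefault(key, "unknown");
    }

    public static void main(String[] args) {
        BufferUntilLoaded op = new BufferUntilLoaded();
        System.out.println(op.onEvent("a"));
        System.out.println(op.onLookupData(Collections.singletonMap("a", "1")));
        System.out.println(op.onEvent("b"));
    }
}
```

The two methods correspond to the two inputs of a two-input function: one is called for each database update, the other for each data record.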

Med venlig hilsen / Best regards
Lasse Nedergaard

On 28 Sep 2018, at 06:28, Chirag Dewan wrote:


Hi,
I saw static/dynamic lookups in Flink streaming being discussed in the Apache 
Flink User Mailing List archive, and then I saw this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API.
 
I know we haven't made much progress on this topic. I still wanted to put 
forward my problem statement around this. 
I am also looking for a dynamic lookup in Flink operators. I actually want to 
pre-fetch various data sources, like DB, filesystem, Cassandra etc., into 
memory. Along with that, I have to ensure a periodic refresh of the in-memory 
lookup table, with the period being a configurable parameter. 
This is what a map operator would look like with lookup: 
-> Load in-memory lookup - Refresh timer start
-> Stream processing start
-> Call lookup
-> Use lookup result in Stream processing
-> Timer elapsed
-> Reload lookup data source into in-memory table
-> Continue processing
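
The flow above can be sketched in plain Java as a lookup table that is loaded once up front and then reloaded by a timer. RefreshingLookup, its loader parameter and the thread name are hypothetical; in a Flink job the object would presumably be created in a rich function's open() and closed in close(), which is an assumption and not something shown in this thread.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class RefreshingLookup implements AutoCloseable {
    private final Supplier<Map<String, String>> loader;
    private final AtomicReference<Map<String, String>> table = new AtomicReference<>();
    private final ScheduledExecutorService timer;

    RefreshingLookup(Supplier<Map<String, String>> loader, long periodMillis) {
        this.loader = loader;
        reload(); // initial load, before any record is processed
        timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lookup-refresh");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleAtFixedRate(() -> {
            try {
                reload();
            } catch (RuntimeException e) {
                // Keep serving the previous table if a reload fails.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Swaps in a freshly loaded table; readers never see a partial update.
    void reload() {
        table.set(loader.get());
    }

    String lookup(String key) {
        return table.get().get(key);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

This uses one refresh thread per instance, so the number of threads grows with the number of parallel subtasks that create one.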

 My concerns around this are: 
1) Possibly storing the same copy of the data in every task slot's memory or state 
backend (RocksDB in my case).
2) Having a dedicated refresh thread for each subtask instance (possibly, every 
Task Manager having multiple refresh threads).
Am I thinking in the right direction? Or am I missing something very obvious? It 
is confusing.
Any leads are much appreciated. Thanks in advance.
Cheers, Chirag

--Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
  



-- 
David Anderson | Training Coordinator | data Artisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time  

Re: Data loss when restoring from savepoint

2018-10-04 Thread Juho Autio
Sorry to insist, but we seem to be blocked for any serious usage of state
in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have
verified with DEBUG logs that our reduce function gets to process also the
data that is missing from window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  wrote:

> Hi Andrey,
>
> To rule out for good any questions about sink behaviour, the job was
> killed and started with an additional Kafka sink.
>
> The same number of ids were missed in both outputs: KafkaSink &
> BucketingSink.
>
> I wonder what would be the next steps in debugging?
>
> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:
>
>> Thanks, Andrey.
>>
>> > so it means that the savepoint does not lose at least some dropped
>> records.
>>
>> I'm not sure what you mean by that? I mean, it was known from the
>> beginning, that not everything is lost before/after restoring a savepoint,
>> just some records around the time of restoration. It's not 100% clear
>> whether records are lost before making a savepoint or after restoring it.
>> Although, based on the new DEBUG logs it seems more like losing some
>> records that are seen ~soon after restoring. It seems like Flink would be
>> somehow confused either about the restored state vs. new inserts to state.
>> This could also be somehow linked to the high back pressure on the kafka
>> source while the stream is catching up.
>>
>> > If it is feasible for your setup, I suggest to insert one more map
>> function after reduce and before sink.
>> > etc.
>>
>> Isn't that the same thing that we discussed before? Nothing is sent to
>> BucketingSink before the window closes, so I don't see how it would make
>> any difference if we replace the BucketingSink with a map function or
>> another sink type. We don't create or restore savepoints during the time
>> when BucketingSink gets input or has open buckets – that happens at a much
>> later time of day. I would focus on figuring out why the records are lost
>> while the window is open. But I don't know how to do that. Would you have
>> any additional suggestions?
>>
>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Juho,
>>>
>>> so it means that the savepoint does not lose at least some dropped
>>> records.
>>>
>>> If it is feasible for your setup, I suggest inserting one more map
>>> function after the reduce and before the sink.
>>> The map function would be called right after the window is triggered but
>>> before flushing to S3.
>>> The result of the reduce (the deduped record) could be logged there.
>>> This should let us check whether the processed distinct records were
>>> buffered in the state after the restoration from the savepoint or not. If
>>> they were buffered, we should see an attempt to write them to
>>> the sink from the state.
>>>
>>> Another suggestion is to try to write records to some other sink or to
>>> both.
>>> E.g. if you can access file system of workers, maybe just into local
>>> files and check whether the records are also dropped there.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>>>
>>> Hi Andrey!
>>>
>>> I was finally able to gather the DEBUG logs that you suggested. In
>>> short, the reducer logged that it processed at least some of the ids that
>>> were missing from the output.
>>>
>>> "At least some", because I didn't have the job running with DEBUG logs
>>> for the full 24-hour window period. So I was only able to look up if I can
>>> find *some* of the missing ids in the DEBUG logs. Which I did indeed.
>>>
>>> I changed the DistinctFunction.java to do this:
>>>
>>> @Override
>>> public Map reduce(Map value1, Map value2) {
>>>     LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>         value1.get("field"), value1.get("id"));
>>>     return value1;
>>> }
>>>
>>> Then:
>>>
>>> vi flink-1.6.0/conf/log4j.properties
>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>
>>> Then I ran the following kind of test:
>>>
>>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
>>> 2018
>>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored
>>> from that previous cluster's savepoint
>>> - Ran until caught up offsets
>>> - Cancelled the job with a new savepoint
>>> - Started a new job _without_ DEBUG, which restored the new savepoint,
>>> let it keep running so that it will eventually write the output
>>>
>>> Then on the next day, after results had been flushed when the 24-hour
>>> window closed, I compared the results again with a batch version's output.
>>> And found some missing ids as usual.
>>>
>>> I drilled down to one specific missing id (I'm replacing the actual
>>> value with AN12345 below), which was not found in the stream output, but
>>> was found in batch output & flink DEBUG logs.
>>>
>>> Related 

Re: Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Averell
Hi,

https://issues.apache.org/jira/browse/FLINK-9749 <<< as per this ticket,
StreamingFileSink is a newer option, which is better than BucketingSink for
Parquet.
I would love to see an example that uses it.

Thanks and best regards,
Averell





Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Averell
Hi everyone,

I am trying to persist my stream into Parquet files. In the documentation, I can
see two different file sinks: BucketingSink (Rolling File Sink) and
StreamingFileSink. I could not find any information regarding the differences
between these two types.
Which one should I choose for writing to Parquet? Is it possible to
partition my output based on event time?

Thanks and best regards,
Averell


