Re: Jar Uploads in High Availability (Flink 1.7.2)

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi,

I was also experiencing similar behavior. I adopted the following
approach:

   - I used a distributed file system (in my case AWS EFS) and set the
   attribute "web.upload.dir" to a path on it, so that both job managers
   share the same upload location (see the example entry below).
   - On the load balancer side (AWS ELB), I used a "readiness probe" based
   on the ZooKeeper entry for the active jobmanager address, so the ELB
   always points to the active jobmanager. If the active jobmanager
   changes, the ELB automatically points to the new one, and since both
   jobmanagers use the same location on the distributed file system, the
   new active jobmanager can find the same jar.
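
For reference, the relevant flink-conf.yaml entry looks roughly like this (the
EFS mount path below is just an example):

   web.upload.dir: /mnt/efs/flink/web-uploads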


Regards,
Ravi

On Wed, Oct 16, 2019 at 1:15 AM Martin, Nick J [US] (IS) <
nick.mar...@ngc.com> wrote:

> I’m seeing that when I upload a jar through the rest API, it looks like
> only the Jobmanager that received the upload request is aware of the newly
> uploaded jar. That worked fine for me in older versions where all clients
> were redirected to connect to the leader, but now that each Jobmanager
> accepts requests, if I send a jar upload request, it could end up on any
> one (and only one) of the Jobmanagers, not necessarily the leader. Further,
> each Jobmanager responds to a GET request on the /jars endpoint with its
> own local list of jars. If I try and use one of the Jar IDs from that
> request, my next request may not go to the same Jobmanager (requests are
> going through Docker and being load-balanced), and so the Jar ID isn’t
> found on the new Jobmanager handling that request.


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi,

I am also facing the same problem. I am using Flink 1.9.0 and consuming
from a Kinesis source with a retention of 1 day. When the job is submitted
with the "latest" initial stream position, it starts well and keeps
processing data from all the shards for a very long period of time without
any lag. When the job fails, it also recovers well from the last successful
checkpointed state. But I am also experiencing, very rarely, that when the
job fails and recovers from the last successful checkpointed state, there
is a huge lag (1 day, i.e. the full retention) on one of the streams. I
have not yet found a step-by-step way to reproduce this issue.

So far, I have gathered some more information by customizing the
FlinkKinesisConsumer to add an extra log message. I noticed that the number
of shards loaded from the checkpoint data during recovery is less than the
actual number of shards in the stream, even though the Kinesis stream has a
fixed number of shards.

I added one debug log line at line 408 to print the size of the variable
"sequenceNumsToRestore", which is populated with the shard details restored
from the checkpoint data:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
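
The added line looks roughly like this (illustrative; the exact wording of my
log message differs):

    LOG.debug("Restored sequence numbers for {} shards from checkpoint state: {}",
            sequenceNumsToRestore.size(), sequenceNumsToRestore);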

In this consumer class, when the "run" method is called, it does the
following (a rough paraphrase in code follows the list):

   - It discovers the shards of the Kinesis stream and selects all the
   shards that this subtask should handle.
   - Then it iterates over the discovered shards one by one and checks
   whether each shard's state is available in the recovered state
   "sequenceNumsToRestore":
   
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
   - If it is available, it schedules that shard with the recovered state.
   - If it is not available in the state, it schedules that shard with
   "EARLIEST_SEQUENCE_NUMBER":
   
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
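
A rough paraphrase of that logic (a sketch only, not the literal source; see the
links above for the exact code):

    for (StreamShardHandle shard : shardsAssignedToThisSubtask) {
        StreamShardMetadata metadata = KinesisDataFetcher.convertToStreamShardMetadata(shard);

        SequenceNumber startingSeqNum =
                (sequenceNumsToRestore != null && sequenceNumsToRestore.containsKey(metadata))
                        ? sequenceNumsToRestore.get(metadata)                           // resume from checkpointed position
                        : SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();  // no state -> start from EARLIEST

        fetcher.registerNewSubscribedShardState(
                new KinesisStreamShardState(metadata, shard, startingSeqNum));
    }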

In my case, the number of shard entries recovered from the checkpoint data
is less than the actual number of shards, which results in the missing
shards being scheduled with the earliest stream position.
I suspect that the checkpoint is somehow missing state for some of the
shards. But if that were the case, the checkpoint should have failed.

Any further information to resolve this issue would be highly appreciated...

Regards,
Ravi

On Wed, Oct 16, 2019 at 5:57 AM Yun Tang  wrote:

> Hi Steven
>
> If you restore the savepoint/checkpoint successfully, I think this might be
> due to the shard not having been discovered in the previous run, in which
> case it would be consumed from the beginning. Please refer to the
> implementation here: [1]
>
> [1]
> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>
> Best
> Yun Tang
> --
> *From:* Steven Nelson 
> *Sent:* Wednesday, October 16, 2019 4:31
> *To:* user 
> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>
> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>
> We have extended data retention on the Kinesis stream, which gives us 7
> days of data.
>
> We have found that when a savepoint/checkpoint is restored, it appears
> to restart the Kinesis Consumer from the start of the stream.
> The 
> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
> property reports to Prometheus that it is behind by 7 days when the process
> starts back up from a savepoint.
>
> We have some logs that say:
>
> Subtask 3 will start consuming seeded shard 
> StreamShardHandle{streamName='TheStream',
> shard='{ShardId: shardId-0083,HashKeyRange: {StartingHashKey:
> 220651847300296034902031972006537199616,EndingHashKey:
> 223310303291865866647839586127097888767},SequenceNumberRange:
> {StartingSequenceNumber:
> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>
> This seems to indicate that this shard is starting from the beginning of
> the stream
>
> and some logs that say:
> Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='
> TheStream ', shard='{ShardId: shardId-0087,HashKeyRange:
> {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey:
> 233944127258145193631070042609340645375},SequenceNumberRange:
> {StartingSequenceNumber:
> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
> number 49599841594208637293623823226010128300928335129272649074 with
> ShardConsumer 21
>

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Yun Tang
Hi Steven

If you restore the savepoint/checkpoint successfully, I think this might be due to 
the shard not having been discovered in the previous run, in which case it would be 
consumed from the beginning. Please refer to the implementation here: [1]

[1] 
https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307

Best
Yun Tang

From: Steven Nelson 
Sent: Wednesday, October 16, 2019 4:31
To: user 
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7 days of 
data.

We have found that when a savepoint/checkpoint is restored, it appears to 
restart the Kinesis Consumer from the start of the stream. The 
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
 property reports to Prometheus that it is behind by 7 days when the process 
starts back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard 
StreamShardHandle{streamName='TheStream', shard='{ShardId: 
shardId-0083,HashKeyRange: {StartingHashKey: 
220651847300296034902031972006537199616,EndingHashKey: 
223310303291865866647839586127097888767},SequenceNumberRange: 
{StartingSequenceNumber: 
49597946220601502339755334362523522663986150244033234226,}}'} from sequence 
number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the 
stream

and some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName=' 
TheStream ', shard='{ShardId: shardId-0087,HashKeyRange: 
{StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 
233944127258145193631070042609340645375},SequenceNumberRange: 
{StartingSequenceNumber: 
49597946220690705320549456855089665537076743690057155954,}}'} from sequence 
number 49599841594208637293623823226010128300928335129272649074 with 
ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming that this might be caused by no data being available on the shard 
for the entire stream (possible with this application stage). Is this the 
expected behavior? I had thought it would checkpoint with the most recent 
sequence number, regardless of if it got data or not.

-Steve




Verifying correctness of StreamingFileSink (Kafka -> S3)

2019-10-15 Thread amran dean
I am evaluating StreamingFileSink (Kafka 0.10.11) as a production-ready
alternative to a current Kafka -> S3 solution.

Is there any way to verify the integrity of the data written to S3? I'm
confused about how the file names (e.g. part-1-17) map to Kafka partitions, and
further unsure how to ensure that no Kafka records are lost (I know Flink
guarantees exactly-once, but this is more of a sanity check).


Jar Uploads in High Availability (Flink 1.7.2)

2019-10-15 Thread Martin, Nick J [US] (IS)
I'm seeing that when I upload a jar through the rest API, it looks like only 
the Jobmanager that received the upload request is aware of the newly uploaded 
jar. That worked fine for me in older versions where all clients were 
redirected to connect to the leader, but now that each Jobmanager accepts 
requests, if I send a jar upload request, it could end up on any one (and only 
one) of the Jobmanagers, not necessarily the leader. Further, each Jobmanager 
responds to a GET request on the /jars endpoint with its own local list of 
jars. If I try and use one of the Jar IDs from that request, my next request 
may not go to the same Jobmanager (requests are going through Docker and being 
load-balanced), and so the Jar ID isn't found on the new Jobmanager handling 
that request.






Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Steven Nelson
Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7
days of data.

We have found that when a savepoint/checkpoint is restored, it appears
to restart the Kinesis Consumer from the start of the stream.
The 
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
property reports to Prometheus that it is behind by 7 days when the process
starts back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard
StreamShardHandle{streamName='TheStream',
shard='{ShardId: shardId-0083,HashKeyRange: {StartingHashKey:
220651847300296034902031972006537199616,EndingHashKey:
223310303291865866647839586127097888767},SequenceNumberRange:
{StartingSequenceNumber:
49597946220601502339755334362523522663986150244033234226,}}'} from sequence
number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of
the stream

and some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='
TheStream ', shard='{ShardId: shardId-0087,HashKeyRange:
{StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey:
233944127258145193631070042609340645375},SequenceNumberRange:
{StartingSequenceNumber:
49597946220690705320549456855089665537076743690057155954,}}'} from sequence
number 49599841594208637293623823226010128300928335129272649074 with
ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming that this might be caused by no data being available on the
shard for the entire stream (possible with this application stage). Is this
the expected behavior? I had thought it would checkpoint with the most
recent sequence number, regardless of if it got data or not.

-Steve


Mirror Maker 2.0 cluster and starting from latest offset and other queries

2019-10-15 Thread Vishal Santoshi
Two queries:

1. I am trying to configure MM2 to start replicating from the head (the
latest offset of the topic). Should auto.offset.reset = latest in
mm2.properties be enough? Unfortunately, MM2 still starts from the EARLIEST.

2. I do not have an "Authorizer configured on the broker" and see this
exception: java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
configured on the broker. According to
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Config,ACLSync

*sync.topic.acls = false* should do it, but it does not.



Any ideas?


JDBC Table Sink doesn't seem to sink to database.

2019-10-15 Thread John Smith
Hi, using 1.8.0

I have the following job: https://pastebin.com/ibZUE8Qx

So the job does the following steps (a stripped-down sketch of these steps
follows the list)...
1- Consume from Kafka and return a JsonObject
2- Map the JsonObject to MyPojo
3- Convert the stream to a table
4- Insert the table into the JDBC sink table
5- Print the table.
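
For reference, here is a stripped-down sketch of those steps (illustrative only,
not the actual code from the pastebin; driver, URL, table and field names are
placeholders):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class JdbcSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // steps 1-2 simplified: a small in-memory stream stands in for the Kafka -> MyPojo part
        DataStream<Tuple2<Long, String>> rows =
                env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));

        // step 3: convert the stream to a table
        Table table = tEnv.fromDataStream(rows, "id, name");

        // step 4: JDBC append sink; rows are only written out when a batch fills up, so
        // with the default batch size a low-volume stream may not flush anything
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername("org.postgresql.Driver")                    // placeholder driver/URL
                .setDBUrl("jdbc:postgresql://db-host:5432/test")
                .setQuery("INSERT INTO my_table (id, name) VALUES (?, ?)")
                .setParameterTypes(Types.LONG, Types.STRING)
                .setBatchSize(1)                                           // flush every row while testing
                .build();
        tEnv.registerTableSink("jdbcSink",
                new String[]{"id", "name"},
                new TypeInformation[]{Types.LONG, Types.STRING},
                sink);
        table.insertInto("jdbcSink");

        // step 5: print the table
        tEnv.toAppendStream(table, Row.class).print();

        env.execute("jdbc-sink-sketch");
    }
}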

- The job seems to work with no errors and I can see the row printed to the
console, but I see nothing in my database.
- If I put an invalid host for the database and restart the job, I get a
connection SQLException. So at least we know that part works.
- If I make a typo in the INSERT INTO statement, like INSERTS INTO
non_existing_table, there are no exceptions thrown, the print happens, and
the stream continues to work.
- If I drop the table from the database, same thing: no exceptions thrown,
the print happens, and the stream continues to work.

So am I missing something?


Re: Is it possible to get Flink job name in an operator?

2019-10-15 Thread Aleksandar Mastilovic
I got mine in AbstractStreamOperator.open() method through 
this.getContainingTask().getEnvironment().getJobID();
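
For example, a minimal sketch of that approach (assuming a custom operator that
extends AbstractStreamOperator; note that this yields the JobID, not the job name):

import org.apache.flink.api.common.JobID;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class JobIdLoggingOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    @Override
    public void open() throws Exception {
        super.open();
        // available once the operator runs inside a task
        JobID jobId = getContainingTask().getEnvironment().getJobID();
        LOG.info("Operator running inside job {}", jobId);
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        output.collect(element); // pass records through unchanged
    }
}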


> On Oct 14, 2019, at 11:53 PM, 马阳阳  wrote:
> 
> As the title. Is it possible now? Or if we can do something to achieve this. 
> I tried to put the job name into the ExecutionConfig.GlobalJobParameters. But 
> it is not possible to get the job name before Environment.execute() is called.
> 
> Best regards,
> mayangyang



Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-15 Thread Vijay Balakrishnan
Hi Theo,
You were right. For some reason (I still haven't figured out why), the
FilterFunction was causing the issue. I commented it out and the job
started getting into the add() method of the aggregate function.

/*kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    return groupByValueObj != null;
});*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();

TIA,

Vijay




On Tue, Oct 15, 2019 at 9:34 AM Vijay Balakrishnan 
wrote:

> Hi Theo,
> It gets to the FilterFunction during the creation of the ExecutionGraph
> initially but not during the runtime when recs are streaming in.So, it is
> not getting that far- seems to be stuck in the
>
> final SingleOutputStreamOperator> filteredKinesisStream = 
> kinesisStream.filter   code.
>
> Doesn't seem to get past it as it keeps incrementing watermarks but the 
> Watermark never seems to hit the end of the window.Maybe I am doing
>
> something super simple stupid.
>
> TIA,
> Vijay
>
> On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Vijay,
>>
>> Maybe a stupid question, but according to your comments, the code works
>> fine up till a "flatMap" operation. It seems that this flatMap is directly
>> followed by a filter-Function in the method
>> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
>> all events? Or is not even the filter function itself called? (Due to your
>> comments suggesting it).
>>
>> Best regards
>> Theo
>>
>> --
>> *Von: *"Vijay Balakrishnan" 
>> *An: *"Dawid Wysakowicz" 
>> *CC: *"user" 
>> *Gesendet: *Dienstag, 15. Oktober 2019 02:01:05
>> *Betreff: *Re: add() method of AggregateFunction not called even though
>> new watermark is emitted
>>
>> Hi,
>> Thx for the replies - Congxian & Dawdi.
>> Watermarks are advancing.Not sure how to check every new generated
>> watermark is reaching end of the window 
>>
>> I did check the Flink UI for the currentInputWatermark and it is
>> increasing monotonically.
>>
>> Narrowed down the problem to not calling the windowStream.aggregate.
>> I also *added a checkpoint *to see if it was causing the issue.Didn't
>> seem to help.
>> Most of the code is reached during the creation of the ExecutionGraph on
>> the start of the program.
>>
>> I generate an incrementing sequence of timestamps(delay of 5000ms between
>> each rec) from a Producer to Kinesis and it emits a new watermark as it
>> starts receiving the input records.
>> My window size is 15s.
>> I see a WindowedStream is created with windowAssigner:
>> TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger
>> but the *code never gets into the EventTimeTrigger.onElement() or
>> onEventTime() to fire the trigger*.
>> It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark().
>> I even tried to use ProcessingTime but that also didn't help.
>>
>>
>> //code to create kinesis consumer successfully..
>> for (Rule rule : rules.getRules()) {
>> //gets in here fine
>> final SingleOutputStreamOperator>
>> filteredKinesisStream = kinesisStream.filter(mon -> {
>> boolean result;
>> String eventName = mon.get(MEASUREMENT) != null ?
>> (String) mon.get(MEASUREMENT) : "";
>> InputMetricSelector inputMetricSelector =
>> rule.getInputMetricSelector();
>> String measurement = inputMetricSelector != null ?
>> inputMetricSelector.getMeasurement() : "";
>> result = eventName.equals(measurement);
>> if (result) {
>> Map inputTags = mon.get(TAGS) != null
>> ? (Map) mon.get(TAGS) : new HashMap<>();
>> Map ruleTags = inputMetricSelector !=
>> null ? inputMetricSelector.getTags() : new HashMap<>();
>> result = matchTags(inputTags, ruleTags);
>> }
>> return result;//*<== this is true*
>> }
>> ).flatMap((FlatMapFunction, Map>)
>> (input, out) -> {
>> out.collect(input);//*< runs up till here fine*
>> }).returns(new TypeHint>() {
>> });
>> //*doesn't do anything beyond this point at runtime*
>> DataStream enrichedMGStream =
>> pms.createAggregatedMonitoringGroupingWindowStream1
>> (filteredKinesisStream, ruleFactory, rule, parallelProcess);
>> enrichedMGStream.addSink(influxSink)
>> .setParallelism(nbrSinks);
>> }
>>
>> private DataStream
>> createAggregatedMonitoringGroupingWindowStream1(DataStream> Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int
>> parallelProcess) {
>> DataStream enrichedComponentInstanceStream1;
>> RuleConfig ruleConfig = rule.getRuleConfig();
>> String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
>> RuleIF ruleImpl = ruleFactory.getRule(ruleType);
>> Map ruleProps = ruleConfig != null ?
>> ruleConfig.getRuleProps() : new 

Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-15 Thread Vijay Balakrishnan
Hi Theo,
It gets to the FilterFunction during the creation of the ExecutionGraph
initially, but not at runtime when records are streaming in. So it is
not getting that far; it seems to be stuck in the

final SingleOutputStreamOperator<Map<String, Object>>
filteredKinesisStream = kinesisStream.filter(...)   code.

It doesn't seem to get past it: it keeps incrementing watermarks, but
the watermark never seems to hit the end of the window. Maybe I am
doing something super simple and stupid.

TIA,
Vijay

On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Vijay,
>
> Maybe a stupid question, but according to your comments, the code works
> fine up till a "flatMap" operation. It seems that this flatMap is directly
> followed by a filter-Function in the method
> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
> all events? Or is not even the filter function itself called? (Due to your
> comments suggesting it).
>
> Best regards
> Theo
>
> --
> *Von: *"Vijay Balakrishnan" 
> *An: *"Dawid Wysakowicz" 
> *CC: *"user" 
> *Gesendet: *Dienstag, 15. Oktober 2019 02:01:05
> *Betreff: *Re: add() method of AggregateFunction not called even though
> new watermark is emitted
>
> Hi,
> Thx for the replies - Congxian & Dawdi.
> Watermarks are advancing.Not sure how to check every new generated
> watermark is reaching end of the window 
>
> I did check the Flink UI for the currentInputWatermark and it is
> increasing monotonically.
>
> Narrowed down the problem to not calling the windowStream.aggregate.
> I also *added a checkpoint *to see if it was causing the issue.Didn't
> seem to help.
> Most of the code is reached during the creation of the ExecutionGraph on
> the start of the program.
>
> I generate an incrementing sequence of timestamps(delay of 5000ms between
> each rec) from a Producer to Kinesis and it emits a new watermark as it
> starts receiving the input records.
> My window size is 15s.
> I see a WindowedStream is created with windowAssigner:
> TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger
> but the *code never gets into the EventTimeTrigger.onElement() or
> onEventTime() to fire the trigger*.
> It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark().
> I even tried to use ProcessingTime but that also didn't help.
>
>
> //code to create kinesis consumer successfully..
> for (Rule rule : rules.getRules()) {
> //gets in here fine
> final SingleOutputStreamOperator>
> filteredKinesisStream = kinesisStream.filter(mon -> {
> boolean result;
> String eventName = mon.get(MEASUREMENT) != null ? (String)
> mon.get(MEASUREMENT) : "";
> InputMetricSelector inputMetricSelector =
> rule.getInputMetricSelector();
> String measurement = inputMetricSelector != null ?
> inputMetricSelector.getMeasurement() : "";
> result = eventName.equals(measurement);
> if (result) {
> Map inputTags = mon.get(TAGS) != null
> ? (Map) mon.get(TAGS) : new HashMap<>();
> Map ruleTags = inputMetricSelector !=
> null ? inputMetricSelector.getTags() : new HashMap<>();
> result = matchTags(inputTags, ruleTags);
> }
> return result;//*<== this is true*
> }
> ).flatMap((FlatMapFunction, Map>)
> (input, out) -> {
> out.collect(input);//*< runs up till here fine*
> }).returns(new TypeHint>() {
> });
> //*doesn't do anything beyond this point at runtime*
> DataStream enrichedMGStream =
> pms.createAggregatedMonitoringGroupingWindowStream1
> (filteredKinesisStream, ruleFactory, rule, parallelProcess);
> enrichedMGStream.addSink(influxSink)
> .setParallelism(nbrSinks);
> }
>
> private DataStream
> createAggregatedMonitoringGroupingWindowStream1(DataStream Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int
> parallelProcess) {
> DataStream enrichedComponentInstanceStream1;
> RuleConfig ruleConfig = rule.getRuleConfig();
> String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
> RuleIF ruleImpl = ruleFactory.getRule(ruleType);
> Map ruleProps = ruleConfig != null ?
> ruleConfig.getRuleProps() : new HashMap<>();
> Object intervalObj = ruleProps.get("rule_eval_window");
> String timeInterval = intervalObj != null ? (String) intervalObj : "";
> org.apache.flink.streaming.api.windowing.time.Time timeWindow =
> getTimeWindowFromInterval(timeInterval);
>
> Object windowTypeObj = ruleProps.get("window_type");
> String windowType = windowTypeObj != null ? (String) windowTypeObj :
> "";
>
> InputMetricSelector inputMetricSelector =
> rule.getInputMetricSelector();
> Map tags = inputMetricSelector != null ?
> inputMetricSelector.getTags() : new HashMap<>();
> String groupByObj = tags.get(GROUP_BY);
> String groupBy 

Re: Discard message on deserialization errors.

2019-10-15 Thread John Smith
Ah ok thanks!

On Sat, 12 Oct 2019 at 11:13, Zhu Zhu  wrote:

> I mean the Kafka source provided in Flink can correctly ignore null
> deserialized values.
>
> isEndOfStream allows you to control when to end the input stream.
> If it is used for running infinite stream jobs, you can simply return
> false.
>
> Thanks,
> Zhu Zhu
>
> John Smith  于2019年10月12日周六 下午8:40写道:
>
>> The Kafka Fetcher you mean the flink JSON schemas? They throw
>> IOExceptions?
>>
>> Also, what's the purpose of isEndOfStream? Most schemas I looked at don't
>> do anything but just return false.
>>
>> On Fri., Oct. 11, 2019, 11:44 p.m. Zhu Zhu,  wrote:
>>
>>> Hi John,
>>>
>>> It should work with a *null* return value.
>>> In the java doc of DeserializationSchema#deserialize it says that
>>>
 *@return The deserialized message as an object (null if the message
 cannot be deserialized).*
>>>
>>>
>>> I also checked the Kafka fetcher in Flink and it can correctly handle a
>>> null deserialized record.
>>>
>>> Just pay attention to also not make
>>> *DeserializationSchema#isEndOfStream* throw errors on a null record
>>> provided.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> John Smith  于2019年10月12日周六 上午5:36写道:
>>>
 Hi using Flink 1.8.0.

 I am ingesting data from Kafka, unfortunately for the time being I have
 not looked into using the schema registry.

 So for now I would like to write a simple deserialization schema that
 discards the data if deserialization fails.

 The other option is to do in flat map with markers and split to dead
 letter queue, but I'm not too concerned about that for now.

 Is it ok to just return null if deserialization fails?

 @Override
 public MyObject deserialize(byte[] message) {
try {
   return MyDecoder.decode(message);
} catch(IOException ex) {
   logger.warn("Failed to decode message.", ex);
   return null;
}
 }
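
For completeness, a fuller sketch of such a schema could look like this
(illustrative; MyObject and MyDecoder are the placeholder types from above):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyObjectSchema implements DeserializationSchema<MyObject> {

    private static final Logger LOG = LoggerFactory.getLogger(MyObjectSchema.class);

    @Override
    public MyObject deserialize(byte[] message) {
        try {
            return MyDecoder.decode(message);   // placeholder decoder from above
        } catch (IOException ex) {
            LOG.warn("Failed to decode message, skipping record.", ex);
            return null;                        // the Kafka fetcher skips null records
        }
    }

    @Override
    public boolean isEndOfStream(MyObject nextElement) {
        return false;                           // never end an unbounded stream
    }

    @Override
    public TypeInformation<MyObject> getProducedType() {
        return TypeInformation.of(MyObject.class);
    }
}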




RE: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS

2019-10-15 Thread Adrian Vasiliu
Hi,
FYI we've switched to a different Hadoop server, and the issue vanished... It does look as if the cause was on the Hadoop side. Thanks again Congxian.
Adrian
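
For reference, the workaround suggested in the exception message below (we did not end up needing it, since the root cause turned out to be on the Hadoop side):

    // as the exception message suggests, support for truncate() can be disabled entirely
    BucketingSink<String> sink = new BucketingSink<>("hdfs:///okd-dev/output"); // path is illustrative
    sink.setUseTruncate(false);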
 
- Original message -From: "Adrian Vasiliu" To: qcx978132...@gmail.comCc: user@flink.apache.orgSubject: [EXTERNAL] RE: FLINK-13497 / "Could not create file for checking if truncate works" / HDFSDate: Tue, Oct 15, 2019 8:37 AM 
Thanks Congxian. The possible causes listed in the mostly voted answer of https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025 do not seem to hold for us, because we have other pretty much similar flink jobs using the same Hadoop server and root directory (under different hdfs paths), and they do work. Thus in principle the config on the Hadoop server-side wouldn't be the cause. Also, according to the Ambari monitoring tools, the Hadoop server is healthy, and we did restart it. However, we'll check all points mentioned in various answers, in particular the one about temp files.
Thanks
Adrian
 
----- Original message -----
From: Congxian Qiu
To: Adrian Vasiliu
Cc: user
Subject: [EXTERNAL] Re: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Date: Tue, Oct 15, 2019 4:02 AM
Hi
 
From the given stack trace, maybe you could solve the "replication problem" first,   File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation, and maybe the answer from SO[1] can help.
 
[1] https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025
Best,
Congxian 

Adrian Vasiliu  于2019年10月14日周一 下午9:10写道:
Hello, 
 
We recently upgraded our product from Flink 1.7.2 to Flink 1.9, and we experience repeated failing jobs with  
java.lang.RuntimeException: Could not create file for checking if truncate works. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:645)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:388)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1719)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3368)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3292)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347) 

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at 

Re: Is it possible to get Flink job name in an operator?

2019-10-15 Thread Zhu Zhu
I think ExecutionConfig.GlobalJobParameters is the way to do this if you
want to retrieve it at runtime; a minimal sketch follows below.
Or you can just pass the name to each operator you implement to have it
serialized together with the UDF.
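
A minimal sketch of that approach (the "job.name" key is just illustrative):

import java.util.Collections;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobNameViaGlobalParams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String jobName = "my-job";

        // make the name available to all operators at runtime
        env.getConfig().setGlobalJobParameters(
                ParameterTool.fromMap(Collections.singletonMap("job.name", jobName)));

        env.fromElements(1, 2, 3)
            .map(new RichMapFunction<Integer, Integer>() {
                @Override
                public void open(Configuration parameters) {
                    ParameterTool params = (ParameterTool)
                            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                    System.out.println("Running in job: " + params.get("job.name"));
                }

                @Override
                public Integer map(Integer value) {
                    return value;
                }
            })
            .print();

        env.execute(jobName); // reuse the same string as the submitted job name
    }
}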

Thanks,
Zhu Zhu

马阳阳  于2019年10月15日周二 下午3:11写道:

> As the title. Is it possible now? Or if we can do something to achieve
> this. I tried to put the job name into the 
> ExecutionConfig.GlobalJobParameters.
> But it is not possible to get the job name before Environment.execute() is
> called.
>
> Best regards,
> mayangyang
>


Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-15 Thread Theo Diefenthal
Hi Vijay, 

Maybe a stupid question, but according to your comments, the code works fine up 
till a "flatMap" operation. It seems that this flatMap is directly followed by 
a filter function in the method 
createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out all 
events? Or is the filter function not even called? (Your comments seem to 
suggest the latter.) 

Best regards 
Theo 


Von: "Vijay Balakrishnan"  
An: "Dawid Wysakowicz"  
CC: "user"  
Gesendet: Dienstag, 15. Oktober 2019 02:01:05 
Betreff: Re: add() method of AggregateFunction not called even though new 
watermark is emitted 

Hi, 
Thx for the replies - Congxian & Dawdi. 
Watermarks are advancing. Not sure how to check every new generated watermark 
is reaching end of the window  

I did check the Flink UI for the currentInputWatermark and it is increasing 
monotonically. 

Narrowed down the problem to not calling the windowStream.aggregate. 
I also added a checkpoint to see if it was causing the issue.Didn't seem to 
help. 
Most of the code is reached during the creation of the ExecutionGraph on the 
start of the program. 

I generate an incrementing sequence of timestamps(delay of 5000ms between each 
rec) from a Producer to Kinesis and it emits a new watermark as it starts 
receiving the input records. 
My window size is 15s. 
I see a WindowedStream is created with windowAssigner: 
TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger 
but the code never gets into the EventTimeTrigger.onElement() or onEventTime() 
to fire the trigger . 
It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark(). 
I even tried to use ProcessingTime but that also didn't help. 


//code to create kinesis consumer successfully.. 
for (Rule rule : rules.getRules()) { 
//gets in here fine 
final SingleOutputStreamOperator> filteredKinesisStream = 
kinesisStream.filter(mon -> { 
boolean result; 
String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) 
: ""; 
InputMetricSelector inputMetricSelector = rule.getInputMetricSelector(); 
String measurement = inputMetricSelector != null ? 
inputMetricSelector.getMeasurement() : ""; 
result = eventName.equals(measurement); 
if (result) { 
Map inputTags = mon.get(TAGS) != null ? (Map) 
mon.get(TAGS) : new HashMap<>(); 
Map ruleTags = inputMetricSelector != null ? 
inputMetricSelector.getTags() : new HashMap<>(); 
result = matchTags(inputTags, ruleTags); 
} 
return result;// <== this is true 
} 
).flatMap((FlatMapFunction, Map>) (input, 
out) -> { 
out.collect(input);// < runs up till here fine 
}).returns(new TypeHint>() { 
}); 
// doesn't do anything beyond this point at runtime 
DataStream enrichedMGStream = 
pms.createAggregatedMonitoringGroupingWindowStream1 
(filteredKinesisStream, ruleFactory, rule, parallelProcess); 
enrichedMGStream.addSink(influxSink) 
.setParallelism(nbrSinks); 
} 

private DataStream 
createAggregatedMonitoringGroupingWindowStream1(DataStream> 
kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) { 
DataStream enrichedComponentInstanceStream1; 
RuleConfig ruleConfig = rule.getRuleConfig(); 
String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : ""; 
RuleIF ruleImpl = ruleFactory.getRule(ruleType); 
Map ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() 
: new HashMap<>(); 
Object intervalObj = ruleProps.get("rule_eval_window"); 
String timeInterval = intervalObj != null ? (String) intervalObj : ""; 
org.apache.flink.streaming.api.windowing.time.Time timeWindow = 
getTimeWindowFromInterval(timeInterval); 

Object windowTypeObj = ruleProps.get("window_type"); 
String windowType = windowTypeObj != null ? (String) windowTypeObj : ""; 

InputMetricSelector inputMetricSelector = rule.getInputMetricSelector(); 
Map tags = inputMetricSelector != null ? 
inputMetricSelector.getTags() : new HashMap<>(); 
String groupByObj = tags.get(GROUP_BY); 
String groupBy = groupByObj != null ? groupByObj : ""; 
kinesisStream = kinesisStream.filter((FilterFunction>) 
inputMap -> { 
Object groupByValueObj = inputMap.get(groupBy); 
return groupByValueObj != null; 
}); 
Set groupBySet = new 
HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER))); 
String metric = Objects.requireNonNull(inputMetricSelector).getMetric(); 
//till here, it went through fine during creation of ExceutionGraph 
KeyedStream, MonitoringTuple> monitoringTupleKeyedStream = 
kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); <=== never 
gets into the MapTupleKeySelector.getKey() - a similar class works in another 
project 
enrichedComponentInstanceStream1 = 
getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow, 
windowType, timeInterval, ruleImpl, rule, parallelProcess); 
return enrichedComponentInstanceStream1; 
} 

private DataStream 
getMonitoringGroupDataStream1(KeyedStream, MonitoringTuple> 
monitoringTupleKeyedStream, 
org.apache.flink.streaming.api.windowing.time.Time timeWindow, String 

Fwd: Is it possible to get Flink job name in an operator?

2019-10-15 Thread 马阳阳
As per the title: is this possible now? Or can we do something to achieve
this? I tried to put the job name into
ExecutionConfig.GlobalJobParameters, but it is not possible to get the
job name before Environment.execute() is called.


Best regards,
mayangyang


RE: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS

2019-10-15 Thread Adrian Vasiliu
Thanks Congxian. The possible causes listed in the most-voted answer of
https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025
do not seem to hold for us, because we have other pretty similar Flink jobs using
the same Hadoop server and root directory (under different HDFS paths), and they
do work. Thus, in principle, the configuration on the Hadoop server side wouldn't
be the cause. Also, according to the Ambari monitoring tools, the Hadoop server
is healthy, and we did restart it. However, we'll check all the points mentioned
in the various answers, in particular the one about temp files.
Thanks
Adrian
 
----- Original message -----
From: Congxian Qiu
To: Adrian Vasiliu
Cc: user
Subject: [EXTERNAL] Re: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Date: Tue, Oct 15, 2019 4:02 AM
Hi
 
From the given stack trace, maybe you could solve the "replication problem" first,   File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation, and maybe the answer from SO[1] can help.
 
[1] https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025
Best,
Congxian 

Adrian Vasiliu  于2019年10月14日周一 下午9:10写道:
Hello, 
 
We recently upgraded our product from Flink 1.7.2 to Flink 1.9, and we experience repeated failing jobs with  
java.lang.RuntimeException: Could not create file for checking if truncate works. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:645)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:388)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1719)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3368)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3292)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347) 

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy49.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)