Re: Jar Uploads in High Availability (Flink 1.7.2)
Hi, I was also experiencing similar behavior. I adopted the following approach:
- I used a distributed file system (in my case AWS EFS) and set the attribute "web.upload.dir", so both JobManagers use the same upload location.
- On the load balancer side (AWS ELB), I used a "readiness probe" based on the ZooKeeper entry for the active JobManager's address. This way the ELB always points to the active JobManager, and if the active JobManager changes, it automatically points to the new one. Because both JobManagers share the same upload location on the distributed file system, the new active JobManager is able to find the same jar.
Regards,
Ravi
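A minimal sketch of such a readiness check, assuming the active JobManager's address has already been obtained from the ZooKeeper leader entry by some other means. The `leaderLookup` supplier, the `/ready` path and the address strings are hypothetical, and no ZooKeeper client is shown. The endpoint answers 200 only on the instance whose own address matches the leader and 503 otherwise (including when the leader is unknown), so the load balancer forwards uploads and `/jars` requests to a single JobManager. The shared `web.upload.dir` is still needed, because after a failover the new leader must see jars uploaded through the old one.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Readiness endpoint: 200 on the leading JobManager, 503 elsewhere (sketch). */
public class LeaderReadiness {

    /** Pure decision logic, kept separate so it can be tested without HTTP. */
    static int statusFor(String leaderAddress, String selfAddress) {
        // A null (unknown) leader never equals our address, so we report "not ready".
        return selfAddress.equals(leaderAddress) ? 200 : 503;
    }

    /**
     * Starts an HTTP server exposing GET /ready.
     * leaderLookup is a hypothetical hook, e.g. backed by a read of the
     * ZooKeeper leader entry that Flink HA maintains.
     */
    static HttpServer start(int port, String selfAddress, Supplier<String> leaderLookup)
            throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/ready", exchange -> {
            int status = statusFor(leaderLookup.get(), selfAddress);
            byte[] body = (status == 200 ? "leader" : "standby").getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(status, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

Each JobManager host would run one such endpoint with its own address, and the ELB health check would point at `/ready`.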
Re: Kinesis Connector and Savepoint/Checkpoint restore.
Hi, I am also facing the same problem. I am using Flink 1.9.0 and consuming from a Kinesis source with a retention of 1 day. When the job is submitted with the "latest" initial stream position, it starts well and keeps processing data from all shards for a very long period without any lag. When the job fails, it also recovers well from the last successful checkpointed state. However, very rarely, when the job fails and recovers from the last successful checkpoint, I notice a huge lag (1 day, matching the retention) on one of the streams. I have not yet been able to define a step-by-step process to reproduce this issue.
So far, I have gathered more information by customizing FlinkKinesisConsumer to emit additional log messages, and I noticed that the number of shards loaded from the checkpoint data during recovery is less than the actual number of shards in the stream. The number of shards in my Kinesis stream is fixed. I added one debug log line at line 408 to print the size of the variable "sequenceNumsToRestore", which is populated with shard details from the checkpoint data:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
In this consumer class, when the "run" method is called, it does the following:
- It discovers shards from the Kinesis stream and selects the shards that this subtask should consume.
- It then iterates over the discovered shards one by one and checks whether each shard's state is available in the recovered state "sequenceNumsToRestore": https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
- If it is available, it schedules that shard with the recovered state.
- If it is not available in the state, it schedules that shard with "EARLIEST_SEQUENCE_NUMBER": https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
In my case, the number of shard entries recovered from the checkpoint data is less than the actual number of shards, which results in those shards being scheduled from the earliest stream position. I suspect that the checkpoint is somehow missing state for some of the shards, but if that were the case, the checkpoint should have failed. Any further information to resolve this issue would be highly appreciated.
Regards,
Ravi
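The restore decision traced through `FlinkKinesisConsumer.run()` above can be modelled in a few lines of plain Java. This is a simplified sketch, not the connector's code: shard IDs are plain strings, the restored state is a `Map` standing in for `sequenceNumsToRestore`, and `EARLIEST` stands in for the connector's EARLIEST_SEQUENCE_NUM sentinel. It shows why any discovered shard that is missing from the restored state is re-read from the start of the retention window, and it lists those shards, which is essentially the comparison behind the extra debug log.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of how discovered shards are seeded on restore (sketch). */
public class ShardSeeding {

    /** Stand-in for the connector's EARLIEST_SEQUENCE_NUM sentinel. */
    static final String EARLIEST = "EARLIEST_SEQUENCE_NUM";

    /**
     * For each shard this subtask discovered, start from the restored sequence
     * number if the checkpoint has one, otherwise from the earliest record.
     */
    static Map<String, String> seed(List<String> discoveredShards,
                                    Map<String, String> restoredSequenceNums) {
        Map<String, String> startPositions = new LinkedHashMap<>();
        for (String shardId : discoveredShards) {
            String restored = restoredSequenceNums.get(shardId);
            startPositions.put(shardId, restored != null ? restored : EARLIEST);
        }
        return startPositions;
    }

    /** Shards that will be re-read from the beginning of the retention window. */
    static List<String> shardsRestartingFromEarliest(Map<String, String> startPositions) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : startPositions.entrySet()) {
            if (EARLIEST.equals(e.getValue())) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> discovered = List.of("shardId-0083", "shardId-0087");
        // The checkpoint only holds state for shardId-0087.
        Map<String, String> restored =
                Map.of("shardId-0087", "49599841594208637293623823226010128300928335129272649074");
        Map<String, String> positions = seed(discovered, restored);
        System.out.println("restored entries: " + restored.size() + ", discovered: " + discovered.size());
        System.out.println("from EARLIEST: " + shardsRestartingFromEarliest(positions)); // [shardId-0083]
    }
}
```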
Re: Kinesis Connector and Savepoint/Checkpoint restore.
Hi Steven,
If the savepoint/checkpoint was restored successfully, I think this might be because the shard wasn't discovered in the previous run, so it is consumed from the beginning. Please refer to the implementation here: [1]
[1] https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
Best,
Yun Tang
Verifying correctness of StreamingFileSink (Kafka -> S3)
I am evaluating StreamingFileSink (Kafka 0.10.11) as a production-ready alternative to our current Kafka -> S3 solution. Is there any way to verify the integrity of the data written to S3? I'm confused about how the file names (e.g. part-1-17) map to Kafka partitions, and I'm also unsure how to ensure that no Kafka records are lost (I know Flink guarantees exactly-once, but this is more of a sanity check).
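One way to approach the sanity check, sketched under stated assumptions. With the default naming, a finished part file is called `part-<subtaskIndex>-<partCounter>`, so `part-1-17` is the file with counter 17 written by sink subtask 1. File names therefore identify sink subtasks, not Kafka partitions; which partitions a sink subtask received depends on the upstream partitioning. For a record-level check, the sketch assumes each record written to S3 also carries its Kafka partition and offset (how you add them is up to your job), and it reports missing offset ranges per partition. Offsets can legitimately skip (compacted topics, transaction markers), and gaps before the first observed offset are not detected, so a reported gap is a prompt to investigate rather than proof of loss.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sanity checks for files written by StreamingFileSink (sketch). */
public class SinkOutputCheck {

    private static final Pattern PART_FILE = Pattern.compile("part-(\\d+)-(\\d+)");

    /** Returns {subtaskIndex, partCounter} for a name like "part-1-17", or null. */
    static int[] parsePartFile(String name) {
        Matcher m = PART_FILE.matcher(name);
        if (!m.matches()) {
            return null;
        }
        return new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))};
    }

    /**
     * Given {partition, offset} pairs read back from the output, describes every
     * missing offset range per partition (between the min and max offsets seen).
     */
    static List<String> findOffsetGaps(List<long[]> partitionOffsetPairs) {
        Map<Long, TreeSet<Long>> byPartition = new TreeMap<>();
        for (long[] pair : partitionOffsetPairs) {
            byPartition.computeIfAbsent(pair[0], p -> new TreeSet<>()).add(pair[1]);
        }
        List<String> gaps = new ArrayList<>();
        for (Map.Entry<Long, TreeSet<Long>> e : byPartition.entrySet()) {
            long expected = e.getValue().first();
            for (long offset : e.getValue()) { // ascending order
                if (offset > expected) {
                    gaps.add("partition " + e.getKey() + ": offsets " + expected + ".." + (offset - 1));
                }
                expected = offset + 1;
            }
        }
        return gaps;
    }
}
```

For example, records at offsets 5, 6 and 9 of partition 0 yield a single gap report, "partition 0: offsets 7..8".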
Jar Uploads in High Availability (Flink 1.7.2)
I'm seeing that when I upload a jar through the REST API, only the JobManager that received the upload request is aware of the newly uploaded jar. That worked fine for me in older versions, where all clients were redirected to the leader, but now that each JobManager accepts requests, a jar upload request can end up on any one (and only one) of the JobManagers, not necessarily the leader. Further, each JobManager responds to a GET request on the /jars endpoint with its own local list of jars. If I try to use one of the jar IDs from that response, my next request may not go to the same JobManager (requests go through Docker and are load-balanced), so the jar ID isn't found on the JobManager handling that request.
Kinesis Connector and Savepoint/Checkpoint restore.
Hello, we currently use Flink 1.9.0 with Kinesis to process data. We have extended data retention on the Kinesis stream, which gives us 7 days of data. We have found that when a savepoint/checkpoint is restored, the Kinesis consumer appears to restart from the start of the stream. The flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest metric reports to Prometheus that it is 7 days behind when the process starts back up from a savepoint. We have some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-0083,HashKeyRange: {StartingHashKey: 220651847300296034902031972006537199616,EndingHashKey: 223310303291865866647839586127097888767},SequenceNumberRange: {StartingSequenceNumber: 49597946220601502339755334362523522663986150244033234226,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
This seems to indicate that this shard is starting from the beginning of the stream. We also have some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName=' TheStream ', shard='{ShardId: shardId-0087,HashKeyRange: {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49597946220690705320549456855089665537076743690057155954,}}'} from sequence number 49599841594208637293623823226010128300928335129272649074 with ShardConsumer 21
This shard seems to be resuming from a specific point. I am assuming that this might be caused by no data being available on the shard for the entire stream (which is possible at this stage of the application). Is this the expected behavior? I had thought it would checkpoint with the most recent sequence number, regardless of whether it received data or not.
-Steve
Mirror Maker 2.0 cluster and starting from latest offset and other queries
Two queries:
1. I am trying to configure MM2 to start replicating from the head (latest offset) of the topic. Should auto.offset.reset = latest in mm2.properties be enough? Unfortunately, MM2 starts from the EARLIEST offset.
2. I do not have an authorizer configured on the broker, and I see this exception: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
According to https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Config,ACLSync, sync.topic.acls = false should fix this, but it does not.
Any ideas?
JDBC Table Sink doesn't seem to sink to database.
Hi, using 1.8.0, I have the following job: https://pastebin.com/ibZUE8Qx
The job does the following steps:
1. Consume from Kafka and return a JsonObject.
2. Map the JsonObject to MyPojo.
3. Convert the stream to a table.
4. Insert the table into the JDBC sink table.
5. Print the table.
Observations:
- The job seems to work with no errors, and I can see the row printed to the console, but I see nothing in my database.
- If I put an invalid host for the database and restart the job, I get a connection SQLException. So at least we know that part works.
- If I make a typo in the INSERT INTO statement, e.g. INSERTS INTO non_existing_table, no exception is thrown, the print happens, and the stream continues to work.
- If I drop the table from the database, the same thing happens: no exception is thrown, the print happens, and the stream continues to work.
So am I missing something?
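One behaviour worth ruling out, stated as an assumption rather than a confirmed diagnosis: a sink that buffers rows and only executes them as a batch once a size threshold is reached (or on an explicit flush, such as at a checkpoint or on close) would produce exactly these symptoms. Connection errors surface when the sink opens, while the rows, and any error in the INSERT statement, stay invisible until the first flush. The plain-Java model below (`BatchingWriter` is hypothetical, not Flink's class, and the batch size of 5000 is illustrative) shows the effect; if the JDBC sink in your version batches this way, its batch-size setting and whether checkpointing is enabled would be the next things to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffering writer: rows reach the target only on flush. */
public class BatchingWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> executeBatch; // stands in for a JDBC executeBatch()
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> executeBatch) {
        this.batchSize = batchSize;
        this.executeBatch = executeBatch;
    }

    void write(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush(); // only a full batch is sent automatically
        }
    }

    /** Called when the batch is full; a real sink might also call it on checkpoint or close. */
    void flush() {
        if (!buffer.isEmpty()) {
            executeBatch.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Rows written but not yet sent to the target. */
    int pending() {
        return buffer.size();
    }
}
```

With a batch size of 5000 and a handful of test rows, `pending()` stays above zero and the target stays empty until `flush()` is called, which is also the first moment a bad statement could fail.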
Re: Is it possible to get Flink job name in an operator?
I got mine in the AbstractStreamOperator.open() method through this.getContainingTask().getEnvironment().getJobID();

> On Oct 14, 2019, at 11:53 PM, 马阳阳 wrote:
>
> As the title says. Is it possible now? Or can we do something to achieve this?
> I tried to put the job name into the ExecutionConfig.GlobalJobParameters. But
> it is not possible to get the job name before Environment.execute() is called.
>
> Best regards,
> mayangyang
Re: add() method of AggregateFunction not called even though new watermark is emitted
Hi Theo, You were right. For some reason (I still haven't figured it out), the FilterFunction was causing issues. I commented it out, and it started getting into the add() method of the aggregate function.
/*kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    return groupByValueObj != null;
});*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
TIA,
Vijay
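Since commenting out that filter made `add()` run, one plausible explanation (an assumption, not confirmed in the thread) is that the filter rejected every event. It looks up `inputMap.get(groupBy)` on the top-level event map, whereas the earlier filter in this job reads tags from a nested map under `TAGS`, and `groupBy` is later split on `KEY_DELIMITER`, so it may name several keys at once. If no record passes, no window is ever created, so neither the key selector nor the trigger runs. The sketch below checks each group-by key inside the nested tags map; the field name `"tags"` and the `","` delimiter are hypothetical stand-ins for `TAGS` and `KEY_DELIMITER`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Sketch of a group-by presence check that looks inside the nested tags map. */
public class GroupByFilter {

    /**
     * Returns true only if every key listed in groupBy (split on delimiter)
     * has a non-null value in the event's nested tags map.
     */
    @SuppressWarnings("unchecked")
    static boolean hasGroupByValues(Map<String, Object> event, String groupBy,
                                    String tagsField, String delimiter) {
        Object tagsObj = event.get(tagsField);
        if (!(tagsObj instanceof Map) || groupBy.isEmpty()) {
            return false;
        }
        Map<String, Object> tags = (Map<String, Object>) tagsObj;
        // Pattern.quote: treat the delimiter literally, even if it is a regex metacharacter.
        return Arrays.stream(groupBy.split(Pattern.quote(delimiter)))
                .allMatch(key -> tags.get(key) != null);
    }

    public static void main(String[] args) {
        Map<String, Object> tags = new HashMap<>();
        tags.put("host", "h1");
        tags.put("region", "us-east");
        Map<String, Object> event = new HashMap<>();
        event.put("tags", tags);

        String groupBy = "host,region";
        // The original check: a top-level lookup of the whole group-by string.
        System.out.println(event.get(groupBy) != null);                     // false
        System.out.println(hasGroupByValues(event, groupBy, "tags", ","));  // true
    }
}
```

Inside the job, a check like this would replace the body of the `FilterFunction` lambda.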
Re: add() method of AggregateFunction not called even though new watermark is emitted
Hi Theo, It gets to the FilterFunction during the creation of the ExecutionGraph initially, but not at runtime when records are streaming in. So it is not getting that far; it seems to be stuck in the final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter code. It doesn't seem to get past it: it keeps incrementing watermarks, but the watermark never seems to reach the end of the window. Maybe I am doing something super simple and stupid.
TIA,
Vijay
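To check whether a watermark has reached the end of a window, it helps to compute the window bounds directly. The sketch below mirrors the arithmetic Flink uses for tumbling event-time windows: a record with timestamp t lands in the window starting at t - (t - offset + size) % size, and EventTimeTrigger fires once the watermark reaches the window's max timestamp, start + size - 1. It assumes a zero window offset and the 15 s window from this thread; the timestamp is illustrative. Comparing these numbers with the currentInputWatermark shown for the window operator in the Flink UI is one way to check. Note that a window only exists once an element has been assigned to it, so if an upstream filter drops every record, nothing fires regardless of the watermark.

```java
/** Mirrors the arithmetic Flink uses for tumbling event-time windows (sketch). */
public class TumblingWindowMath {

    /** Same formula as Flink's TimeWindow.getWindowStartWithOffset. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** The last timestamp belonging to the window [start, start + size). */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** EventTimeTrigger fires once the watermark reaches the window's max timestamp. */
    static boolean fires(long watermark, long timestamp, long size) {
        long start = windowStart(timestamp, 0L, size);
        return watermark >= maxTimestamp(start, size);
    }

    public static void main(String[] args) {
        long size = 15_000L;           // TumblingEventTimeWindows(15000)
        long ts = 1_571_100_007_000L;  // illustrative record timestamp in ms
        long start = windowStart(ts, 0L, size);
        System.out.println("window [" + start + ", " + (start + size) + ")");
        System.out.println(fires(start + size - 2, ts, size)); // false
        System.out.println(fires(start + size - 1, ts, size)); // true
    }
}
```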
Re: Discard message on deserialization errors.
Ah, OK, thanks!

On Sat, 12 Oct 2019 at 11:13, Zhu Zhu wrote:
> I mean the Kafka source provided in Flink correctly ignores null deserialized values.
>
> isEndOfStream allows you to control when to end the input stream. If it is used for running infinite stream jobs, you can simply return false.
>
> Thanks,
> Zhu Zhu
>
> On Sat, Oct 12, 2019 at 8:40 PM John Smith wrote:
>> By the Kafka fetcher, do you mean the Flink JSON schemas? Do they throw IOExceptions?
>>
>> Also, what's the purpose of isEndOfStream? Most schemas I looked at don't do anything but return false.
>>
>> On Fri., Oct. 11, 2019, 11:44 p.m. Zhu Zhu wrote:
>>> Hi John,
>>>
>>> It should work with a null return value. The Javadoc of DeserializationSchema#deserialize says:
>>> @return The deserialized message as an object (null if the message cannot be deserialized).
>>>
>>> I also checked the Kafka fetcher in Flink, and it can correctly handle a null deserialized record.
>>>
>>> Just make sure that DeserializationSchema#isEndOfStream does not throw errors when it is given a null record.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> On Sat, Oct 12, 2019 at 5:36 AM John Smith wrote:
>>>> Hi, I'm using Flink 1.8.0. I am ingesting data from Kafka; unfortunately, for the time being I have not looked into using the schema registry. So for now I would like to write a simple deserialization schema that discards the data if deserialization fails. The other option is to do it in a flat map with markers and split to a dead letter queue, but I'm not too concerned about that for now. Is it OK to just return null if deserialization fails?
>>>>
>>>> @Override
>>>> public MyObject deserialize(byte[] message) {
>>>>     try {
>>>>         return MyDecoder.decode(message);
>>>>     } catch (IOException ex) {
>>>>         logger.warn("Failed to decode message.", ex);
>>>>         return null;
>>>>     }
>>>> }
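To see why returning `null` is enough, and why `isEndOfStream` must tolerate `null`, here is a plain-Java model of the consuming side. It assumes a fetch loop that, as described above for Flink's Kafka fetcher, passes each deserialized value to `isEndOfStream` first and then drops `null` results instead of emitting them. `SkipUndecodable`, its integer "decoder" and `emitAll` are hypothetical names for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Model of a source loop that drops records whose deserialization returned null. */
public class SkipUndecodable {

    /** Hypothetical decoder: parses a UTF-8 integer, returns null on failure. */
    static Integer deserialize(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        try {
            return Integer.valueOf(text);
        } catch (NumberFormatException ex) {
            // Log-and-discard, like the schema in the question.
            return null;
        }
    }

    /** Infinite-stream job: never signals end of stream, and is safe for null input. */
    static boolean isEndOfStream(Integer next) {
        return false;
    }

    /** Emits decoded records, skipping nulls. */
    static List<Integer> emitAll(List<byte[]> messages) {
        List<Integer> out = new ArrayList<>();
        for (byte[] message : messages) {
            Integer value = deserialize(message);
            if (isEndOfStream(value)) { // may receive null, so it must not throw
                break;
            }
            if (value == null) {
                continue; // undecodable message is dropped
            }
            out.add(value);
        }
        return out;
    }
}
```

For the payloads "1", "oops" and "3", `emitAll` returns [1, 3].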
RE: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Hi, FYI, we've switched to a different Hadoop server, and the issue vanished. It does look as if the cause was on the Hadoop side. Thanks again, Congxian.
Adrian

- Original message -
From: "Adrian Vasiliu"
To: qcx978132...@gmail.com
Cc: user@flink.apache.org
Subject: [EXTERNAL] RE: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Date: Tue, Oct 15, 2019 8:37 AM

Thanks, Congxian. The possible causes listed in the most-voted answer of https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025 do not seem to apply to us, because we have other, very similar Flink jobs using the same Hadoop server and root directory (under different HDFS paths), and they do work. Thus, in principle, the configuration on the Hadoop server side wouldn't be the cause. Also, according to the Ambari monitoring tools, the Hadoop server is healthy, and we did restart it. However, we'll check all the points mentioned in the various answers, in particular the one about temp files.
Thanks,
Adrian

- Original message -
From: Congxian Qiu
To: Adrian Vasiliu
Cc: user
Subject: [EXTERNAL] Re: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Date: Tue, Oct 15, 2019 4:02 AM

Hi, From the given stack trace, maybe you could solve the "replication problem" first:
File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
The answer from SO [1] may help.
[1] https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025
Best,
Congxian

On Mon, Oct 14, 2019 at 9:10 PM Adrian Vasiliu wrote:
Hello, We recently upgraded our product from Flink 1.7.2 to Flink 1.9, and we are experiencing repeatedly failing jobs with
java.lang.RuntimeException: Could not create file for checking if truncate works.
You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:645)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:388)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1719)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3368)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3292)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
    at org.apache.hadoop.ipc.Client.call(Client.java:1435)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at
Re: Is it possible to get Flink job name in an operator?
I think ExecutionConfig.GlobalJobParameters is the way to do this if you want to retrieve it at runtime. Alternatively, you can just pass the name to each operator you implement, so that it is serialized together with the UDF.
Thanks,
Zhu Zhu

On Tue, Oct 15, 2019 at 3:11 PM 马阳阳 wrote:
> As the title says. Is it possible now? Or can we do something to achieve this? I tried to put the job name into the
> ExecutionConfig.GlobalJobParameters.
> But it is not possible to get the job name before Environment.execute() is called.
>
> Best regards,
> mayangyang
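The second suggestion, passing the name into each operator, works because a user function's fields are serialized together with it when the job is submitted. A plain-Java sketch of that mechanism, assuming the name is known while the pipeline is being built: `NameAwareFunction` is a hypothetical stand-in for a user function such as a map function, and plain Java serialization is used here only to mimic shipping the function to the workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical user function that carries the job name as a field. */
public class NameAwareFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String jobName;

    public NameAwareFunction(String jobName) {
        this.jobName = jobName;
    }

    public String map(String value) {
        // The name travels with the function instance, so it is available at runtime.
        return jobName + ": " + value;
    }

    /** Serializes and deserializes a copy, mimicking shipping the UDF to a worker. */
    static NameAwareFunction roundTrip(NameAwareFunction fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (NameAwareFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        String jobName = "my-job"; // the same string later passed to env.execute(jobName)
        NameAwareFunction shipped = roundTrip(new NameAwareFunction(jobName));
        System.out.println(shipped.map("event")); // prints "my-job: event"
    }
}
```

The design trade-off is that the name is fixed at build time, which suits the case in the question where the name is chosen before execute() is called.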
Re: add() method of AggregateFunction not called even though new watermark is emitted
Hi Vijay,
Maybe a stupid question, but according to your comments, the code works fine up to a "flatMap" operation. This flatMap seems to be directly followed by a filter function in the method createAggregatedMonitoringGroupingWindowStream1. Is it perhaps filtering out all events? Or is the filter function itself not even called (as your comments suggest)?
Best regards
Theo

From: "Vijay Balakrishnan"
To: "Dawid Wysakowicz"
CC: "user"
Sent: Tuesday, October 15, 2019 02:01:05
Subject: Re: add() method of AggregateFunction not called even though new watermark is emitted

Hi,
Thanks for the replies, Congxian and Dawid.
Watermarks are advancing. I'm not sure how to check whether every newly generated watermark reaches the end of the window, but I did check the Flink UI for the currentInputWatermark, and it is increasing monotonically.
I narrowed the problem down to windowStream.aggregate not being called. I also added a checkpoint to see if it was causing the issue. It didn't seem to help.
Most of the code is reached during the creation of the ExecutionGraph at the start of the program.
I generate an incrementing sequence of timestamps (a delay of 5000 ms between records) from a producer to Kinesis, and the job emits a new watermark as it starts receiving the input records. My window size is 15 s.
I see that a WindowedStream is created with windowAssigner: TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger, but the code never gets into EventTimeTrigger.onElement() or onEventTime() to fire the trigger. It does get into TimestampsAndPunctuatedWatermarkOperator and emitWatermark(). I even tried ProcessingTime, but that didn't help either.

    //code to create kinesis consumer successfully..
    for (Rule rule : rules.getRules()) {
        //gets in here fine
        final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter(mon -> {
            boolean result;
            String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
            InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
            String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
            result = eventName.equals(measurement);
            if (result) {
                Map<String, String> inputTags = mon.get(TAGS) != null ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                Map<String, String> ruleTags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
                result = matchTags(inputTags, ruleTags);
            }
            return result; // <== this is true
        }).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
            out.collect(input); // <== runs up till here fine
        }).returns(new TypeHint<Map<String, Object>>() {
        });
        // doesn't do anything beyond this point at runtime
        DataStream enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1(filteredKinesisStream, ruleFactory, rule, parallelProcess);
        enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
    }

    private DataStream createAggregatedMonitoringGroupingWindowStream1(DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
        DataStream enrichedComponentInstanceStream1;
        RuleConfig ruleConfig = rule.getRuleConfig();
        String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
        RuleIF ruleImpl = ruleFactory.getRule(ruleType);
        Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
        Object intervalObj = ruleProps.get("rule_eval_window");
        String timeInterval = intervalObj != null ? (String) intervalObj : "";
        org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);
        Object windowTypeObj = ruleProps.get("window_type");
        String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
        InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
        Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
        String groupByObj = tags.get(GROUP_BY);
        String groupBy = groupByObj != null ? groupByObj : "";
        kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
            Object groupByValueObj = inputMap.get(groupBy);
            return groupByValueObj != null;
        });
        Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
        String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
        //till here, it went through fine during creation of ExecutionGraph
        KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream = kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); // <=== never gets into the MapTupleKeySelector.getKey() - a similar class works in another project
        enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
        return enrichedComponentInstanceStream1;
    }

    private DataStream getMonitoringGroupDataStream1(KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream, org.apache.flink.streaming.api.windowing.time.Time timeWindow, String
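Following up on Theo's question, one way the filter in createAggregatedMonitoringGroupingWindowStream1 could drop every event: the code later splits `groupBy` on KEY_DELIMITER to build `groupBySet`, which suggests the GROUP_BY tag can list several keys, yet the filter looks up the whole, unsplit string with `inputMap.get(groupBy)`. If the tag holds more than one key, no event would pass, and `MapTupleKeySelector.getKey()` would never be reached, which would be consistent with what Vijay observed. The plain-Java sketch below illustrates this under stated assumptions: KEY_DELIMITER is assumed to be ",", and the tag value "host,region" and the method names are hypothetical. A log line or counter inside the real filter would confirm or rule this out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class GroupByFilterSketch {

    // ASSUMPTION: the real KEY_DELIMITER constant is not shown in the thread.
    static final String KEY_DELIMITER = ",";

    // Mirrors the original filter: the whole groupBy string is looked up as ONE key.
    static boolean originalCheck(Map<String, Object> event, String groupBy) {
        return event.get(groupBy) != null;
    }

    // Hypothetical alternative: every key listed in groupBy must be present.
    // Pattern.quote makes split() treat the delimiter literally (split takes a regex).
    static boolean perKeyCheck(Map<String, Object> event, String groupBy) {
        if (groupBy.isEmpty()) {
            return false; // no group-by keys configured: treat as no match
        }
        for (String key : groupBy.split(Pattern.quote(KEY_DELIMITER))) {
            if (event.get(key) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("host", "h1");
        event.put("region", "eu");
        String groupBy = "host,region"; // hypothetical GROUP_BY tag value

        System.out.println(originalCheck(event, groupBy)); // false: no key named "host,region"
        System.out.println(perKeyCheck(event, groupBy));   // true
    }
}
```

If GROUP_BY always holds a single key, this particular explanation does not apply, and the filter itself is not the culprit.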
Fwd: Is it possible to get Flink job name in an operator?
As the title says: is it possible now? Or is there something we can do to achieve this? I tried to put the job name into the ExecutionConfig.GlobalJobParameters, but it is not possible to get the job name before Environment.execute() is called. Best regards, mayangyang
RE: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Thanks, Congxian. The possible causes listed in the most-voted answer of https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025 do not seem to apply to us: other, very similar Flink jobs use the same Hadoop server and root directory (under different HDFS paths), and they do work. So, in principle, the Hadoop server-side configuration should not be the cause. Also, according to the Ambari monitoring tools, the Hadoop server is healthy, and we did restart it. However, we'll check all the points mentioned in the various answers, in particular the one about temp files.
Thanks,
Adrian

----- Original message -----
From: Congxian Qiu
To: Adrian Vasiliu
Cc: user
Subject: [EXTERNAL] Re: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS
Date: Tue, Oct 15, 2019 4:02 AM

Hi,
From the given stack trace, maybe you could solve the "replication problem" first:
File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
Maybe the answer from SO [1] can help.
[1] https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025
Best,
Congxian

Adrian Vasiliu wrote on Monday, October 14, 2019 at 9:10 PM:
Hello,
We recently upgraded our product from Flink 1.7.2 to Flink 1.9, and we are experiencing repeatedly failing jobs with
java.lang.RuntimeException: Could not create file for checking if truncate works. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:645)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:388)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1719)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3368)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3292)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
    at org.apache.hadoop.ipc.Client.call(Client.java:1435)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy49.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
    at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)