Re: Cannot cancel job with savepoint due to timeout
Hi Bruno,

From the code I conclude that the "akka.client.timeout" setting is what affects this. It defaults to 60 seconds. I'm not sure why this setting is not documented, as is the case for many other "akka.*" settings - maybe there are some good reasons behind that.

Regards,
Yury

2017-01-31 17:47 GMT+03:00 Bruno Aranda:
> Hi there,
>
> I am trying to cancel a job and create a savepoint (i.e. flink cancel -s),
> but it takes more than a minute to do that and then it fails due to the
> timeout. However, it seems that the job is cancelled successfully and the
> savepoint created, but I can only see that through the dashboard.
>
> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default
> savepoint directory.
>
> The program finished with the following exception:
>
> java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:190)
>     at scala.concurrent.Await.result(package.scala)
>     at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>     at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
>
> Is there any way to configure this timeout? So we can depend on the
> outcome of this execution for scripts, etc.
>
> Thanks!
>
> Bruno
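A hedged sketch of raising the timeout Yury mentions, in flink-conf.yaml. The assumption here is that "akka.client.timeout" accepts a duration string like the documented akka.* settings do; worth verifying against your Flink version since the option is undocumented:

    # flink-conf.yaml
    # Give 'flink cancel -s' up to 5 minutes before the client gives up.
    akka.client.timeout: 300 s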
Re: Connection refused error when writing to socket?
Yes, I did open a socket with netcat. It turns out my first error was due to a stream without a sink triggering the socket connect (I thought that without a sink the stream wouldn't affect anything, so I didn't comment it out, and I didn't open a socket for that port). However, I played with it some more and I think the real issue is that I'm trying to have two streams, one writing to a port and another reading from the same port, i.e.:

    val y = executionEnvironment.socketTextStream("localhost", 9000)
    x.writeToSocket("localhost", 9000, new SimpleStringSchema())

Once I tested just the write or just the read it worked, but combined I get this error:

java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.read1(BufferedReader.java:210)
    at java.io.BufferedReader.read(BufferedReader.java:286)
    at java.io.Reader.read(Reader.java:140)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)

Is this operation not allowed? I'm mainly writing to the same socket in order to pass work back and forth between streams.
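Note that both socketTextStream and writeToSocket connect as *clients* to an external server such as netcat, so the two streams above both talk to netcat, not to each other. A minimal sketch that decouples them by using two different ports; the transformation and port numbers are arbitrary assumptions:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema // Flink 1.2-era import path

    object SocketRoundTrip {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Before running, start one server per port:
        //   nc -lk 9000   (feeds the source)
        //   nc -lk 9001   (receives the sink's output)
        val in: DataStream[String] = env.socketTextStream("localhost", 9000)
        val out = in.map(_.toUpperCase) // placeholder transformation
        out.writeToSocket("localhost", 9001, new SimpleStringSchema())

        env.execute("socket round-trip sketch")
      }
    }

To pass work back and forth between streams inside a single job, connecting the streams directly (or via iterations, as in the "Cyclic ConnectedStream" thread below) avoids the socket altogether.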
Re: Cyclic ConnectedStream
I somehow still suspect that iterations might work for your use case. Note that in the streaming API, iterations are currently nothing more than a back-edge in the topology, i.e. a low-level tool to create a cyclic topology, as with your hypothetical setter syntax. (It's quite different from the iterations of the batch API.)

The tricky part for your use case is that you would want a ConnectedStream as your iteration head, which should get the elements from the back-edge separately from the normal input. You could simulate this by using not ConnectedStream.flatMap but just a simple Stream.flatMap whose input element type is an Either type, whose two alternatives are the normal input and the back-edge input. (You then add maps before the closeWith and to your input1, which wrap into the two alternatives of the Either type.) A concrete sketch follows at the end of this thread.

Best,
Gábor

2017-01-29 15:39 GMT+01:00 Matt:
> Check this image for clarification, this is what I'm trying to do:
> http://i.imgur.com/iZxPv04.png
>
> The rectangles are the two CoFlatMapFunctions, sharing a state between
> process and update (map1 and map2). It's clear from the image that I need
> input1 and the green box to create the blue box, and input2 and the blue
> box to create the green one.
>
> ---
> blue = input1.connect(green).keyBy(...).flatMap(...);
> green = input2.connect(blue).keyBy(...).flatMap(...);
> ---
>
> As you can see there's no cycle in the flow of data, so I guess this
> topology is valid. The problem is not having a way to define such a flow.
>
> For instance, with the appropriate setters we would be able to do this:
>
> ---
> blue = input1.connect();
> green = input2.connect();
>
> blue.setConnection(green);
> green.setConnection(blue);
>
> blue.keyBy(...).flatMap(...);
> green.keyBy(...).flatMap(...);
> ---
>
> Any idea is welcome.
>
> Matt
>
> On Sat, Jan 28, 2017 at 5:31 PM, Matt wrote:
>
>> I'm aware of IterativeStream but I don't think it's useful in this case.
>>
>> As shown in the example above, my use case is "cyclic" in that the same
>> object goes from Input to predictionStream (flatMap1), then to
>> statsStream (flatMap2, where it's updated with an object from Input2),
>> and finally to predictionStream (flatMap2).
>>
>> The same operator is never applied twice to the object, thus I would say
>> this dataflow is cyclic only in the dependencies of the streams
>> (predictionStream depends on statsStream, but statsStream depends on
>> predictionStream in the first place).
>>
>> I hope it is clear now.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay wrote:
>>
>>> Hello,
>>>
>>> Cyclic dataflows can be built using iterations:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>>
>>> Best,
>>> Gábor
>>>
>>> 2017-01-28 18:39 GMT+01:00 Matt:
>>> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>>> > which depends on the first one (A).
>>> >
>>> > Simplified code:
>>> >
>>> > predictionStream = input
>>> >   .connect(statsStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >     flatMap1(obj, output) {
>>> >       p = prediction(obj)
>>> >       output.collect(p)
>>> >     }
>>> >     flatMap2(stat, output) {
>>> >       updateModel(stat)
>>> >     }
>>> >   })
>>> >
>>> > statsStream = input2
>>> >   .connect(predictionStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >     flatMap1(obj2, output) {
>>> >       s = getStats(obj2, p)
>>> >       output.collect(s)
>>> >     }
>>> >     flatMap2(prediction, output) {
>>> >       p = prediction
>>> >     }
>>> >   })
>>> >
>>> > I'm guessing this should be possible to achieve; one way would be to add a
>>> > sink on statsStream to save the elements into Kafka and read from that topic
>>> > on predictionStream instead of initializing it with a reference to
>>> > statsStream. But I would rather avoid writing unnecessarily into Kafka.
>>> >
>>> > Is there any other way to achieve this?
>>> >
>>> > Thanks,
>>> > Matt
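To make Gábor's Either suggestion concrete, here is a minimal sketch using the Scala DataStream iterate API. The element types Input and Feedback and the logic inside the flatMap are placeholder assumptions, not Matt's actual prediction/stats code:

    import org.apache.flink.streaming.api.scala._

    case class Input(key: String, value: Double)    // hypothetical normal input
    case class Feedback(key: String, stat: Double)  // hypothetical back-edge element

    object EitherIteration {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val input1: DataStream[Input] = env.fromElements(Input("a", 1.0))

        // Wrap the normal input into the Left alternative of the Either type.
        val wrapped: DataStream[Either[Input, Feedback]] =
          input1.map(in => Left(in): Either[Input, Feedback])

        // The iteration head sees normal input (Left) and back-edge
        // elements (Right) through a single flatMap instead of a
        // ConnectedStream's flatMap1/flatMap2 pair.
        val result: DataStream[Feedback] = wrapped.iterate { iteration =>
          val processed: DataStream[Feedback] = iteration.flatMap {
            case Left(in)  => Seq(Feedback(in.key, in.value)) // "process" path
            case Right(fb) => Seq.empty[Feedback]             // "update" path: update state, emit nothing
          }
          // Wrap the emitted elements into the Right alternative before
          // feeding them back over the back-edge.
          val backEdge = processed.map(fb => Right(fb): Either[Input, Feedback])
          (backEdge, processed) // (feedback stream, forward output)
        }

        result.print()
        env.execute("either-typed iteration sketch")
      }
    }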
Re: readFile - Continuous file processing
Hi Nancy,

Currently there is no way to do so. Flink only provides the mode you described, i.e. a modified file is considered a new file. The reason is that many filesystems do not give you a creation timestamp separate from the modification timestamp.

If you control the way files are created, a solution could be to write each time to a different file.

Thanks,
Kostas

> On Jan 31, 2017, at 6:17 PM, Nancy Estrada wrote:
>
> Hi guys,
>
> I have the following use case. Every day a new file is created and
> periodically some log records are appended to it. I am reading the file in
> the following way:
>
> executionEnvironment.readFile(format, directoryPath, PROCESS_CONTINUOUSLY,
>     period.toMilliseconds(), filePathFilter);
>
> However, Flink takes modified files as new files, and consequently all the
> content of the modified file gets processed again. I know that a solution is
> to process the file once it contains all the records of the day, but I would
> like to process the file continuously. Therefore, I am wondering if there is
> a way of processing just the new records in a file?
>
> Thank you in advance! :)
> Nancy
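A sketch of Kostas' workaround on the producer side, assuming you control the writer: roll to a fresh, uniquely named file per batch so Flink only ever sees new files, never modified ones. The naming scheme and batching are illustrative assumptions:

    import java.nio.file.{Files, Paths, StandardOpenOption}

    // Write each batch of records to a brand-new file instead of
    // appending to today's file; CREATE_NEW fails fast on a name collision.
    def writeBatch(dir: String, records: Seq[String]): Unit = {
      val path = Paths.get(dir, s"log-${System.currentTimeMillis()}.txt")
      Files.write(path, records.mkString("\n").getBytes("UTF-8"),
        StandardOpenOption.CREATE_NEW)
    }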
readFile - Continuous file processing
Hi guys,

I have the following use case. Every day a new file is created and periodically some log records are appended to it. I am reading the file in the following way:

    executionEnvironment.readFile(format, directoryPath, PROCESS_CONTINUOUSLY,
        period.toMilliseconds(), filePathFilter);

However, Flink takes modified files as new files, and consequently all the content of the modified file gets processed again. I know that a solution is to process the file once it contains all the records of the day, but I would like to process the file continuously. Therefore, I am wondering if there is a way of processing just the new records in a file?

Thank you in advance! :)
Nancy
Re: Regarding Flink as a web service
+u...@apache.org, because he implemented queryable state.

There is also queryable state, which allows you to query the internal keyed state of Flink user functions.

On Mon, 30 Jan 2017 at 00:46 Jonas wrote:
> You could write your data back to Kafka using the FlinkKafkaProducer and then
> use websockets to read from Kafka using NodeJS or similar.
Re: Bug in Table api CsvTableSink
These 2 rows, if converted to a Row[] of Strings, should cause the problem:

http://www.aaa.it/xxx/v/10002780063t/000/1,f/10001957530,cf/13,cpva/77,cf/13,,sit/A2046X,strp/408,10921957530,,1,5,1,2013-01-04T15:02:25,5,,10002780063,XXX,1,,3,,,2013-01-04T15:02:25,XXX,XXX,13,2013-01-04T15:02:25
http://www.aaa.it/xxx/v/10002780063t/000/1,f/10004002060,cf/3,cpva/7,cf/3,f/10164002060,sit/A15730L,strp/408,10164002060,10164002060,2,7,1,2008-05-29T11:47:35,1,,10002780063,XXX,1,,0,,,2008-05-29T11:47:35,XXX,XXX,3,2008-05-29T11:47:35

Best,
Flavio

On Tue, Jan 31, 2017 at 4:51 PM, Flavio Pompermaier wrote:
> I hope to have time to write a test program :)
> Otherwise I hope someone else can give it a try in the meantime..
>
> Best,
> Flavio
>
> On Tue, Jan 31, 2017 at 4:49 PM, Fabian Hueske wrote:
>
>> Hi Flavio,
>>
>> I do not remember that such a bug was fixed. Maybe by chance, but I guess not.
>> Can you open a JIRA and maybe provide input data to reproduce the problem?
>>
>> Thank you,
>> Fabian
>>
>> 2017-01-31 16:25 GMT+01:00 Flavio Pompermaier:
>>
>>> Hi to all,
>>> I'm trying to read from a db and then write to a csv.
>>> In my code I do the following:
>>>
>>> tableEnv.fromDataSet(myDataSet).writeToSink(new
>>>     CsvTableSink(csvOutputDir, fieldDelim));
>>>
>>> If I use fieldDelim = "," everything is OK; if I use "\t" some tabs are not
>>> printed correctly...
>>> PS: myDataSet is a dataset of 32 String fields.
>>>
>>> Is this something that has been fixed in Flink > 1.1.1?
>>>
>>> Best,
>>> Flavio
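A hedged sketch of such a test program, assuming a Flink 1.2-style Table API and abbreviating the 32 fields to the first two of each sample row above (fill in the full rows for a faithful reproduction):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.sinks.CsvTableSink

    object CsvTableSinkRepro {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(env)

        // Only 2 of the 32 string fields, taken from the sample rows above.
        val myDataSet = env.fromElements(
          ("http://www.aaa.it/xxx/v/10002780063t/000/1", "f/10001957530"),
          ("http://www.aaa.it/xxx/v/10002780063t/000/1", "f/10004002060"))

        // Write with "\t" as delimiter and diff against the "," output.
        tableEnv.fromDataSet(myDataSet)
          .writeToSink(new CsvTableSink("/tmp/repro-out", "\t"))
        env.execute()
      }
    }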
Re: Bug in Table api CsvTableSink
I hope to have time to write a test program :)
Otherwise I hope someone else can give it a try in the meantime..

Best,
Flavio

On Tue, Jan 31, 2017 at 4:49 PM, Fabian Hueske wrote:
> Hi Flavio,
>
> I do not remember that such a bug was fixed. Maybe by chance, but I guess not.
> Can you open a JIRA and maybe provide input data to reproduce the problem?
>
> Thank you,
> Fabian
>
> 2017-01-31 16:25 GMT+01:00 Flavio Pompermaier:
>
>> Hi to all,
>> I'm trying to read from a db and then write to a csv.
>> In my code I do the following:
>>
>> tableEnv.fromDataSet(myDataSet).writeToSink(new
>>     CsvTableSink(csvOutputDir, fieldDelim));
>>
>> If I use fieldDelim = "," everything is OK; if I use "\t" some tabs are not
>> printed correctly...
>> PS: myDataSet is a dataset of 32 String fields.
>>
>> Is this something that has been fixed in Flink > 1.1.1?
>>
>> Best,
>> Flavio
Re: Bug in Table api CsvTableSink
Hi Flavio,

I do not remember that such a bug was fixed. Maybe by chance, but I guess not.
Can you open a JIRA and maybe provide input data to reproduce the problem?

Thank you,
Fabian

2017-01-31 16:25 GMT+01:00 Flavio Pompermaier:
> Hi to all,
> I'm trying to read from a db and then write to a csv.
> In my code I do the following:
>
> tableEnv.fromDataSet(myDataSet).writeToSink(new
>     CsvTableSink(csvOutputDir, fieldDelim));
>
> If I use fieldDelim = "," everything is OK; if I use "\t" some tabs are not
> printed correctly...
> PS: myDataSet is a dataset of 32 String fields.
>
> Is this something that has been fixed in Flink > 1.1.1?
>
> Best,
> Flavio
Bug in Table api CsvTableSink
Hi to all,
I'm trying to read from a db and then write to a csv.
In my code I do the following:

    tableEnv.fromDataSet(myDataSet).writeToSink(new
        CsvTableSink(csvOutputDir, fieldDelim));

If I use fieldDelim = "," everything is OK; if I use "\t" some tabs are not printed correctly...
PS: myDataSet is a dataset of 32 String fields.

Is this something that has been fixed in Flink > 1.1.1?

Best,
Flavio
Re: Flink survey by data Artisans
Hello,

One last note on this thread: we've processed and published the Flink user survey results. You can find a file with graphs summarizing the multiple-choice responses, as well as anonymous feedback from the open-ended questions, in a GitHub repository [1]. We also published a summary of the responses on the data Artisans blog [2].

Thanks again to all who participated. We hope that the data is helpful for the community.

Best,
Mike

[1] https://github.com/dataArtisans/flink-user-survey-2016
[2] http://data-artisans.com/flink-user-survey-2016-part-1/

On Fri, Dec 9, 2016 at 2:33 PM, Mike Winters wrote:
> Hi everyone,
>
> A quick heads-up that we'll be closing the Flink user survey to new
> responses this coming Monday, 12 Dec, around 9am EST.
>
> If you'd still like to respond before Monday, you can do so here:
> http://www.surveygizmo.com/s3/3166399/181bdb611f22
>
> We've seen more than 100 responses so far. Thank you to all who have
> participated.
>
> Best,
> Mike
>
> On Fri, Nov 18, 2016 at 7:55 PM, Shannon Carey wrote:
>
>> There's a newline that disrupts the URL.
>>
>> http://www.surveygizmo.com/s3/3166399/181bdb611f22
>>
>> Not:
>>
>> http://www.surveygizmo.com/s3/
>> 3166399/181bdb611f22

--
Data Artisans GmbH | Stresemannstraße 121A | 10963 Berlin
mobile: +4917677386299
Registered at Amtsgericht Charlottenburg - HRB 158244 B
Managing Directors: Kostas Tzoumas, Stephan Ewen
Re: Connection refused error when writing to socket?
Can you try opening a socket with netcat on localhost?

    nc -lk 9000

and see if this works? For me it works.

--
Jonas
Re: Calling external services/databases from DataStream API
Hi Diego,

You can also broadcast a changelog stream:

    DataStream mainStream = ...
    DataStream changeStream = ...

    mainStream.connect(changeStream.broadcast()).flatMap(new YourCoFlatMapFunction());

All records of the changeStream will be forwarded to each instance of the flatmap operator.

Best,
Fabian

2017-01-31 8:12 GMT+01:00 Diego Fustes Villadóniga:
> Hi Stephan,
>
> Thanks a lot for your response. I'll study the options that you mention.
> I'm not sure if the "changelog stream" will be easy to implement, since the
> lookup is based on matching IP ranges and not just keys.
>
> Regards,
>
> Diego
>
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: Monday, 30 January 2017 17:39
> To: user@flink.apache.org
> Subject: Re: Calling external services/databases from DataStream API
>
> Hi!
>
> The Distributed Cache would actually indeed be nice to add to the
> DataStream API. Since the runtime parts for that are all in place, the code
> would mainly be on the "client" side that sets up the JobGraph to be
> submitted and executed.
>
> For the problem of scaling this, there are two solutions that I can see:
>
> (1) Simpler: Use the new asynchronous I/O operator to talk to the
> external database in an asynchronous fashion (that should help to get
> higher throughput):
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
>
> (2) More elaborate: Convert the lookup database into a "changelog stream"
> and make the enrichment operation a "stream-to-stream" join.
>
> Greetings,
> Stephan
>
> On Mon, Jan 30, 2017 at 1:36 PM, Jonas wrote:
>
>> I have a similar use case where I (for the purposes of this discussion) have a
>> GeoIP database that is not fully available from the start but will
>> eventually be "full". The GeoIP tuples are coming in one after another.
>> After ~4M tuples the GeoIP database is complete.
>>
>> I also need to do the same query.
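A hedged sketch of Fabian's broadcast pattern adapted to Diego's IP-range lookup. Event and IpRangeRule are hypothetical types, and the linear scan over the ranges is a placeholder for a real interval structure:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.util.Collector
    import scala.collection.mutable.ArrayBuffer

    case class Event(ip: Long, payload: String)                  // hypothetical main-stream record
    case class IpRangeRule(from: Long, to: Long, label: String)  // hypothetical changelog record

    class EnrichFunction extends CoFlatMapFunction[Event, IpRangeRule, (Event, String)] {
      // The broadcast edge delivers every rule to every parallel instance,
      // so a plain field holding the full rule set is enough here.
      private val rules = ArrayBuffer[IpRangeRule]()

      override def flatMap1(e: Event, out: Collector[(Event, String)]): Unit =
        rules.find(r => e.ip >= r.from && e.ip <= r.to)   // range match, not a key lookup
          .foreach(r => out.collect((e, r.label)))

      override def flatMap2(rule: IpRangeRule, out: Collector[(Event, String)]): Unit =
        rules += rule                                     // changelog element: grow the local rule set
    }

    // Wiring, as in Fabian's snippet:
    // mainStream.connect(ruleStream.broadcast()).flatMap(new EnrichFunction)

One design note: because the changelog is broadcast rather than keyed, no keyBy is needed, which is exactly what makes range matching (instead of key lookup) possible in the flatmap.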