Hi Esa,

IMO, the easiest approach would be to implement a custom source function
that reads the CSV files line-wise (in the correct timestamp order) and
extracts timestamps.
At the end of each file, you can emit a watermark.
The order of files can either be hardcoded or determined from the file name.
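
The logic of such a source can be sketched in plain Python. This is only an
illustration and not Flink API code: in Flink the same loop would live in the
run() method of a SourceFunction and would use the SourceContext to emit
records with timestamps and watermarks. The names replay and file_order_key,
the ("event", ...) / ("watermark", ...) tuples, and the file naming scheme
(a sequence number in the file name, e.g. log_1.csv) are hypothetical. The
sketch also assumes that the timestamp is an integer in the first column and
that timestamps in later files are strictly larger than in earlier files.

```python
import csv
import re
from pathlib import Path


def file_order_key(path):
    """Order files by the first run of digits in the file name.

    Assumed naming scheme (hypothetical): log_1.csv, log_2.csv, ...
    """
    match = re.search(r"\d+", path.stem)
    if match is None:
        raise ValueError(f"no sequence number in file name: {path.name}")
    return int(match.group())


def replay(directory, ts_column=0):
    """Yield events from all CSV files in timestamp order.

    Yields ("event", timestamp, row) for every non-empty line and
    ("watermark", timestamp) once at the end of each file.
    """
    max_ts = None
    files = sorted(Path(directory).glob("*.csv"), key=file_order_key)
    for path in files:
        with path.open(newline="") as handle:
            for row in csv.reader(handle):
                if not row:
                    continue  # skip blank lines
                ts = int(row[ts_column])
                max_ts = ts if max_ts is None else max(max_ts, ts)
                yield ("event", ts, row)
        # End of file: no later file contains a timestamp <= max_ts
        # (assumption stated above), so a watermark can be emitted.
        if max_ts is not None:
            yield ("watermark", max_ts)
```

If the same timestamp can appear at the end of one file and at the start of
the next, the watermark would have to be max_ts - 1 instead.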

This approach is similar to the source function in the RideCleansing
exercise [1] (without the alignment of timestamps with the actual time).

Once you have a DataStream with correctly assigned timestamps and
watermarks, you should be able to use the CEP library.

Best, Fabian

[1]
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/CheckpointedTaxiRideSource.java

2018-02-28 10:47 GMT+01:00 Esa Heikkinen <esa.heikki...@student.tut.fi>:

> Because I have no time to learn all the features of Flink and because there
> may be some issues in my case, I am very interested in implementing an
> external “log replayer” or some batch-to-stream data converter.
>
>
>
> Do you have any ideas or suggestions on how to build this kind of log
> replayer? Or does a ready-made one already exist?
>
> Could Kafka do something like this?
>
>
>
> I think I could also write this log replayer in Python.
>
>
>
> What kind of parallel streams would be best and easiest for Flink?
>
>
>
> By the way, I am writing a conference paper comparing Flink and my
> LOGDIG log file analyzer, which is described in my earlier paper (LOGDIG Log
> File Analyzer for Mining Expected Behavior from Log Files):
>
> https://www.researchgate.net/profile/Timo_Haemaelaeinen/publication/283264599_LOGDIG_Log_File_Analyzer_for_Mining_Expected_Behavior_from_Log_Files/links/562f7ea208ae4742240ae977.pdf
>
>
>
> LOGDIG is a very simple and slow analyzer and it currently runs only on a
> local computer, but it is capable of analyzing very complex cases from many
> parallel log files. The analysis LOGDIG performs is close to CEP. I have
> written it in Python.
>
>
>
> I don’t know whether Flink is the best benchmarking target, but I do not
> know of a better one. I also tried Spark, but it had its own problems. For
> example, CEP is not as good in Spark as in Flink.
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Tuesday, February 27, 2018 11:27 PM
> *To:* Esa Heikkinen <esa.heikki...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files
>
>
>
> Yes, that is mostly correct.
> You can of course read files in parallel, assign watermarks, and obtain a
> DataStream with correct timestamps and watermarks.
> If you do that, you should ensure that each parallel source task reads
> the files in the order of increasing timestamps.
>
> As I said before, you can do that by providing a custom InputSplitAssigner
> that hands out the splits in order of their timestamps.
> The timestamp order would need to be encoded in the file name because the
> assigner cannot look into the file.
>
> Reading unsplit files in a single task makes the problem a bit easier
> to handle, but parallel reads are also possible.
>
>
>
> The RideCleansing example that you are referring to does not have these
> problems because the source reads the data in a single thread from a single
> file.
> This is done in order to avoid all the issues that I described before.
>
> Best, Fabian
>
>
>
>
>
> 2018-02-27 22:14 GMT+01:00 Esa Heikkinen <heikk...@student.tut.fi>:
>
>
>
> Hi
>
> Thanks for the answer. All csv-files are already present and they will not
> change during the processing.
>
> Because Flink can read many streams in parallel, I think it is also
> possible to read many csv-files in parallel.
>
> From what I have understood, it is possible to convert csv-files to
> streams internally in Flink? But the problem may be how to synchronize
> the parallel reading of csv-files based on timestamps?
>
> Maybe I should develop an external "replayer" of csv-files, which
> generates parallel streams of events (based on timestamps) for Flink?
>
> But I think the "replayer" could also be built with Flink, and it could also
> be run at an accelerated speed?
>
> The RideCleansing example does something like that, but I don't know if it
> is otherwise appropriate for my purpose.
>
> Best, Esa
>
>
>
> Fabian Hueske wrote on 27.2.2018 at 22:32:
>
> Hi Esa,
>
> Reading records from files with timestamps that need watermarks can be
> tricky.
>
> If you are aware of Flink's watermark mechanism, you know that records
> should be ingested in (roughly) increasing timestamp order.
>
> This means that files usually cannot be split (i.e., need to be read by a
> single task from start to end) and also need to be read in the right order
> (files with smaller timestamps first).
>
> Also each file should contain records of a certain time interval that
> should not overlap (too much) with the time interval of other files.
>
>
>
> Unfortunately, Flink does not provide good built-in support to read files
> in a specific order.
>
> If all files that you want to process are already present, you can
> implement a custom InputFormat by extending a CsvInputFormat, set
> unsplittable to true and override the getInputSplitAssigner() to return an
> assigner that returns the splits in the correct order.
>
>
> If you want to process files as they appear, things might be a bit easier
> given that the timestamps in each new file are larger than the timestamps
> of the previous files. In this case, you can use 
> StreamExecutionEnvironment.readFile()
> with the interval and FileProcessingMode parameter. With a correctly
> configured watermark assigner, it should be possible to get valid
> watermarks.
>
> In any case, reading timestamped data from files is much more tricky than
> ingesting data from an event log which provides the events in the same
> order in which they were written.
>
> Best, Fabian
>
>
>
> 2018-02-27 20:13 GMT+01:00 Esa Heikkinen <heikk...@student.tut.fi>:
>
>
> I want to read csv-files that contain time series data, where one column
> is a timestamp.
>
> Is it better to use addSource() (like in the data Artisans
> RideCleansing exercise) or CsvTableSource()?
>
> I am not sure whether CsvTableSource() can understand timestamps. I have not
> found good examples of that.
>
> It may be a little more work to write a csv-parser in the addSource() case?
>
> Best, Esa
>
>
>
>
>
>
>
