> unpredictable file schema (Table API) in the source directory

You'll probably have to write some logic that helps predict the schema :)
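
A minimal sketch of what that logic could look like, assuming every CSV file starts with a header row and that simple comma splitting is good enough (no quoted fields). Both are assumptions; the thread does not say what the files look like. `CsvHeaderSchema` and its methods are hypothetical names, not part of Flink.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: guesses column names from the first line of a CSV file. */
public class CsvHeaderSchema {

    /** Splits a header line on commas and trims each name (no quoted-field handling). */
    public static List<String> parseHeader(String headerLine) {
        List<String> columns = new ArrayList<>();
        // limit -1 keeps trailing empty columns instead of silently dropping them
        for (String name : headerLine.split(",", -1)) {
            columns.add(name.trim());
        }
        return columns;
    }

    /** Reads only the first line of the file and returns the column names. */
    public static List<String> readColumns(Path csvFile) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(csvFile)) {
            String header = reader.readLine();
            if (header == null) {
                throw new IOException("Empty file: " + csvFile);
            }
            return parseHeader(header);
        }
    }
}
```

With the column names in hand, files can be grouped by header so that each distinct layout gets its own table definition, rather than forcing one schema onto every file.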

Are there actual schemas for the CSV files somewhere? JSONSchema or
something like it? At Wikimedia we use JSONSchema (not with CSV
data, but it could work), and have code that can convert from JSONSchema
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java#22>
to Flink schemas, either TypeInformation or Table API DataType
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.

Here's an example
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java#29>
in the code docs for use with Kafka. You could use this to read CSV
files instead? Something like:

TableDescriptor.forConnector("filesystem")
    .schema(JsonSchemaFlinkConverter.toSchemaBuilder(jsonSchema).build())
    ...

If you are doing pure SQL (not the Table API), you'll need something
that translates from your schema to SQL, or you could start implementing
a custom Catalog
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#user-defined-catalog>,
which, uh, we kind of did
<https://wikitech.wikimedia.org/wiki/Event_Platform/Stream_Processing/Flink_Catalog>,
but it was not easy.
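
For the schema-to-SQL route, a rough sketch could look like the following. It assumes you already have a list of column names for a given file layout and are happy to type every column as STRING, as in the original CREATE TABLE. `CsvTableDdl` and its methods are hypothetical names, not a Flink or Wikimedia API; only the connector options shown come from the thread.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: renders a Flink SQL CREATE TABLE statement for a CSV directory. */
public class CsvTableDdl {

    /** Quotes an identifier with backticks, doubling any embedded backtick. */
    public static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /** Builds the DDL; every data column is declared as STRING (an assumption). */
    public static String createTable(String table, List<String> columns, String path) {
        String cols = columns.stream()
                .map(c -> quote(c) + " STRING")
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + quote(table) + " (" + cols
                + ", `file.path` STRING NOT NULL METADATA) WITH ("
                + "'connector' = 'filesystem', "
                // single quotes inside a SQL string literal are escaped by doubling
                + "'path' = '" + path.replace("'", "''") + "', "
                + "'format' = 'csv')";
    }
}
```

The resulting string could then be passed to `TableEnvironment.executeSql`, with one generated table per distinct file layout.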

On Mon, Nov 6, 2023 at 1:30 PM arjun s <arjunjoice...@gmail.com> wrote:

> Thanks for your response.
> How should we address the issue of dealing with the unpredictable file
> schema (Table API) in the source directory, as I previously mentioned in my
> email?
>
> Thanks and regards,
> Arjun
>
> On Mon, 6 Nov 2023 at 20:56, Chen Yu <yuchen.e...@gmail.com> wrote:
>
>> Hi Arjun,
>>
>> If you can filter files by a regex pattern, I think the config
>> `source.path.regex-pattern`[1] may be what you want.
>>
>>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
>>                                         -- directory of `path` option. This regex pattern should be
>>                                         -- matched with the absolute file path. If this option is set,
>>                                         -- the connector will recursive all files under the directory
>>                                         -- of `path` option
>>
>>
>> Best,
>> Yu Chen
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>>
>> ------------------------------
>> *From:* arjun s <arjunjoice...@gmail.com>
>> *Sent:* November 6, 2023 20:50
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* Handling Schema Variability and Applying Regex Patterns in Flink
>> Job Configuration
>>
>> Hi team,
>> I'm currently using the Table API in my Flink job to read records from
>> CSV files located in a source directory. To obtain the file names, I'm
>> creating a table and specifying the schema using the Table API. When the
>> schema matches, my Flink job submits and executes as intended. However,
>> when the schema does not match, the job fails to submit. Given that the
>> schema of the files in the source directory is unpredictable, I'm
>> seeking a method to handle this situation.
>> Create table query
>> =============
>> CREATE TABLE sample (
>>   col1 STRING, col2 STRING, col3 STRING, col4 STRING,
>>   `file.path` STRING NOT NULL METADATA
>> ) WITH (
>>   'connector' = 'filesystem', 'path' = 'file:///home/techuser/inputdata',
>>   'format' = 'csv', 'source.monitor-interval' = '10000'
>> )
>> =============
>>
>> Furthermore, I have a question about whether there's a way to read files
>> from the source directory based on a specific regex pattern. This is
>> relevant in our situation because only file names that match a particular
>> pattern need to be processed by the Flink job.
>>
>> Thanks and Regards,
>> Arjun
>>
>
