Re: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]
Hi Dan,

I think Flink SQL should be able to meet your needs. You can write a Flink JAR program that accepts the different directories, schemas, mappings, and sink tables as input and generates the DDL and DML from them.

Assuming you have two directories:

  directory1 -> f1, f2, f3, f4 -> iceberg1
  directory2 -> f1, f2, f3     -> iceberg2

the DDL can be generated roughly as follows:

  CREATE TABLE s3_table1 (
    f1 VARCHAR,
    f2 VARCHAR,
    f3 VARCHAR,
    f4 VARCHAR
  ) WITH (
    'connector' = 'filesystem',
    'path' = 's3://dir1',
    'format' = 'parquet',
    ...
  );

  CREATE TABLE s3_table2 (
    f1 VARCHAR,
    f2 VARCHAR,
    f3 VARCHAR
  ) WITH (
    'connector' = 'filesystem',
    'path' = 's3://dir2',
    'format' = 'parquet',
    ...
  );

Then, based on your mapping, select the fields you need and generate the DML:

  INSERT INTO iceberg_catalog.iceberg_database.tb1 SELECT f1, f2, f3 FROM s3_table1;
  INSERT INTO iceberg_catalog.iceberg_database.tb2 SELECT f1, f2 FROM s3_table2;

Of course, this is my understanding of your requirements; I don't know if it fits your scenario.

Best regards,
Feng

On Fri, Nov 24, 2023 at 3:02 AM Oxlade, Dan wrote:
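A minimal sketch of the generator Feng describes, assuming Java 17. `Column`, `DirectoryMapping` and `SqlGenerator` are hypothetical names, not Flink API. Each mapping produces one filesystem-connector `CREATE TABLE` over its S3 directory and one `INSERT INTO` that copies only the mapped columns. The table and column names follow the two-directory example, and the `STRING` types are an assumption.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: generate Flink SQL DDL/DML for each "S3 directory -> Iceberg table" mapping.
// Column, DirectoryMapping and SqlGenerator are hypothetical helpers, not Flink API.
final class SqlGenerator {

    /** One column of the Parquet files, with its Flink SQL type. */
    record Column(String name, String type) {}

    /** One source directory, the columns to copy, and the Iceberg sink table. */
    record DirectoryMapping(String sourceTable, String path, List<Column> columns,
                            List<String> selected, String sinkTable) {}

    /** CREATE TABLE over an S3 directory using the filesystem connector and Parquet format. */
    static String createTableDdl(DirectoryMapping m) {
        String cols = m.columns().stream()
                .map(c -> "  " + c.name() + " " + c.type())
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + m.sourceTable() + " (\n" + cols + "\n) WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = '" + m.path() + "',\n"
                + "  'format' = 'parquet'\n"
                + ")";
    }

    /** INSERT INTO the Iceberg table, selecting only the mapped columns. */
    static String insertDml(DirectoryMapping m) {
        List<String> known = m.columns().stream().map(Column::name).toList();
        for (String s : m.selected()) {
            if (!known.contains(s)) {
                throw new IllegalArgumentException("Unknown column " + s + " in " + m.sourceTable());
            }
        }
        return "INSERT INTO " + m.sinkTable() + " SELECT " + String.join(", ", m.selected())
                + " FROM " + m.sourceTable();
    }

    public static void main(String[] args) {
        List<Column> four = List.of(new Column("f1", "STRING"), new Column("f2", "STRING"),
                new Column("f3", "STRING"), new Column("f4", "STRING"));
        List<DirectoryMapping> mappings = List.of(
                new DirectoryMapping("s3_table1", "s3://dir1", four,
                        List.of("f1", "f2", "f3"), "iceberg_catalog.iceberg_database.tb1"),
                new DirectoryMapping("s3_table2", "s3://dir2", four.subList(0, 3),
                        List.of("f1", "f2"), "iceberg_catalog.iceberg_database.tb2"));
        for (DirectoryMapping m : mappings) {
            // In a real job, each string would be handed to the TableEnvironment.
            System.out.println(createTableDdl(m) + ";");
            System.out.println(insertDml(m) + ";");
        }
    }
}
```

Inside an actual Flink job, each generated statement would go to `TableEnvironment#executeSql`. Collecting the `INSERT` statements in a `StatementSet` (`createStatementSet()`, then `addInsertSql(...)` and `execute()`) submits them as one job instead of one job per `INSERT`.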
Re: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]
Thanks Feng,

I think my challenge (and why I expected I'd need to use Java) is that Parquet files with different schemas will be landing in the S3 bucket, so I don't want to hard-code the schema in a SQL table definition.

I'm not sure this is even possible. Maybe I would have to write a job that accepts the schema, directory, and Iceberg target table as parameters, and start instances of the job through the job API.

Unless reading the Parquet files into a temporary table doesn't need the schema definition? I couldn't really work things out from the links.

Dan

From: Feng Jin
Sent: Thursday, November 23, 2023 6:49:11 PM
To: Oxlade, Dan
Cc: user@flink.apache.org
Subject: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]
Re: flink s3[parquet] -> s3[iceberg]
Hi Oxlade,

I think Flink SQL can conveniently fulfill your requirements.

For the S3 Parquet files, you can create a temporary table using the filesystem connector [1]. For the Iceberg tables, Flink SQL integrates easily with the Iceberg catalog [2]. You can therefore use Flink SQL to export the S3 files to Iceberg.

If you only need field mapping or transformation, I believe Flink SQL plus UDFs (user-defined functions) would be sufficient.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
[2] https://iceberg.apache.org/docs/latest/flink-connector/#table-managed-in-hadoop-catalog

Best,
Feng

On Thu, Nov 23, 2023 at 11:23 PM Oxlade, Dan wrote:
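The three pieces Feng lists can be sketched as generated statements: a temporary filesystem table, an Iceberg catalog, and an `INSERT` between them. This sketch assumes Java 17 and uses a Nessie catalog, since Dan plans to use Nessie, rather than the Hadoop catalog in link [2]. The option keys follow the Flink filesystem connector and Iceberg Nessie documentation. Every value (Nessie URI, ref, warehouse, paths, table names) is a placeholder, and `PipelineStatements` is a hypothetical helper. The `source.monitor-interval` option is what enables the directory watching referenced in link [1].

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the statements behind "filesystem table -> Iceberg catalog -> INSERT".
// Option keys follow the Flink filesystem connector and Iceberg Nessie docs;
// all values (URI, ref, warehouse, paths, table names) are placeholders.
final class PipelineStatements {

    /** Renders a Flink SQL WITH clause, keeping the options in insertion order. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    /** Iceberg catalog backed by Nessie. */
    static String createCatalog(String name, String nessieUri, String ref, String warehouse) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("type", "iceberg");
        opts.put("catalog-impl", "org.apache.iceberg.nessie.NessieCatalog");
        opts.put("uri", nessieUri);
        opts.put("ref", ref);
        opts.put("warehouse", warehouse);
        return "CREATE CATALOG " + name + " " + withClause(opts);
    }

    /** Parquet directory as a temporary table; the monitor interval keeps watching for new files. */
    static String createWatchedSource(String table, String columns, String path, String interval) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "filesystem");
        opts.put("path", path);
        opts.put("format", "parquet");
        opts.put("source.monitor-interval", interval);
        return "CREATE TEMPORARY TABLE " + table + " (" + columns + ") " + withClause(opts);
    }

    public static void main(String[] args) {
        System.out.println(createCatalog("iceberg_catalog", "http://localhost:19120/api/v1",
                "main", "s3a://warehouse/iceberg") + ";");
        System.out.println(createWatchedSource("s3_source", "f1 STRING, f2 STRING",
                "s3://landing/dir1", "10s") + ";");
        System.out.println("INSERT INTO iceberg_catalog.iceberg_database.tb1 "
                + "SELECT f1, f2 FROM s3_source;");
    }
}
```

Without `source.monitor-interval`, the filesystem source scans the path once and then finishes. With it, newly discovered files are processed once each, and the set of processed files is kept in the source's checkpointed state. The Iceberg Flink sink commits data on checkpoint completion, so checkpointing needs to be enabled for rows to become visible in Iceberg.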
flink s3[parquet] -> s3[iceberg]
Hi all,

I'm attempting to build a POC in Flink: a pipeline that streams Parquet files into a data warehouse in Iceberg format.

Ideally, I'd like to watch a directory in S3 (MinIO locally) and stream those files into Iceberg, doing the appropriate schema mapping/translation.

I guess, first: does this sound like a crazy idea?

Assuming not, is anyone able to share examples that might get me going? I've found lots of Iceberg and Flink SQL examples, but I think I'll need something in Java to do the schema mapping. Examples of reading Parquet from S3 also seem a little hard to come by.

I'm aware I'll need a catalog; I can use Nessie for the prototype. I'm also trying to use MinIO to get this all working locally, but this might just be adding complexity at the moment.

TIA
Dan

T. Rowe Price International Ltd (registered number 3957748) is registered in England and Wales with its registered office at Warwick Court, 5 Paternoster Square, London EC4M 7DX. T. Rowe Price International Ltd is authorised and regulated by the Financial Conduct Authority. The company has a branch in Dubai International Financial Centre (regulated by the DFSA as a Representative Office).

T. Rowe Price (including T. Rowe Price International Ltd and its affiliates) and its associates do not provide legal or tax advice. Any tax-related discussion contained in this e-mail, including any attachments, is not intended or written to be used, and cannot be used, for the purpose of (i) avoiding any tax penalties or (ii) promoting, marketing, or recommending to any other party any transaction or matter addressed herein. Please consult your independent legal counsel and/or professional tax advisor regarding any legal or tax issues raised in this e-mail.

The contents of this e-mail and any attachments are intended solely for the use of the named addressee(s) and may contain confidential and/or privileged information. Any unauthorized use, copying, disclosure, or distribution of the contents of this e-mail is strictly prohibited by the sender and may be unlawful. If you are not the intended recipient, please notify the sender immediately and delete this e-mail.
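For the schema mapping/translation Dan mentions, one option in line with Feng's SQL-first suggestion is to limit the Java side to generating the projection. This is a minimal sketch assuming Java 17. `FieldMapping` and `ProjectionBuilder` are hypothetical helpers, and the sink, source, and column names in `main` are placeholders. Each mapping can rename a column, cast it, or pass it through unchanged.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: turn a per-source field mapping into a Flink SQL INSERT ... SELECT projection.
// FieldMapping and ProjectionBuilder are hypothetical helpers, not part of Flink.
final class ProjectionBuilder {

    /** source column -> target column, with an optional target type (null = keep the type). */
    record FieldMapping(String source, String target, String targetType) {}

    /** One projected column: optional CAST, plus an alias when the name or type changes. */
    static String column(FieldMapping f) {
        String expr = f.targetType() == null
                ? f.source()
                : "CAST(" + f.source() + " AS " + f.targetType() + ")";
        boolean unchanged = f.source().equals(f.target()) && f.targetType() == null;
        return unchanged ? expr : expr + " AS " + f.target();
    }

    /** INSERT INTO the sink, projecting the source columns in mapping order. */
    static String insertSql(String sinkTable, String sourceTable, List<FieldMapping> fields) {
        String projection = fields.stream()
                .map(ProjectionBuilder::column)
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + sinkTable + " SELECT " + projection + " FROM " + sourceTable;
    }

    public static void main(String[] args) {
        List<FieldMapping> fields = List.of(
                new FieldMapping("f1", "id", "BIGINT"),
                new FieldMapping("f2", "f2", null),
                new FieldMapping("f3", "event_date", "DATE"));
        System.out.println(insertSql("iceberg_catalog.iceberg_database.events", "s3_source", fields));
    }
}
```

Flink matches the columns of `INSERT INTO ... SELECT` to the sink by position, not by alias. The mapping list therefore has to follow the sink table's column order, and the `AS` aliases only make the generated SQL easier to read. Conversions that `CAST` cannot express would need a UDF, as Feng suggests.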