附件补充成代码 import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row;
import java.io.File; import java.time.Duration; import java.time.ZoneId; public class FlinkSqlReadFileDemo { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setString("classloader.resolve-order", "parent-first"); configuration.setString(CoreOptions.CHECK_LEAKED_CLASSLOADER.key(), "true"); configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL.key(), "com.;org.;java.;sun." ); // configuration.setString("pipeline.classpaths",""); configuration.setString("execution.runtime-mode", "batch"); // configuration.setString("pipeline.jars", // ""); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 需要变更地址 String tmp = "engine-test-starter/resources/tmp"; // 扫描文件 File folder = new File(tmp); // 读取本地需要处理的T+1文件不存在时则 创建 if (!folder.exists()) { folder.mkdirs(); } File[] files = folder.listFiles(); for (File file : files) { tEnv.executeSql( "CREATE TEMPORARY TABLE text_source (" + " id String," + " name STRING" + ") WITH (" + " 'connector.type' = 'filesystem'," + " 'format.type' = 'csv'," + " 'connector.path' = 'file:///" + file.getAbsolutePath() + "'," + " 'format.field-delimiter' = '|'" + ")" ); Table resultTable = tEnv.sqlQuery("SELECT id, name FROM text_source "); DataStream<Row> dataStream = tEnv.toDataStream(resultTable); DataStream<String> processedStream = dataStream.map(new RichMapFunction<Row, String>() { @Override public String map(Row value) throws Exception { return "3"; } }) .setParallelism(8) .returns(String.class); String outputPath = ""; // 指定输出路径 // // // 输出到文件系统 FileSink<String> fieSink2 = FileSink // 输出行式存储的文件,指定路径、指定编码 .<String>forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8")) // 输出文件的一些配置: 文件名的前缀、后缀 .withOutputFileConfig( OutputFileConfig.builder() .withPartPrefix("file-file") .withPartSuffix(".dat") .build() ) // 
按照目录分桶:如下,就是每个小时一个目录 .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMddHH", ZoneId.systemDefault())) // 文件滚动策略: 1分钟 或 1m .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(Duration.ofMinutes(1)) .withMaxPartSize(new MemorySize(1024 * 1024)) .build() ) .build(); processedStream.sinkTo(fieSink2) .setParallelism(8); env.execute(); tEnv.executeSql(" DROP TEMPORARY table text_source"); } } } <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>engine-test-starter</artifactId> <name>engine-test-starter</name> <version>2.0.0-SNAPSHOT</version> <properties> <flink.version>1.18.1</flink.version> </properties> <dependencies> <!-- --> <!-- SpringWeb模块 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.3.15</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.15</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink 基础库 --> <dependency> 
<groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Needed to execute jobs locally (e.g. from an IDE): it provides the executor used by env.execute(). -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- <dependency>-->
    <!-- <groupId>org.apache.flink</groupId>-->
    <!-- <artifactId>flink-connector-kafka</artifactId>-->
    <!-- <version>${flink.version}</version>-->
    <!-- </dependency>-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.5.0</version>
      <!-- check for the latest version -->
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
    <!-- Data generator connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- The second lombok declaration (1.16.22) was removed: lombok is already declared once
         above, and Maven requires each groupId:artifactId to be unique within dependencies. -->
    <!-- JDBC connector build that targets Flink 1.18, matching flink.version. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.2-1.18</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.23</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/libs-milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
  <!-- Deployment targets -->
  <distributionManagement>
    <!-- "mvn deploy -Pproduct" publishes a release version to the private Nexus repository -->
    <repository>
      <id>dev-release</id>
      <name>Nexus Snapshot</name>
      <url></url>
    </repository>
    <!-- "mvn deploy" publishes the default snapshot version to the private Nexus repository -->
    <snapshotRepository>
      <id>dev-snapshot</id>
      <name>Nexus Snapshot</name>
      <url></url>
    </snapshotRepository>
  </distributionManagement>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
tmp目录下的文件内容 file1.txt id|name 1|1 1|1 1|1 1|1 1|1 1|1 1|1 1|1 1|1 1|1 1|1 1|1 1|1