如果只能本地运行的话,建议本地部署flink standalone。spring 只负责提交任务到本地,并且监控任务运行状态。我理解你的任务都是批任务。

> 2024年12月2日 下午1:52,yyjwork <17317625...@163.com> 写道:
> 
> 附件补充成代码
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.CoreOptions;
> import org.apache.flink.configuration.MemorySize;
> import org.apache.flink.connector.file.sink.FileSink;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> 
> 
> import java.io.File;
> import java.time.Duration;
> import java.time.ZoneId;
> 
> 
> 
> 
> public class FlinkSqlReadFileDemo {
> 
> 
> public static void main(String[] args) throws Exception {
> Configuration configuration = new Configuration();
> configuration.setString("classloader.resolve-order", "parent-first");
> configuration.setString(CoreOptions.CHECK_LEAKED_CLASSLOADER.key(), "true");
> configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL.key(),
>  "com.;org.;java.;sun."
> );
> // configuration.setString("pipeline.classpaths","");
> configuration.setString("execution.runtime-mode", "batch");
> // configuration.setString("pipeline.jars",
> // "");
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> // 需要变更地址
> String tmp = "engine-test-starter/resources/tmp";
> // 扫描文件
> File folder = new File(tmp);
> // 读取本地需要处理的T+1文件不存在时则 创建
> if (!folder.exists()) {
> folder.mkdirs();
> }
> File[] files = folder.listFiles();
> for (File file : files) {
> tEnv.executeSql(
> "CREATE TEMPORARY TABLE text_source (" +
> " id String," +
> " name STRING" +
> ") WITH (" +
> " 'connector.type' = 'filesystem'," +
> " 'format.type' = 'csv'," +
> " 'connector.path' = 'file:///" + file.getAbsolutePath() + "'," +
> " 'format.field-delimiter' = '|'" +
> ")"
> );
> 
> 
> Table resultTable = tEnv.sqlQuery("SELECT id, name FROM text_source ");
> 
> 
> DataStream<Row> dataStream = tEnv.toDataStream(resultTable);
> DataStream<String> processedStream = dataStream.map(new RichMapFunction<Row, 
> String>() {
> @Override
> public String map(Row value) throws Exception {
> 
> 
> return "3";
> }
> })
> .setParallelism(8)
> .returns(String.class);
> String outputPath = ""; // 指定输出路径
> //
> // // 输出到文件系统
> FileSink<String> fieSink2 = FileSink
> // 输出行式存储的文件,指定路径、指定编码
> .<String>forRowFormat(new Path(outputPath), new 
> SimpleStringEncoder<>("UTF-8"))
> // 输出文件的一些配置: 文件名的前缀、后缀
> .withOutputFileConfig(
> OutputFileConfig.builder()
> .withPartPrefix("file-file")
> .withPartSuffix(".dat")
> .build()
> )
> // 按照目录分桶:如下,就是每个小时一个目录
> .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMddHH", 
> ZoneId.systemDefault()))
> // 文件滚动策略: 1分钟 或 1m
> .withRollingPolicy(
> DefaultRollingPolicy.builder()
> .withRolloverInterval(Duration.ofMinutes(1))
> .withMaxPartSize(new MemorySize(1024 * 1024))
> .build()
> )
> .build();
> processedStream.sinkTo(fieSink2)
> 
> 
> .setParallelism(8);
> env.execute();
> tEnv.executeSql(" DROP TEMPORARY table text_source");
> }
> 
> 
> }
> 
> 
> }
> 
> 
> <project xmlns="http://maven.apache.org/POM/4.0.0"; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/maven-v4_0_0.xsd";>
>    <modelVersion>4.0.0</modelVersion>
> 
> 
>    <groupId>com.test</groupId>
>    <artifactId>engine-test-starter</artifactId>
>    <name>engine-test-starter</name>
>    <version>2.0.0-SNAPSHOT</version>
> 
> 
> 
> 
>    <properties>
>        <flink.version>1.18.1</flink.version>
>    </properties>
> 
> 
>    <dependencies>
> <!--     -->
>        <!-- SpringWeb模块 -->
>        <dependency>
>            <groupId>org.springframework</groupId>
>            <artifactId>spring-web</artifactId>
>            <version>5.3.15</version>
>        </dependency>
>        <dependency>
>            <groupId>org.springframework</groupId>
>            <artifactId>spring-context</artifactId>
>            <version>5.3.15</version>
>        </dependency>
>        <dependency>
>            <groupId>log4j</groupId>
>            <artifactId>log4j</artifactId>
>            <version>1.2.17</version>
>        </dependency>
>        <dependency>
>            <groupId>org.slf4j</groupId>
>            <artifactId>slf4j-api</artifactId>
>            <version>1.7.25</version>
>        </dependency>
>        <dependency>
>            <groupId>org.slf4j</groupId>
>            <artifactId>slf4j-log4j12</artifactId>
>            <version>1.7.25</version>
>            <scope>test</scope>
>        </dependency>
>        <dependency>
>            <groupId>org.slf4j</groupId>
>            <artifactId>slf4j-simple</artifactId>
>            <version>1.7.25</version>
>            <scope>test</scope>
>        </dependency>
>        <dependency>
>            <groupId>org.projectlombok</groupId>
>            <artifactId>lombok</artifactId>
>            <version>1.18.12</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-connector-files</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
> 
> 
>        <!-- Flink 基础库 -->
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-java</artifactId>
>            <version>1.18.1</version>
>            <!--<scope>provided</scope>-->
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-streaming-java</artifactId>
>            <version>1.18.1</version>
>            <!--<scope>provided</scope>-->
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-table-api-java</artifactId>
>            <version>1.18.1</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-table-api-java-bridge</artifactId>
>            <version>1.18.1</version>
>        </dependency>
> 
> 
>        <!--<dependency>-->
>        <!--<groupId>org.apache.flink</groupId>-->
>        <!--<artifactId>flink-clients</artifactId>-->
>        <!--<version>1.18.1</version>-->
>        <!--&lt;!&ndash;<scope>provided</scope>&ndash;&gt;-->
>        <!--</dependency>-->
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-table-runtime</artifactId>
>            <version>1.18.1</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-runtime-web</artifactId>
>            <version>1.18.1</version>
>        </dependency>
> 
> 
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-core</artifactId>
>            <version>1.18.1</version>
>        </dependency>
> <!--        <dependency>-->
> <!--            <groupId>org.apache.flink</groupId>-->
> <!--            <artifactId>flink-connector-kafka</artifactId>-->
> <!--            <version>${flink.version}</version>-->
> <!--        </dependency>-->
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-csv</artifactId>
>            <version>1.18.1</version>
>        </dependency>
>        <dependency>
>            <groupId>org.postgresql</groupId>
>            <artifactId>postgresql</artifactId>
>            <version>42.5.0</version> <!-- 请检查最新版本 -->
>        </dependency>
>        <dependency>
>            <groupId>com.alibaba</groupId>
>            <artifactId>fastjson</artifactId>
>            <version>1.2.83</version>
>        </dependency>
> 
> 
>        <!--生成数据-->
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-connector-datagen</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.projectlombok</groupId>
>            <artifactId>lombok</artifactId>
>            <version>1.16.22</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-connector-jdbc</artifactId>
>            <version>3.1.2-1.17</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-table-planner_2.12</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>mysql</groupId>
>            <artifactId>mysql-connector-java</artifactId>
>            <version>8.0.23</version>
>        </dependency>
>    </dependencies>
> 
> 
>    <repositories>
>        <repository>
>            <id>spring-milestones</id>
>            <name>Spring Milestones</name>
>            <url>https://repo.spring.io/libs-milestone</url>
>            <snapshots>
>                <enabled>false</enabled>
>            </snapshots>
>        </repository>
>    </repositories>
> 
> 
>    <!-- 发布节点 -->
>    <distributionManagement>
>        <!-- mvn deploy -Pproduct 发布正式版本到nexus私服 -->
>        <repository>
>            <id>dev-release</id>
>            <name>Nexus Snapshot</name>
>            <url></url>
>        </repository>
>        <!-- mvn deploy 发布默认快照版本到nexus私服 -->
>        <snapshotRepository>
>            <id>dev-snapshot</id>
>            <name>Nexus Snapshot</name>
>            <url></url>
>        </snapshotRepository>
>    </distributionManagement>
> 
> 
>    <build>
>        <plugins>
>            <plugin>
>                <groupId>org.apache.maven.plugins</groupId>
>                <artifactId>maven-compiler-plugin</artifactId>
>                <version>3.5.1</version>
>                <configuration>
>                    <source>1.8</source>
>                    <target>1.8</target>
>                </configuration>
>            </plugin>
>        </plugins>
> 
> 
>    </build>
> </project>
> 
> 
> 
> 
> tmp目录下的文件内容
> file1.txt
> id|name
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 1|1
> 

回复