如果只能本地运行的话,建议本地部署flink standalone。spring 只负责提交任务到本地,并且监控任务运行状态。我理解你的任务都是批任务。
> 2024年12月2日 下午1:52,yyjwork <17317625...@163.com> 写道: > > 附件补充成代码 > import org.apache.flink.api.common.functions.RichMapFunction; > import org.apache.flink.api.common.serialization.SimpleStringEncoder; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.CoreOptions; > import org.apache.flink.configuration.MemorySize; > import org.apache.flink.connector.file.sink.FileSink; > import org.apache.flink.core.fs.Path; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; > import > org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; > import > org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > > import java.io.File; > import java.time.Duration; > import java.time.ZoneId; > > > > > public class FlinkSqlReadFileDemo { > > > public static void main(String[] args) throws Exception { > Configuration configuration = new Configuration(); > configuration.setString("classloader.resolve-order", "parent-first"); > configuration.setString(CoreOptions.CHECK_LEAKED_CLASSLOADER.key(), "true"); > configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL.key(), > "com.;org.;java.;sun." > ); > // configuration.setString("pipeline.classpaths",""); > configuration.setString("execution.runtime-mode", "batch"); > // configuration.setString("pipeline.jars", > // ""); > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(configuration); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > // 需要变更地址 > String tmp = "engine-test-starter/resources/tmp"; > // 扫描文件 > File folder = new File(tmp); > // 读取本地需要处理的T+1文件不存在时则 创建 > if (!folder.exists()) { > folder.mkdirs(); > } > File[] files = folder.listFiles(); > for (File file : files) { > tEnv.executeSql( > "CREATE TEMPORARY TABLE text_source (" + > " id String," + > " name STRING" + > ") WITH (" + > " 'connector.type' = 'filesystem'," + > " 'format.type' = 'csv'," + > " 'connector.path' = 'file:///" + file.getAbsolutePath() + "'," + > " 'format.field-delimiter' = '|'" + > ")" > ); > > > Table resultTable = tEnv.sqlQuery("SELECT id, name FROM text_source "); > > > DataStream<Row> dataStream = tEnv.toDataStream(resultTable); > DataStream<String> processedStream = dataStream.map(new RichMapFunction<Row, > String>() { > @Override > public String map(Row value) throws Exception { > > > return "3"; > } > }) > .setParallelism(8) > .returns(String.class); > String outputPath = ""; // 指定输出路径 > // > // // 输出到文件系统 > FileSink<String> fieSink2 = FileSink > // 输出行式存储的文件,指定路径、指定编码 > .<String>forRowFormat(new Path(outputPath), new > SimpleStringEncoder<>("UTF-8")) > // 输出文件的一些配置: 文件名的前缀、后缀 > .withOutputFileConfig( > OutputFileConfig.builder() > .withPartPrefix("file-file") > .withPartSuffix(".dat") > .build() > ) > // 按照目录分桶:如下,就是每个小时一个目录 > .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMddHH", > ZoneId.systemDefault())) > // 文件滚动策略: 1分钟 或 1m > .withRollingPolicy( > DefaultRollingPolicy.builder() > .withRolloverInterval(Duration.ofMinutes(1)) > .withMaxPartSize(new MemorySize(1024 * 1024)) > .build() > ) > .build(); > processedStream.sinkTo(fieSink2) > > > .setParallelism(8); > env.execute(); > tEnv.executeSql(" DROP TEMPORARY table text_source"); > } > > > } > > > } > > > <project xmlns="http://maven.apache.org/POM/4.0.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/maven-v4_0_0.xsd"> > <modelVersion>4.0.0</modelVersion> > > > <groupId>com.test</groupId> > <artifactId>engine-test-starter</artifactId> > <name>engine-test-starter</name> > <version>2.0.0-SNAPSHOT</version> > > > > > <properties> > <flink.version>1.18.1</flink.version> > </properties> > > > <dependencies> > <!-- --> > <!-- SpringWeb模块 --> > <dependency> > <groupId>org.springframework</groupId> > <artifactId>spring-web</artifactId> > <version>5.3.15</version> > </dependency> > <dependency> > <groupId>org.springframework</groupId> > <artifactId>spring-context</artifactId> > <version>5.3.15</version> > </dependency> > <dependency> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > <version>1.2.17</version> > </dependency> > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-api</artifactId> > <version>1.7.25</version> > </dependency> > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-log4j12</artifactId> > <version>1.7.25</version> > <scope>test</scope> > </dependency> > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-simple</artifactId> > <version>1.7.25</version> > <scope>test</scope> > </dependency> > <dependency> > <groupId>org.projectlombok</groupId> > <artifactId>lombok</artifactId> > <version>1.18.12</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-files</artifactId> > <version>${flink.version}</version> > </dependency> > > > <!-- Flink 基础库 --> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>1.18.1</version> > <!--<scope>provided</scope>--> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java</artifactId> > <version>1.18.1</version> > <!--<scope>provided</scope>--> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java</artifactId> > <version>1.18.1</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge</artifactId> > <version>1.18.1</version> > </dependency> > > > <!--<dependency>--> > <!--<groupId>org.apache.flink</groupId>--> > <!--<artifactId>flink-clients</artifactId>--> > <!--<version>1.18.1</version>--> > <!--<!–<scope>provided</scope>–>--> > <!--</dependency>--> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-runtime</artifactId> > <version>1.18.1</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-runtime-web</artifactId> > <version>1.18.1</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-core</artifactId> > <version>1.18.1</version> > </dependency> > <!-- <dependency>--> > <!-- <groupId>org.apache.flink</groupId>--> > <!-- <artifactId>flink-connector-kafka</artifactId>--> > <!-- <version>${flink.version}</version>--> > <!-- </dependency>--> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-csv</artifactId> > <version>1.18.1</version> > </dependency> > <dependency> > <groupId>org.postgresql</groupId> > <artifactId>postgresql</artifactId> > <version>42.5.0</version> <!-- 请检查最新版本 --> > </dependency> > <dependency> > <groupId>com.alibaba</groupId> > <artifactId>fastjson</artifactId> > <version>1.2.83</version> > </dependency> > > > <!--生成数据--> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-datagen</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.projectlombok</groupId> > <artifactId>lombok</artifactId> > <version>1.16.22</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-jdbc</artifactId> > <version>3.1.2-1.17</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner_2.12</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>mysql</groupId> > <artifactId>mysql-connector-java</artifactId> > <version>8.0.23</version> > </dependency> > </dependencies> > > > <repositories> > <repository> > <id>spring-milestones</id> > <name>Spring Milestones</name> > <url>https://repo.spring.io/libs-milestone</url> > <snapshots> > <enabled>false</enabled> > </snapshots> > </repository> > </repositories> > > > <!-- 发布节点 --> > <distributionManagement> > <!-- mvn deploy -Pproduct 发布正式版本到nexus私服 --> > <repository> > <id>dev-release</id> > <name>Nexus Snapshot</name> > <url></url> > </repository> > <!-- mvn deploy 发布默认快照版本到nexus私服 --> > <snapshotRepository> > <id>dev-snapshot</id> > <name>Nexus Snapshot</name> > <url></url> > </snapshotRepository> > </distributionManagement> > > > <build> > <plugins> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-compiler-plugin</artifactId> > <version>3.5.1</version> > <configuration> > <source>1.8</source> > <target>1.8</target> > </configuration> > </plugin> > </plugins> > > > </build> > </project> > > > > > tmp目录下的文件内容 > file1.txt > id|name > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 > 1|1 >