Flink 对接阿里云 OSS(Object Storage Service)读写、Checkpoint、插件安装与配置模板
1、OSS URI 规则与常见使用场景
在 Flink 中访问 OSS 的路径格式:
oss:///
这里的 既可以是文件,也可以是目录前缀(类似路径)。
常用场景
- FileSource 从 OSS 读文件/目录
- FileSink 写入 OSS
- Checkpoint/Savepoint 存 OSS(生产常用,尤其云上)
- HA 元数据目录、EmbeddedRocksDBStateBackend 等(只要接受 FileSystem URI)
2、代码示例:读、写、Checkpoint 放 OSS
2.1 从 OSS 读取(FileSource)
FileSource<String> fileSource = FileSource.forRecordStreamFormat(
new TextLineInputFormat(),
new Path("oss:///" )
).build();
env.fromSource(
fileSource,
WatermarkStrategy.noWatermarks(),
"oss-input"
);
2.2 写入 OSS(FileSink)
stream.sinkTo(
FileSink.forRowFormat(
new Path("oss:///" ),
new SimpleStringEncoder<String>()
).build()
);
2.3 使用 OSS 作为 Checkpoint Storage
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "oss:///" );
env.configure(config);
3、插件:flink-oss-fs-hadoop(Shaded Hadoop OSS FileSystem)
Flink 提供 flink-oss-fs-hadoop 插件对接 OSS:
- scheme:
oss:// - 插件会注册
oss://的默认 FileSystem wrapper - 以 shaded 方式提供(便于开箱即用,减少依赖冲突)
3.1 安装方式(plugins 目录)
启动 Flink 前把 JAR 从 opt 拷贝到 plugins:
mkdir ./plugins/oss-fs-hadoop
cp ./opt/flink-oss-fs-hadoop-2.2.0.jar ./plugins/oss-fs-hadoop/
提示:插件目录建议“一插件一子目录”,后期排查 classpath/加载问题更清晰。
4、配置:沿用 Hadoop core-site.xml 的 key(Flink 里直接写)
为了易用,OSS 插件允许你在 flink-conf.yaml 里直接使用 Hadoop OSS 的配置键(与 core-site.xml 同体系)。完整可用键可参考 Hadoop OSS 文档;其中有 3 个是必填项,其它多为高级调优项。
4.1 必填配置项(最小可用集)
fs.oss.endpoint: -oss-endpoint>
fs.oss.accessKeyId: -access-key-id>
fs.oss.accessKeySecret: -access-key-secret>
说明:
fs.oss.endpoint:你要连接的 OSS endpoint(和 region/内外网域名相关)accessKeyId/accessKeySecret:AK/SK(注意保护,不要明文散落)
生产建议:把这些配置下发到集群配置中心或用加密/挂载方式注入,避免写进镜像或代码库。
5、更安全的凭证方式:CredentialsProvider(推荐)
除了直接在配置里写 AK/SK,你还可以指定凭证提供器(CredentialsProvider),把密钥放在环境变量等更安全的载体里。
示例:从环境变量 OSS_ACCESS_KEY_ID 与 OSS_ACCESS_KEY_SECRET 读取:
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
这种方式的好处:
- 避免 AK/SK 明文出现在
flink-conf.yaml - 更适合 K8s(Secret 注入为 env)或 YARN(container env)等部署方式
- 便于轮换密钥(更新环境变量即可)
文档也提到还有其他 credential provider,可按你们的安全体系选择(如实例角色/STS 等,取决于你们运行环境的能力与合规要求)。
6、生产落地 checklist(少走弯路)
- 先装插件再启动集群:plugins 目录的 jar 需要在 JobManager/TaskManager 启动前就位
- 路径显式写 oss://:避免 default-scheme 影响路径归属
- 凭证优先用 provider:尽量别把 AK/SK 明文写配置
- checkpoint 目录分层:例如
oss://bucket/checkpoints/,避免不同作业互相污染/ / - 调优参数按现象上:先保证可用稳定,再逐步做性能调参(并发、超时、重试、分片等)
本文地址:https://www.yitenyun.com/6645.html
上一篇:数据结构基础
下一篇:Java 基础常见问题总结(3)






