
Apache Flink

Apache Flink is a powerful, open-source distributed processing framework designed for stateful computations over both bounded and unbounded data streams at any scale. It enables high-throughput, low-latency, and fault-tolerant processing while offering elastic scaling capabilities to handle millions of events per second across thousands of cores.

Apache Flink can use Apache Ozone for reading and writing data, and for storing essential operational components like application state checkpoints and savepoints.
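
For example, once Flink can reach Ozone through the S3 Gateway (as set up in the quickstart below), checkpoint and savepoint storage can be pointed at an Ozone bucket with ordinary Flink configuration. A minimal sketch, assuming a bucket named bucket1; the exact key names can vary between Flink versions:

# Hypothetical flink-conf / FLINK_PROPERTIES snippet; bucket and paths are placeholders
state.checkpoints.dir: s3a://bucket1/flink/checkpoints
state.savepoints.dir: s3a://bucket1/flink/savepoints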

Quickstart

This tutorial shows how to connect Apache Flink to Apache Ozone through the S3 Gateway using Docker Compose.

Quickstart environment

  • Unsecured Ozone and Flink clusters.
  • Ozone S3G uses path-style access. To enable virtual-host style addressing, see the Ozone S3 Gateway documentation.
  • Flink accesses Ozone via S3 Gateway.

Step 1 — Download Ozone's docker-compose.yaml

First, obtain Ozone's sample Docker Compose configuration and save it as docker-compose.yaml:

curl -O https://raw.githubusercontent.com/apache/ozone-docker/refs/heads/latest/docker-compose.yaml

Edit the docker-compose.yaml:

Append the two SCM safemode configuration entries (the last two lines shown below) to the x-common-config: section so the cluster can start with a single Datanode.

x-common-config:
  &common-config
  ...
  no_proxy: "om,recon,scm,s3g,localhost,127.0.0.1"
  OZONE-SITE.XML_hdds.scm.safemode.min.datanode: "1"
  OZONE-SITE.XML_hdds.scm.safemode.healthy.pipeline.pct: "0"

Refer to the Docker quick start page for details.

Step 2 — Create docker-compose-flink.yml for Flink

In the same directory, create a file named docker-compose-flink.yml with the following content. The command override copies the flink-s3-fs-hadoop plugin into the plugins directory so Flink can talk to the S3 Gateway:

services:
  jobmanager:
    image: flink:scala_2.12-java17
    command: >
      bash -c "mkdir -p /opt/flink/plugins/s3-fs-hadoop &&
      cp /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop/ &&
      /docker-entrypoint.sh jobmanager"
    ports:
      - "8081:8081"
    environment:
      AWS_ACCESS_KEY_ID: ozone
      AWS_SECRET_ACCESS_KEY: ozone
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        fs.s3a.endpoint: http://s3g:9878
        fs.s3a.path.style.access: true
        fs.s3a.connection.ssl.enabled: false
        fs.s3a.access.key: ozone
        fs.s3a.secret.key: ozone

  taskmanager:
    image: flink:scala_2.12-java17
    command: >
      bash -c "mkdir -p /opt/flink/plugins/s3-fs-hadoop &&
      cp /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop/ &&
      /docker-entrypoint.sh taskmanager"
    depends_on:
      - jobmanager
    environment:
      AWS_ACCESS_KEY_ID: ozone
      AWS_SECRET_ACCESS_KEY: ozone
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
        fs.s3a.endpoint: http://s3g:9878
        fs.s3a.path.style.access: true
        fs.s3a.connection.ssl.enabled: false
        fs.s3a.access.key: ozone
        fs.s3a.secret.key: ozone
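
Before starting anything, you can optionally validate the new file's syntax; this assumes Docker Compose v2, where config renders and checks the configuration:

docker compose -f docker-compose-flink.yml config --quiet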

Step 3 — Start Ozone and Flink together

With both docker-compose.yaml (for Ozone) and docker-compose-flink.yml (for Flink) in the same directory, you can start both services together, sharing the same network, using:

export COMPOSE_FILE=docker-compose.yaml:docker-compose-flink.yml
docker compose up -d

Verify containers are running:

docker ps
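
If things look off, it can help to confirm that SCM has left safemode and that the S3 plugin was actually copied into the Flink containers. Both checks are optional and assume the default service names from the two compose files:

# Check whether SCM is still in safemode (Ozone CLI inside the scm container)
docker compose exec scm ozone admin safemode status

# Confirm the flink-s3-fs-hadoop plugin was copied into the jobmanager's plugins directory
docker compose exec jobmanager ls /opt/flink/plugins/s3-fs-hadoop/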

Step 4 — Create an Ozone bucket

Connect to an Ozone container (for example, s3g) to create an OBS (object store) bucket:

docker compose exec -it s3g ozone sh bucket create s3v/bucket1 -l obs
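
Optionally, you can confirm the bucket was created with the object store layout before continuing; bucket info is a standard Ozone shell command:

docker compose exec s3g ozone sh bucket info s3v/bucket1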
Step 5 — Start the Flink SQL client

Launch the SQL client inside the jobmanager container:

docker compose exec -it jobmanager ./bin/sql-client.sh

You should now be in:

Flink SQL>

Step 6 — Create and Query a table backed by Ozone S3

Important: You must use BATCH mode; otherwise the multipart upload to Ozone fails.

SET 'execution.runtime-mode' = 'BATCH';

CREATE TABLE ozone_sink (
  id STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://bucket1/ozone_sink/',
  'format' = 'csv'
);

Insert data:

INSERT INTO ozone_sink VALUES ('hello', CURRENT_TIMESTAMP);

Query it:

SELECT * FROM ozone_sink;

If this returns the inserted row, Flink is successfully reading from and writing to Ozone via the S3 Gateway.
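
You can also verify from the Ozone side that the CSV files were written to the bucket; this assumes the bucket and table path used above:

docker compose exec s3g ozone sh key list s3v/bucket1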

Step 7 — Explore the Flink Web UI

Open your browser:

http://localhost:8081/

Here you can:

  • See running and completed jobs
  • Inspect TaskManagers
  • Debug failures visually

This is the first place to look if something goes wrong.
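
If a job does fail, the container logs are the next place to check; these are plain Docker Compose commands, not Ozone-specific tooling:

# Follow JobManager and TaskManager logs for S3A or connectivity errors
docker compose logs -f jobmanager taskmanager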

Key takeaways (important)

  • Flink Docker images do not ship with S3 filesystem support enabled; the plugin must be copied into /opt/flink/plugins/
  • The flink-s3-fs-hadoop plugin must be present on both the JobManager (JM) and the TaskManager (TM)
  • Flink and Ozone should be started using a combined Docker Compose file (COMPOSE_FILE) to ensure they share the same network.
  • Always use s3a:// with flink-s3-fs-hadoop
  • Check http://localhost:8081/ to confirm jobs are running
  • Batch mode is required for Flink SQL to avoid multipart upload failures to Ozone. Use SET 'execution.runtime-mode' = 'BATCH';