Moving analytics data from a Zalando-managed PostgreSQL cluster into OpenSearch for full-text search and dashboarding looks simple on paper. In practice, it requires date-windowed reads over JDBC, deterministic document IDs, CronJob scheduling, and Kubernetes secret injection. This article documents the full pattern.
This is the wiring layer, not the analysis layer. The point is a reliable, idempotent sync from Postgres into OpenSearch so dashboards and search queries run against indexed data without touching the transactional database directly.
The Stack
Four components: Zalando Postgres Operator managing the source cluster, OpenSearch as the target, PySpark running in local mode inside a Kubernetes CronJob, and the OpenSearch Spark connector handling bulk writes. Spark local mode means no cluster to manage. One container runs spark-submit and exits when done. The OpenSearch Spark connector beats raw JDBC writes because it handles bulk indexing natively and is actively maintained. Each sync target gets its own CronJob, and all scripts live in a ConfigMap mounted as a volume.
Custom Docker Image
The base image is apache/spark:3.5.8-python3. Three JARs go into /opt/spark/jars/ at build time: the PostgreSQL JDBC driver, the OpenSearch Spark connector, and httpclient. That last one is a required transitive dependency of the OpenSearch connector. Leave it out and you get a ClassNotFoundException at runtime that costs an hour to diagnose.
FROM apache/spark:3.5.8-python3

USER root

RUN curl -L -o /opt/spark/jars/postgresql-42.7.4.jar \
        https://jdbc.postgresql.org/download/postgresql-42.7.4.jar && \
    curl -L -o /opt/spark/jars/opensearch-spark-30_2.12-1.3.0.jar \
        https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/1.3.0/opensearch-spark-30_2.12-1.3.0.jar && \
    curl -L -o /opt/spark/jars/httpclient-4.5.14.jar \
        https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.14/httpclient-4.5.14.jar

RUN mkdir -p /home/spark/.ivy2/cache /home/spark/.ivy2/jars && \
    chown -R spark:spark /home/spark/.ivy2 && \
    chmod -R 755 /home/spark/.ivy2

WORKDIR /opt/spark

USER spark
Pin these versions. The OpenSearch connector artifact name encodes both the Spark major version (30 = Spark 3.x) and the Scala version (2.12). Mixing versions produces silent failures or missing class errors at startup.
Script Setup
Each migration script is self-contained. All configuration comes from environment variables with safe defaults, and the SparkSession is built once at the top.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
from datetime import date, timedelta
PG_HOST = os.environ.get("PG_HOST", "analytics-cluster")
PG_PORT = os.environ.get("PG_PORT", "5432")
PG_DATABASE = os.environ.get("PG_DATABASE", "datawarehouse")
PG_USER = os.environ.get("PG_USER", "")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "")
OS_HOST = os.environ.get("OS_HOST", "opensearch")
OS_PORT = os.environ.get("OS_PORT", "9200")
DATE_FROM = os.environ.get("DATE_FROM", "2020-01-01")
DATE_TO = os.environ.get("DATE_TO", (date.today() + timedelta(days=1)).isoformat())
BATCH_DAYS = int(os.environ.get("BATCH_DAYS", "30"))
DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true"
PG_URL = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
OS_NODES = f"{OS_HOST}:{OS_PORT}"
spark = (
    SparkSession.builder
    .appName("postgres-to-opensearch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()
)
Zalando Secret Injection
The Zalando Postgres Operator creates one Secret per database user per cluster. The naming convention is {dbname}.{cluster-name}.credentials.postgresql.acid.zalan.do, with two keys: username and password. Inject them directly into the container environment using secretKeyRef.
- name: PG_USER
  valueFrom:
    secretKeyRef:
      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
      key: username
- name: PG_PASSWORD
  valueFrom:
    secretKeyRef:
      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
      key: password
No credential management code in the scripts. If the Zalando operator rotates credentials, the CronJob picks up the new values automatically because Kubernetes re-reads the Secret at pod start.
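A small fail-fast guard at the top of each script makes a missing or misnamed Secret obvious at startup instead of surfacing later as a JDBC authentication error mid-run. A minimal sketch, using the environment variable names from the setup above (`require_env` is a hypothetical helper, not part of the operator's tooling):

```python
import os
import sys

def require_env(*names):
    """Exit immediately with a clear message if any required variable is empty."""
    missing = [n for n in names if not os.environ.get(n)]
    if missing:
        sys.exit(f"missing required environment variables: {', '.join(missing)}")

# Call before building the SparkSession, e.g.:
# require_env("PG_USER", "PG_PASSWORD")  # injected via secretKeyRef
```

If the Secret name in the manifest drifts from the Zalando naming convention, the job then fails in the first second with an actionable message.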
Date-Windowed Reads
Full table scans on large Postgres tables cause lock contention and memory pressure in the JDBC driver. Push a WHERE clause into Postgres via a JDBC subquery instead. Spark receives only the filtered rows.
def read_batch(table, date_col, date_from, date_to):
    # The subquery alias (AS batch) is required by Spark JDBC.
    # Without it, the query fails with a syntax error.
    # The WHERE clause runs in Postgres - Spark receives only filtered rows.
    query = (
        f"(SELECT * FROM {table} "
        f"WHERE {date_col} >= '{date_from}' AND {date_col} < '{date_to}') AS batch"
    )
    return (
        spark.read
        .format("jdbc")
        .option("url", PG_URL)
        .option("dbtable", query)
        .option("user", PG_USER)
        .option("password", PG_PASSWORD)
        .option("fetchsize", "10000")
        .option("driver", "org.postgresql.Driver")
        .load()
    )
BATCH_DAYS defaults to 30. Set it higher (up to 90) for tables with low row density. For initial backfill, set DATE_FROM to the earliest row in the table. For nightly sync, a shell wrapper in the CronJob command computes the rolling window at runtime.
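The driver loop that walks the full range in BATCH_DAYS steps looks roughly like this. `iter_windows` is a hypothetical helper name; the commented main loop assumes the `read_batch` and `write_to_opensearch` functions from this article, and honors the DRY_RUN flag from the configuration block:

```python
from datetime import date, timedelta

def iter_windows(date_from, date_to, batch_days):
    """Yield (start, end) ISO date pairs covering [date_from, date_to)."""
    start = date.fromisoformat(date_from)
    end = date.fromisoformat(date_to)
    step = timedelta(days=batch_days)
    while start < end:
        stop = min(start + step, end)
        yield start.isoformat(), stop.isoformat()
        start = stop

# Main loop sketch: one bounded read and one bulk write per window.
# for win_from, win_to in iter_windows(DATE_FROM, DATE_TO, BATCH_DAYS):
#     df = read_batch("events", "event_date", win_from, win_to)
#     if DRY_RUN:
#         print(f"{win_from} -> {win_to}: {df.count()} rows")
#     else:
#         write_to_opensearch(df, "events")
```

Because each window is half-open (`>= start`, `< end`), consecutive windows never overlap and never leave a gap at the boundary.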
Writing to OpenSearch
def write_to_opensearch(df, index_name):
    (
        df.write
        .format("opensearch")
        .option("opensearch.nodes", OS_NODES)
        .option("opensearch.resource", index_name)
        .option("opensearch.mapping.id", "_id")
        .option("opensearch.mapping.exclude", "_id")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.batch.size.entries", "1000")
        .option("opensearch.batch.size.bytes", "5mb")
        .option("opensearch.batch.write.retry.count", "3")
        .option("opensearch.batch.write.retry.wait", "30s")
        .option("opensearch.net.ssl", "false")
        .mode("append")
        .save()
    )
mode("append") combined with opensearch.mapping.id produces upsert behavior: if a document with the same _id already exists in the index, it gets overwritten. opensearch.mapping.exclude keeps the _id field out of the stored document body.
opensearch.nodes.wan.only: true is the setting that trips up Kubernetes deployments. Without it, the connector queries OpenSearch for its internal node IPs and tries to write directly to those addresses, which are unreachable from the Spark pod. Setting wan.only forces the connector to use the configured hostname and nothing else.
Deterministic Document IDs
Idempotency requires generating the same _id for the same source row on every run. Two patterns cover most cases.
# Short composite keys: concatenate with | separator
df = df.withColumn(
    "_id",
    F.concat_ws("|", F.col("event_date"), F.col("source"), F.col("seq"))
)

# Long keys (URLs, free text): MD5 hash of the concatenated key
df = df.withColumn(
    "_id",
    F.md5(F.concat_ws("|", F.col("user_id"), F.col("url"), F.col("timestamp")))
)
The | separator prevents false collisions: without it, a + bc and ab + c produce the same string. MD5 here is a fixed-length fingerprint, not a security measure.
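For string inputs, F.md5 hashes the UTF-8 bytes of the concatenated value, so the same fingerprint can be reproduced outside Spark when you need to look up one specific document by hand. A plain-Python sketch (`doc_id` is a hypothetical debugging helper, not part of the sync scripts):

```python
import hashlib

def doc_id(*parts):
    """Mirror F.md5(F.concat_ws("|", ...)): join with '|', hash the UTF-8 bytes."""
    key = "|".join(str(p) for p in parts)
    return hashlib.md5(key.encode("utf-8")).hexdigest()

# The same key parts always produce the same _id, so re-runs upsert
# the existing document instead of creating a duplicate.
```

This is handy for spot checks: compute the _id for a known source row, then GET that document directly from the index.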
ConfigMap as Script Store
All migration scripts live as keys in a single ConfigMap, mounted at /scripts/ in the container. When a script changes, update the ConfigMap and the next CronJob trigger picks it up with no image rebuild.
apiVersion: v1
kind: ConfigMap
metadata:
  name: opensearch-migration-scripts
  namespace: data
data:
  migrate-events.py: |
    #!/usr/bin/env python3
    # script content here
  migrate-contacts.py: |
    #!/usr/bin/env python3
    # script content here
The relevant volume configuration in the CronJob pod spec:
volumeMounts:
  - name: migration-scripts
    mountPath: /scripts
  - name: spark-local-dir
    mountPath: /tmp/spark-local-dir
volumes:
  - name: migration-scripts
    configMap:
      name: opensearch-migration-scripts
      defaultMode: 0755
  - name: spark-local-dir
    emptyDir:
      sizeLimit: 20Gi
The emptyDir volume gives Spark somewhere to spill data if the job exceeds driver memory. The sizeLimit keeps a runaway job from filling the node disk.
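One caveat worth verifying in your own setup (an assumption on my part, not something the mount alone guarantees): Spark spills to the directory named in spark.local.dir, which defaults to /tmp, so the emptyDir only catches spill if Spark is explicitly pointed at the mount path. A sketch of the extra config line:

```python
from pyspark.sql import SparkSession

# Point Spark's spill/scratch space at the size-limited emptyDir mount.
# Without this, spark.local.dir defaults to /tmp and the 20Gi sizeLimit
# may never apply to the actual spill files.
spark = (
    SparkSession.builder
    .appName("postgres-to-opensearch")
    .config("spark.local.dir", "/tmp/spark-local-dir")
    .getOrCreate()
)
```

Equivalently, pass `--conf spark.local.dir=/tmp/spark-local-dir` to spark-submit in the CronJob args.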
CronJob Scheduling
One CronJob per sync target. Stagger schedules by at least 30 minutes to avoid simultaneous JDBC connections saturating the Postgres connection pool.
One thing that catches people: environment variable values in a CronJob spec are not processed through a shell. The expression $(date -u -d '30 days ago' +%Y-%m-%d) in an env.value field does not expand. Use a shell wrapper in the container command instead.
command: ["/bin/bash", "-c"]
args:
  - |
    export DATE_FROM=$(date -u -d '30 days ago' +%Y-%m-%d)
    export DATE_TO=$(date -u +%Y-%m-%d)
    /opt/spark/bin/spark-submit --master "local[*]" --driver-memory 4g /scripts/migrate-events.py
Full CronJob manifest:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: os-sync-events
  namespace: data
  labels:
    app: opensearch-migration
    table: events
spec:
  schedule: "0 8 * * *"
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      activeDeadlineSeconds: 7200
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: spark-migration
              image: your-registry/spark-opensearch:latest
              command: ["/bin/bash", "-c"]
              args:
                - |
                  set -e
                  export DATE_FROM=$(date -u -d '30 days ago' +%Y-%m-%d)
                  export DATE_TO=$(date -u +%Y-%m-%d)
                  exec /opt/spark/bin/spark-submit \
                    --master local[*] \
                    --conf spark.driver.memory=6g \
                    --conf spark.executor.memory=6g \
                    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
                    /scripts/migrate-events.py
              env:
                - name: PG_HOST
                  value: "analytics-cluster"
                - name: PG_PORT
                  value: "5432"
                - name: PG_DATABASE
                  value: "datawarehouse"
                - name: PG_USER
                  valueFrom:
                    secretKeyRef:
                      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
                      key: username
                - name: PG_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
                      key: password
                - name: OS_HOST
                  value: "opensearch"
                - name: OS_PORT
                  value: "9200"
                - name: BATCH_DAYS
                  value: "30"
                - name: DRY_RUN
                  value: "false"
              resources:
                requests:
                  memory: "8Gi"
                  cpu: "2"
                limits:
                  memory: "10Gi"
                  cpu: "4"
              volumeMounts:
                - name: migration-scripts
                  mountPath: /scripts
                - name: spark-local-dir
                  mountPath: /tmp/spark-local-dir
          volumes:
            - name: migration-scripts
              configMap:
                name: opensearch-migration-scripts
                defaultMode: 0755
            - name: spark-local-dir
              emptyDir:
                sizeLimit: 20Gi
concurrencyPolicy: Forbid skips the new trigger if the previous job is still running, which prevents slow runs from overlapping and doubling the write load. activeDeadlineSeconds: 7200 kills any job that runs longer than two hours. restartPolicy: Never prevents container restarts that would cause partial writes to be re-attempted.
Enrichment Passes
Some indices need context that is not in the source table. OpenSearch does not support joins, so run the JOIN in Postgres and write the denormalized result instead. The JOIN executes as part of the JDBC subquery, and Spark receives already-joined rows.
query = """(
    SELECT e.*, u.display_name, u.org_id, o.org_name
    FROM events e
    JOIN users u ON e.user_id = u.id
    JOIN orgs o ON u.org_id = o.id
    WHERE e.event_date >= '{date_from}' AND e.event_date < '{date_to}'
) AS batch""".format(date_from=date_from, date_to=date_to)
Re-running the enrichment job is safe. Each row gets the same _id as the base table sync, so the enriched write upserts the existing document. Each enrichment CronJob runs after its base sync on a staggered schedule.
Running It
Apply the ConfigMap before the CronJobs. The scripts must exist before the first pod starts.
kubectl apply -f 20-configmap.yml
kubectl apply -f 50-cronjob-events.yml
Trigger a manual run for testing:
kubectl create job --from=cronjob/os-sync-events os-sync-events-test-01 -n data
kubectl logs -f job/os-sync-events-test-01 -n data
Verify data landed in OpenSearch:
curl http://opensearch.opensearch.svc:9200/events/_count
Notes
Worth repeating: opensearch.nodes.wan.only: true is the non-obvious setting in Kubernetes. By default the connector discovers the internal OpenSearch node IPs, which are unreachable from outside the OpenSearch namespace.
The _id strategy must be in place before data lands. Changing the ID scheme after the fact forces a full re-index because existing documents have IDs that no longer match any incoming write.
No date column? Partition by monotonically increasing ID with range batching: WHERE id >= {start} AND id < {start + batch_size}. The logic is the same; only the partition key changes.
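A sketch of that variant: the query builder below mirrors read_batch but windows on id instead of a date column. `id_window_query` is a hypothetical helper name, and the commented line shows where it plugs into the same JDBC read:

```python
def id_window_query(table, id_from, id_to):
    """Build the JDBC subquery for an id-range window.

    As with the date version, the alias (AS batch) is required by Spark
    JDBC, and the WHERE clause runs in Postgres - Spark receives only
    the filtered rows.
    """
    return (
        f"(SELECT * FROM {table} "
        f"WHERE id >= {id_from} AND id < {id_to}) AS batch"
    )

# Plugs into the same read as read_batch, e.g.:
#   spark.read.format("jdbc").option("dbtable", id_window_query("events", 0, 100000)) ...
```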
Spark local mode works well here. The bottleneck is JDBC read throughput from Postgres and bulk write throughput to OpenSearch, not CPU or in-memory shuffle. A full Spark cluster would not change those numbers.
ConfigMap scripts are picked up by the next CronJob trigger without touching the image. Fix a query, adjust the _id logic, or add a new table without a build and push cycle.
This blog post, titled: "PostgreSQL to OpenSearch with PySpark on Kubernetes: Date-windowed ETL, idempotent upserts, and CronJob scheduling" by Craig Johnston, is licensed under a Creative Commons Attribution 4.0 International License.
