PostgreSQL to OpenSearch with PySpark on Kubernetes

Date-windowed ETL, idempotent upserts, and CronJob scheduling

Posted by Craig Johnston on Wednesday, February 25, 2026

Moving analytics data from a Zalando-managed PostgreSQL cluster into OpenSearch for full-text search and dashboarding looks simple on paper. In practice, it requires date-windowed reads over JDBC, deterministic document IDs, CronJob scheduling, and Kubernetes secret injection. This article documents the full pattern.

This is the wiring layer, not the analysis layer. The point is a reliable, idempotent sync from Postgres into OpenSearch so dashboards and search queries run against indexed data without touching the transactional database directly.

The Stack

Four components: Zalando Postgres Operator managing the source cluster, OpenSearch as the target, PySpark running in local mode inside a Kubernetes CronJob, and the OpenSearch Spark connector handling bulk writes. Spark local mode means no cluster to manage. One container runs spark-submit and exits when done. The OpenSearch Spark connector beats hand-rolled bulk HTTP writes because it handles batching, retries, and document-ID mapping natively and is actively maintained. Each sync target gets its own CronJob, and all scripts live in a ConfigMap mounted as a volume.

Custom Docker Image

The base image is apache/spark:3.5.8-python3. Three JARs go into /opt/spark/jars/ at build time: the PostgreSQL JDBC driver, the OpenSearch Spark connector, and httpclient. That last one is a required transitive dependency of the OpenSearch connector. Leave it out and you get a ClassNotFoundException at runtime that costs an hour to diagnose.

FROM apache/spark:3.5.8-python3
USER root
RUN curl -L -o /opt/spark/jars/postgresql-42.7.4.jar \
    https://jdbc.postgresql.org/download/postgresql-42.7.4.jar && \
    curl -L -o /opt/spark/jars/opensearch-spark-30_2.12-1.3.0.jar \
    https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/1.3.0/opensearch-spark-30_2.12-1.3.0.jar && \
    curl -L -o /opt/spark/jars/httpclient-4.5.14.jar \
    https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.14/httpclient-4.5.14.jar
RUN mkdir -p /home/spark/.ivy2/cache /home/spark/.ivy2/jars && \
    chown -R spark:spark /home/spark/.ivy2 && \
    chmod -R 755 /home/spark/.ivy2
WORKDIR /opt/spark
USER spark

Pin these versions. The OpenSearch connector artifact name encodes both the Spark major version (30 = Spark 3.x) and the Scala version (2.12). Mixing versions produces silent failures or missing class errors at startup.

Script Setup

Each migration script is self-contained. All configuration comes from environment variables with safe defaults, and the SparkSession is built once at the top.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
from datetime import date, timedelta

PG_HOST     = os.environ.get("PG_HOST", "analytics-cluster")
PG_PORT     = os.environ.get("PG_PORT", "5432")
PG_DATABASE = os.environ.get("PG_DATABASE", "datawarehouse")
PG_USER     = os.environ.get("PG_USER", "")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "")

OS_HOST = os.environ.get("OS_HOST", "opensearch")
OS_PORT = os.environ.get("OS_PORT", "9200")

DATE_FROM  = os.environ.get("DATE_FROM", "2020-01-01")
DATE_TO    = os.environ.get("DATE_TO", (date.today() + timedelta(days=1)).isoformat())
BATCH_DAYS = int(os.environ.get("BATCH_DAYS", "30"))
DRY_RUN    = os.environ.get("DRY_RUN", "false").lower() == "true"

PG_URL   = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
OS_NODES = f"{OS_HOST}:{OS_PORT}"

spark = (
    SparkSession.builder
    .appName("postgres-to-opensearch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()
)


Zalando Secret Injection

The Zalando Postgres Operator creates one Secret per database user per cluster. The naming convention is {username}.{cluster-name}.credentials.postgresql.acid.zalan.do, with two keys: username and password. Inject them directly into the container environment using secretKeyRef.

- name: PG_USER
  valueFrom:
    secretKeyRef:
      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
      key: username
- name: PG_PASSWORD
  valueFrom:
    secretKeyRef:
      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
      key: password

No credential management code in the scripts. If the Zalando operator rotates credentials, the CronJob picks up the new values automatically because Kubernetes re-reads the Secret at pod start.

Date-Windowed Reads

Full table scans on large Postgres tables cause lock contention and memory pressure in the JDBC driver. Push a WHERE clause into Postgres via a JDBC subquery instead. Spark receives only the filtered rows.

def read_batch(table, date_col, date_from, date_to):
    # The subquery alias (AS batch) is required by Spark JDBC.
    # Without it, the query fails with a syntax error.
    # The WHERE clause runs in Postgres - Spark receives only filtered rows.
    query = (
        f"(SELECT * FROM {table} "
        f"WHERE {date_col} >= '{date_from}' AND {date_col} < '{date_to}') AS batch"
    )
    return (
        spark.read
        .format("jdbc")
        .option("url", PG_URL)
        .option("dbtable", query)
        .option("user", PG_USER)
        .option("password", PG_PASSWORD)
        .option("fetchsize", "10000")
        .option("driver", "org.postgresql.Driver")
        .load()
    )

BATCH_DAYS defaults to 30. Set it higher (up to 90) for tables with low row density. For initial backfill, set DATE_FROM to the earliest row in the table. For nightly sync, a shell wrapper in the CronJob command computes the rolling window at runtime.
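The driver loop that walks these windows is not shown above; a minimal sketch, where the date_windows helper is mine and the commented-out calls assume the read_batch and write_to_opensearch functions from this article:

```python
from datetime import date, timedelta

def date_windows(date_from: str, date_to: str, batch_days: int):
    """Yield (start, end) ISO-date pairs covering [date_from, date_to) in batch_days steps."""
    start = date.fromisoformat(date_from)
    end = date.fromisoformat(date_to)
    while start < end:
        # Half-open windows: each row falls in exactly one window.
        stop = min(start + timedelta(days=batch_days), end)
        yield start.isoformat(), stop.isoformat()
        start = stop

# Hypothetical driver loop using the script's globals:
# for w_from, w_to in date_windows(DATE_FROM, DATE_TO, BATCH_DAYS):
#     df = read_batch("events", "event_date", w_from, w_to)
#     if not DRY_RUN:
#         write_to_opensearch(df, "events")
```

Because each window is half-open, no row is read twice, and the deterministic _id scheme makes any accidental overlap harmless anyway.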

Writing to OpenSearch

def write_to_opensearch(df, index_name):
    (
        df.write
        .format("opensearch")
        .option("opensearch.nodes", OS_NODES)
        .option("opensearch.resource", index_name)
        .option("opensearch.mapping.id", "_id")
        .option("opensearch.mapping.exclude", "_id")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.batch.size.entries", "1000")
        .option("opensearch.batch.size.bytes", "5mb")
        .option("opensearch.batch.write.retry.count", "3")
        .option("opensearch.batch.write.retry.wait", "30s")
        .option("opensearch.net.ssl", "false")
        .mode("append")
        .save()
    )

mode("append") combined with opensearch.mapping.id produces upsert behavior: if a document with the same _id already exists in the index, it gets overwritten. opensearch.mapping.exclude keeps the _id field out of the stored document body.

opensearch.nodes.wan.only: true is the setting that trips up Kubernetes deployments. Without it, the connector queries OpenSearch for its internal node IPs and tries to write directly to those addresses, which are unreachable from the Spark pod. Setting wan.only forces the connector to use the configured hostname and nothing else.

Deterministic Document IDs

Idempotency requires generating the same _id for the same source row on every run. Two patterns cover most cases.

# Short composite keys: concatenate with | separator
df = df.withColumn(
    "_id",
    F.concat_ws("|", F.col("event_date"), F.col("source"), F.col("seq"))
)

# Long keys (URLs, free text): MD5 hash of the concatenated key
df = df.withColumn(
    "_id",
    F.md5(F.concat_ws("|", F.col("user_id"), F.col("url"), F.col("timestamp")))
)

The | separator prevents false collisions: without it, a + bc and ab + c produce the same string. MD5 here is a fixed-length fingerprint, not a security measure.
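The MD5 variant can be spot-checked off-cluster with plain Python, since Spark's md5 over a concat_ws result matches hashlib over the UTF-8 bytes of the joined string. A small sketch; the doc_id helper name is mine:

```python
import hashlib

def doc_id(*parts: str) -> str:
    # Mirrors F.md5(F.concat_ws("|", ...)): the MD5 hex digest of the
    # "|"-joined key, computed over its UTF-8 bytes.
    return hashlib.md5("|".join(parts).encode("utf-8")).hexdigest()
```

Given a known source row, fetching /{index}/_doc/{doc_id(...)} from OpenSearch confirms that row landed under the expected ID.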

ConfigMap as Script Store

All migration scripts live as keys in a single ConfigMap, mounted at /scripts/ in the container. When a script changes, update the ConfigMap and the next CronJob trigger picks it up with no image rebuild.

apiVersion: v1
kind: ConfigMap
metadata:
  name: opensearch-migration-scripts
  namespace: data
data:
  migrate-events.py: |
    #!/usr/bin/env python3
    # script content here
  migrate-contacts.py: |
    #!/usr/bin/env python3
    # script content here

The relevant volume configuration in the CronJob pod spec:

volumeMounts:
  - name: migration-scripts
    mountPath: /scripts
  - name: spark-local-dir
    mountPath: /tmp/spark-local-dir
volumes:
  - name: migration-scripts
    configMap:
      name: opensearch-migration-scripts
      defaultMode: 0755
  - name: spark-local-dir
    emptyDir:
      sizeLimit: 20Gi

The emptyDir volume gives Spark somewhere to spill data if the job exceeds driver memory, and the sizeLimit keeps a runaway job from filling the node disk. Note that mounting the volume is not enough on its own: spark.local.dir defaults to /tmp, so pass --conf spark.local.dir=/tmp/spark-local-dir (or set SPARK_LOCAL_DIRS) in the spark-submit invocation to point Spark at the mount.

CronJob Scheduling

One CronJob per sync target. Stagger schedules by at least 30 minutes to avoid simultaneous JDBC connections saturating the Postgres connection pool.

One thing that catches people: environment variable values in a CronJob spec are not processed through a shell. The expression $(date -u -d '30 days ago' +%Y-%m-%d) in an env.value field does not expand. Use a shell wrapper in the container command instead.

command: ["/bin/bash", "-c"]
args:
  - |
    export DATE_FROM=$(date -u -d '30 days ago' +%Y-%m-%d)
    export DATE_TO=$(date -u +%Y-%m-%d)
    /opt/spark/bin/spark-submit --master "local[*]" --driver-memory 4g /scripts/migrate-events.py

Full CronJob manifest:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: os-sync-events
  namespace: data
  labels:
    app: opensearch-migration
    table: events
spec:
  schedule: "0 8 * * *"
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      activeDeadlineSeconds: 7200
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: spark-migration
              image: your-registry/spark-opensearch:latest
              command: ["/bin/bash", "-c"]
              args:
                - |
                  set -e
                  export DATE_FROM=$(date -u -d '30 days ago' +%Y-%m-%d)
                  export DATE_TO=$(date -u +%Y-%m-%d)
                  exec /opt/spark/bin/spark-submit \
                    --master local[*] \
                    --conf spark.driver.memory=6g \
                    --conf spark.executor.memory=6g \
                    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
                    /scripts/migrate-events.py
              env:
                - name: PG_HOST
                  value: "analytics-cluster"
                - name: PG_PORT
                  value: "5432"
                - name: PG_DATABASE
                  value: "datawarehouse"
                - name: PG_USER
                  valueFrom:
                    secretKeyRef:
                      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
                      key: username
                - name: PG_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: datawarehouse.analytics-cluster.credentials.postgresql.acid.zalan.do
                      key: password
                - name: OS_HOST
                  value: "opensearch"
                - name: OS_PORT
                  value: "9200"
                - name: BATCH_DAYS
                  value: "30"
                - name: DRY_RUN
                  value: "false"
              resources:
                requests:
                  memory: "8Gi"
                  cpu: "2"
                limits:
                  memory: "10Gi"
                  cpu: "4"
              volumeMounts:
                - name: migration-scripts
                  mountPath: /scripts
                - name: spark-local-dir
                  mountPath: /tmp/spark-local-dir
          volumes:
            - name: migration-scripts
              configMap:
                name: opensearch-migration-scripts
                defaultMode: 0755
            - name: spark-local-dir
              emptyDir:
                sizeLimit: 20Gi

concurrencyPolicy: Forbid skips the new trigger if the previous job is still running, which prevents slow runs from overlapping and doubling the write load. activeDeadlineSeconds: 7200 kills any job that runs longer than two hours. restartPolicy: Never stops the kubelet from restarting a failed container in place; retries run as fresh pods governed by backoffLimit, and the idempotent _id scheme makes those re-attempts safe.

Enrichment Passes

Some indices need context that is not in the source table. OpenSearch does not support joins, so run the JOIN in Postgres and write the denormalized result instead. The JOIN executes as part of the JDBC subquery, and Spark receives already-joined rows.

query = """(
    SELECT e.*, u.display_name, u.org_id, o.org_name
    FROM events e
    JOIN users u ON e.user_id = u.id
    JOIN orgs o ON u.org_id = o.id
    WHERE e.event_date >= '{date_from}' AND e.event_date < '{date_to}'
) AS batch""".format(date_from=date_from, date_to=date_to)

Re-running the enrichment job is safe. Each row gets the same _id as the base table sync, so the enriched write upserts the existing document. Each enrichment CronJob runs after its base sync on a staggered schedule.

Running It

Apply the ConfigMap before the CronJobs. The scripts must exist before the first pod starts.

kubectl apply -f 20-configmap.yml
kubectl apply -f 50-cronjob-events.yml

Trigger a manual run for testing:

kubectl create job --from=cronjob/os-sync-events os-sync-events-test-01 -n data
kubectl logs -f job/os-sync-events-test-01 -n data

Verify data landed in OpenSearch:

curl http://opensearch.opensearch.svc:9200/events/_count

Notes

opensearch.nodes.wan.only: true bears repeating: the connector's default behavior is to discover internal OpenSearch node IPs, which are unreachable from outside the OpenSearch namespace.

The _id strategy must be in place before data lands. Changing the ID scheme after the fact forces a full re-index because existing documents have IDs that no longer match any incoming write.

No date column? Partition by monotonically increasing ID with range batching: WHERE id >= {start} AND id < {start + batch_size}. The logic is the same; only the partition key changes.
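As a sketch, only the subquery changes relative to read_batch; the id_window_query helper name is mine:

```python
def id_window_query(table: str, id_col: str, id_from: int, id_to: int) -> str:
    # Same JDBC pushdown pattern as read_batch, keyed on a numeric ID
    # instead of a date column; pass the result as the "dbtable" option.
    return (
        f"(SELECT * FROM {table} "
        f"WHERE {id_col} >= {id_from} AND {id_col} < {id_to}) AS batch"
    )
```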

Spark local mode works well here. The bottleneck is JDBC read throughput from Postgres and bulk write throughput to OpenSearch, not CPU or in-memory shuffle. A full Spark cluster would not change those numbers.

ConfigMap scripts are picked up by the next CronJob trigger without touching the image. Fix a query, adjust the _id logic, or add a new table without a build and push cycle.

Note: This blog is a collection of personal notes. Making them public encourages me to think beyond the limited scope of the current problem I'm trying to solve or concept I'm implementing, and hopefully provides something useful to my team and others.

This blog post, titled: "PostgreSQL to OpenSearch with PySpark on Kubernetes: Date-windowed ETL, idempotent upserts, and CronJob scheduling" by Craig Johnston, is licensed under a Creative Commons Attribution 4.0 International License.