Generating an embedding is slow. On a CPU-only Ollama running nomic-embed-text, a single short text takes one to three seconds, and a batch of thirty-two runs somewhere between thirty and ninety seconds. That number decides the entire architecture. You cannot embed a record on the write path, because no one will wait three seconds to save a note. You cannot embed on the read path either, because the vector has to exist before the search runs. The work has to happen somewhere in between: asynchronously, durably, and without a human watching. That is a job queue, and the interesting part is that I built it on the Postgres I already had, with no message broker.
§TL;DR
Embeddings are too slow to compute inline, so they run as background jobs. Instead of adding Redis, RabbitMQ, or Celery, the platform uses one Postgres table as a durable queue: LISTEN/NOTIFY for low-latency wake-ups, SELECT ... FOR UPDATE SKIP LOCKED for lock-free claiming across pods, a partial unique index for idempotent enqueue, lease-based ownership with a heartbeat and a reaper for crash recovery, and a reconciler that re-detects gaps every five minutes. A SHA-256 text hash plus a stamped model name make re-embedding cheap and model swaps automatic. The embedding provider is Ollama, called in batches with a fallback for older servers.
Vector Search Without the Vector Database | Part 2 of 3. Part 1 covered why a data MCP needs semantic recall and how pgvector stores and searches the vectors. This part builds the pipeline that fills those vectors in. Part 3 deploys the whole thing on Kubernetes with a CPU-only embedder.
§Why Not a Real Queue
The instinct when you need a job queue is to reach for a broker. Redis with a worker library, RabbitMQ, a managed SQS, Celery if you are in Python. They are good tools. They were also the wrong tool here, and the reasoning generalizes to a lot of systems that are smaller than they think they are.
The platform, txn2/mcp-data-platform (the open-source MCP server behind Plexara), already requires Postgres. It holds the memory records, the audit log, the configuration, the catalog. Adding a broker means adding a second stateful system to deploy, monitor, back up, secure, and reason about during an incident. The question is not “is Redis good at queues,” it is “does this workload justify a second datastore.” For a queue that processes hundreds to low thousands of jobs a day, the answer is no.
Postgres can be the queue, and it brings three properties a broker makes you work for. Jobs are durable by default, because they are rows: a crash leaves them sitting in the table in whatever state they were in, not lost in a process’s memory. Enqueue can be made idempotent with a unique index, so a write that fires twice does not create two jobs. And the whole thing is transactional with the data it serves, so a job and the row it indexes can be reasoned about together. The cost is that Postgres will not scale a queue to millions of messages per minute. That ceiling is nowhere near a semantic-indexing workload.
§One Table, A State Machine
The queue is a single table. Every job is a row that moves through a small state machine: pending to running to either succeeded or failed, with a path back to pending for retries and crash recovery.
CREATE TABLE IF NOT EXISTS index_jobs (
    id               BIGSERIAL PRIMARY KEY,
    source_kind      TEXT NOT NULL,                 -- the corpus: memory, api_catalog, tools...
    source_id        TEXT NOT NULL,                 -- opaque id within the corpus
    trigger_kind     TEXT NOT NULL,                 -- write | reconciler | manual_retry
    status           TEXT NOT NULL DEFAULT 'pending',
    attempts         INTEGER NOT NULL DEFAULT 0,
    last_error       TEXT NOT NULL DEFAULT '',
    next_run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    worker_id        TEXT NOT NULL DEFAULT '',
    lease_expires_at TIMESTAMPTZ,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at       TIMESTAMPTZ,
    completed_at     TIMESTAMPTZ,
    items_done       INTEGER NOT NULL DEFAULT 0
);
The table is deliberately generic. A job does not say “embed memory record X,” it says “index the unit identified by (source_kind, source_id).” That keeps the queue agnostic: the memory store, the API catalog, the tool list, and the prompt library all enqueue into the same table and share the same worker pool. The framework never parses source_id; each consumer interprets its own.
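The lifecycle can also be written down as a transition table. What follows is a minimal sketch rather than code from the platform: the Status type, the constant names, and canTransition are assumed names used for illustration. Only the four status values and the arrows between them come from the description above.

```go
package main

import "fmt"

// Status mirrors the values stored in index_jobs.status.
// The type and constant names are hypothetical.
type Status string

const (
	StatusPending   Status = "pending"
	StatusRunning   Status = "running"
	StatusSucceeded Status = "succeeded"
	StatusFailed    Status = "failed"
)

// transitions lists the arrows described in the text: a claim moves
// pending to running; a running job ends as succeeded or failed, or
// returns to pending for a retry or after a crash.
var transitions = map[Status][]Status{
	StatusPending: {StatusRunning},
	StatusRunning: {StatusSucceeded, StatusFailed, StatusPending},
}

// canTransition reports whether the state machine allows from -> to.
func canTransition(from, to Status) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StatusPending, StatusRunning)) // claim
	fmt.Println(canTransition(StatusRunning, StatusPending)) // retry or reaped lease
	fmt.Println(canTransition(StatusFailed, StatusPending))  // terminal rows stay terminal
}
```

The terminal states have no outgoing arrows. A later attempt at a failed unit arrives as a new row, not as a revived one.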
The indexes are where the design lives. Three of them are partial, each backing one access pattern:
-- At most one open job per unit. Producers ON CONFLICT DO NOTHING here.
CREATE UNIQUE INDEX index_jobs_open
    ON index_jobs (source_kind, source_id)
    WHERE status IN ('pending', 'running');
-- The worker's claim query: oldest runnable pending job.
CREATE INDEX index_jobs_ready
    ON index_jobs (next_run_at)
    WHERE status = 'pending';
-- The reaper's sweep: running jobs whose lease has elapsed.
CREATE INDEX index_jobs_lease
    ON index_jobs (lease_expires_at)
    WHERE status = 'running';
That first index, index_jobs_open, is the one that earns its keep. It enforces “at most one pending or running job per unit.” It is what makes enqueue idempotent, and idempotent enqueue is what lets every producer in the system fire and forget.
§Enqueue, And Then Notify
A producer enqueues by inserting a row. The partial unique index means a duplicate insert collapses to nothing:
func (s *PostgresStore) Enqueue(ctx context.Context, key Key, trigger Trigger) (bool, error) {
	const q = `
		INSERT INTO index_jobs
			(source_kind, source_id, trigger_kind)
		VALUES ($1, $2, $3)
		ON CONFLICT (source_kind, source_id)
			WHERE status IN ('pending', 'running')
		DO NOTHING
		RETURNING id
	`
	var id int64
	err := s.db.QueryRowContext(ctx, q, key.SourceKind, key.SourceID, string(trigger)).Scan(&id)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// An open job for this key already exists. That is the desired
		// idempotent behavior, not an error.
		return false, nil
	case err != nil:
		return false, fmt.Errorf("indexjobs: enqueue: %w", err)
	}
	s.notify(ctx)
	return true, nil
}
If a record is saved, edited, and saved again in quick succession while its first job is still pending, the second and third enqueues are no-ops. There is one job, and when it runs it loads the current state of the record. No thundering herd of redundant work.
After a successful insert comes the notify:
func (s *PostgresStore) notify(ctx context.Context) {
	if _, err := s.db.ExecContext(ctx, `SELECT pg_notify($1, '')`, NotifyChannel); err != nil {
		_ = err // non-fatal; the poll tick is the backstop
	}
}
pg_notify publishes on a channel. Any connection that has issued LISTEN on that channel gets woken. The payload is empty on purpose: the worker does not trust the notification to carry the work, it just treats it as a nudge to go look at the table. That distinction is the key to making LISTEN/NOTIFY safe to rely on. Within a connected session Postgres delivers notifications reliably, but a listener that drops its connection and reconnects can miss everything sent during the gap. The system has to survive that, so the table, not the notification, is always the source of truth.
§The Listener Wakes The Worker
A separate goroutine holds the LISTEN connection using lib/pq’s built-in listener, and forwards every notification to the worker:
func (l *Listener) run() {
	defer l.wg.Done()
	ch := l.listener.NotificationChannel()
	for {
		select {
		case <-l.stopCh:
			return
		case n := <-ch:
			// pq.Listener emits nil on a reconnect to signal "you may
			// have missed events." Either way, wake every notifier so
			// they re-query the table.
			_ = n
			l.broadcast()
		}
	}
}
Notice the comment. When the listener’s connection drops and reconnects, pq delivers a nil notification to say “you were disconnected, you might have missed something.” The code treats that identically to a real notification: wake up and re-query. This is the design pattern that makes notification-based queues correct. The notification is never the source of truth. It is an optimization that lowers latency from “next poll” to “right now.” The source of truth is always the table, and the worker always re-reads it.
The worker’s side of the nudge is a buffered channel that coalesces a flurry of notifications into a single wake:
func (w *Worker) Notify() {
	select {
	case w.wakeup <- struct{}{}:
	default: // already a wake pending; coalesce
	}
}
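The coalescing is easy to see in isolation. This is a small sketch under one assumption: that the wakeup channel has a buffer of one, which is the smallest size that can remember "there is work to look at." The newWorker and pendingWakes helpers are hypothetical and exist only for the demonstration.

```go
package main

import "fmt"

// Worker holds only the wake-up channel for this sketch.
type Worker struct {
	wakeup chan struct{}
}

// newWorker allocates the channel with a buffer of one (an assumption).
func newWorker() *Worker {
	return &Worker{wakeup: make(chan struct{}, 1)}
}

// Notify is the same non-blocking send as above: if a wake is already
// queued, the default branch drops the extra one.
func (w *Worker) Notify() {
	select {
	case w.wakeup <- struct{}{}:
	default:
	}
}

// pendingWakes reports how many wake-ups are currently queued.
func (w *Worker) pendingWakes() int {
	return len(w.wakeup)
}

func main() {
	w := newWorker()
	for i := 0; i < 100; i++ {
		w.Notify() // a flurry of notifications
	}
	fmt.Println("queued wake-ups:", w.pendingWakes()) // 1
	<-w.wakeup
	fmt.Println("after one receive:", w.pendingWakes()) // 0
}
```

A hundred notifications cost the worker one trip to the table, and Notify never blocks the listener goroutine.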
And the worker loop sleeps on either the nudge or a fallback poll ticker, then drains the queue until it is empty:
func (w *Worker) run() {
	defer w.wg.Done()
	ticker := time.NewTicker(w.cfg.PollEvery) // default 30s
	defer ticker.Stop()
	for {
		w.drainQueue()
		select {
		case <-w.stopCh:
			return
		case <-w.wakeup:
		case <-ticker.C:
		}
	}
}
If every notification in the world were lost, this worker would still process every job, just up to thirty seconds later. LISTEN/NOTIFY turns “within thirty seconds” into “within milliseconds,” and costs nothing when it fails. That is the right way to use it.
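The loop above leans on drainQueue, which is not shown. One plausible shape, sketched against an in-memory stand-in for the table, is to claim until the store reports ErrNoJob. The claimer interface, memQueue, and the process callback are assumptions for illustration; the real worker also handles leases, retries, and shutdown.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoJob signals that no runnable pending job exists right now.
var ErrNoJob = errors.New("indexjobs: no job")

// Job carries only the id for this sketch.
type Job struct{ ID int64 }

// claimer is a hypothetical one-method view of the store.
type claimer interface {
	Claim() (*Job, error)
}

// memQueue is an in-memory stand-in for the index_jobs table.
type memQueue struct{ ids []int64 }

func (q *memQueue) Claim() (*Job, error) {
	if len(q.ids) == 0 {
		return nil, ErrNoJob
	}
	id := q.ids[0]
	q.ids = q.ids[1:]
	return &Job{ID: id}, nil
}

// drainQueue claims and processes jobs until the store is empty.
// It returns how many jobs it processed.
func drainQueue(c claimer, process func(*Job)) (int, error) {
	n := 0
	for {
		job, err := c.Claim()
		if errors.Is(err, ErrNoJob) {
			return n, nil // empty queue is the normal exit
		}
		if err != nil {
			return n, err
		}
		process(job)
		n++
	}
}

func main() {
	q := &memQueue{ids: []int64{1, 2, 3}}
	n, err := drainQueue(q, func(j *Job) { fmt.Println("processed job", j.ID) })
	fmt.Println(n, err)
}
```

Draining to empty on every wake is what lets a single coalesced nudge stand in for any number of enqueues.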
§Claiming Without Locking Anyone Out
When multiple workers, possibly across multiple pods, all watch the same table, they will all wake on the same notification and race to grab the same job. The mechanism that makes that race safe is one clause: FOR UPDATE SKIP LOCKED.
func (s *PostgresStore) Claim(ctx context.Context, workerID string) (*Job, error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("indexjobs: claim begin: %w", err)
	}
	defer func() { _ = tx.Rollback() }()
	const sel = `
		SELECT id
		FROM index_jobs
		WHERE status = 'pending' AND next_run_at <= NOW()
		ORDER BY next_run_at, id
		LIMIT 1
		FOR UPDATE SKIP LOCKED
	`
	var id int64
	if err := tx.QueryRowContext(ctx, sel).Scan(&id); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrNoJob
		}
		return nil, fmt.Errorf("indexjobs: claim select: %w", err)
	}
	const upd = `
		UPDATE index_jobs
		SET status = 'running',
			worker_id = $2,
			attempts = attempts + 1,
			started_at = NOW(),
			lease_expires_at = NOW() + ($3 || ' seconds')::INTERVAL,
			items_done = 0
		WHERE id = $1
		RETURNING ` + jobColumns
	job, err := scanJob(tx.QueryRowContext(ctx, upd, id, workerID, leaseSeconds(s.leaseDuration)))
	if err != nil {
		return nil, fmt.Errorf("indexjobs: claim update: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("indexjobs: claim commit: %w", err)
	}
	return job, nil
}
Without SKIP LOCKED, every worker that runs the SELECT ... FOR UPDATE would queue up behind the first one’s row lock, then wake one at a time to find the row already taken. With SKIP LOCKED, a row another transaction has locked is invisible: each worker’s query skips past locked rows and grabs the next free one. Ten workers wake on the same notification and walk away with ten different jobs, no coordination service and no distributed lock. It is not completely free: each claim still scans from the head of the queue and skips past rows other workers hold, so the cost grows with how many jobs are in flight at once. At a semantic-indexing workload that cost is noise, and Postgres arbitrates by doing what it already does well.
The claim also stamps a lease: lease_expires_at = NOW() + interval. That timestamp is the entire crash-recovery story.
§Leases, Heartbeats, And The Reaper
The first time I watched this fail, a worker pod hit its memory limit partway through a large batch and got OOMKilled. The job sat in running with that dead pod’s worker id stamped on it, and nothing else would touch it, because the claim query only looks at pending. A crash, an eviction, a node going away: any of them leaves a row stranded the same way. The lease solves this. A claim is not ownership forever, it is ownership for ten minutes. If the worker does not renew, the lease expires and the job becomes claimable again.
But ten minutes is shorter than some embed passes. A large unit on a slow CPU embedder can run longer than a single lease. So a healthy worker renews its lease on a timer while it works, at one-third of the lease window so there are two renewal chances before expiry:
func (w *Worker) heartbeat(ctx context.Context, job *Job) {
	interval := w.cfg.LeaseDuration / heartbeatDivisor // lease / 3
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			err := w.cfg.Store.RenewLease(ctx, job.ID, w.cfg.WorkerID, w.cfg.LeaseDuration)
			if errors.Is(err, ErrNotFound) {
				// The lease rotated to another worker. Stop renewing.
				return
			}
			if err != nil {
				// Other errors are logged but do not stop the heartbeat: a
				// transient DB blip should not let the reaper kill us mid-batch.
				slog.Warn("indexjobs: lease renewal failed", logKeyJobID, job.ID, logKeyError, err)
			}
		}
	}
}
The lease distinguishes “this pod went silent” from “this embed batch is slow.” A slow batch keeps the heartbeat ticking and the lease alive. A dead pod stops the heartbeat, the lease lapses, and the job is recoverable. That is exactly the line you want, and it is the line that OOMKill taught me to draw.
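The store side of that renewal is not shown here. A plausible sketch, assuming the renewal is a single guarded UPDATE: the WHERE clause checks both the job id and the worker id, so a worker whose lease was already reaped and re-claimed touches zero rows and gets ErrNotFound. The renewLease function, its error text, and the leaseSeconds helper are hypothetical; only the table, the column names, and the interval expression come from the claim query.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strconv"
	"time"
)

// ErrNotFound means the job is gone or no longer owned by this worker.
var ErrNotFound = errors.New("indexjobs: job not found or not owned")

// leaseSeconds renders a duration as whole seconds for the
// ($n || ' seconds')::INTERVAL expression. Hypothetical helper.
func leaseSeconds(d time.Duration) string {
	return strconv.FormatInt(int64(d/time.Second), 10)
}

// renewLease pushes the lease forward only if workerID still owns the
// running job. Zero affected rows means ownership has moved on.
func renewLease(ctx context.Context, db *sql.DB, jobID int64, workerID string, lease time.Duration) error {
	const q = `
		UPDATE index_jobs
		SET lease_expires_at = NOW() + ($3 || ' seconds')::INTERVAL
		WHERE id = $1
		  AND worker_id = $2
		  AND status = 'running'
	`
	res, err := db.ExecContext(ctx, q, jobID, workerID, leaseSeconds(lease))
	if err != nil {
		return fmt.Errorf("indexjobs: renew lease: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("indexjobs: renew lease rows: %w", err)
	}
	if n == 0 {
		return ErrNotFound
	}
	return nil
}

func main() {
	// A ten-minute lease becomes a 600-second interval.
	fmt.Println(leaseSeconds(10 * time.Minute))
}
```

Guarding on worker_id is what keeps a slow, half-dead worker from extending a lease that now belongs to someone else.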
The recovery itself is the reaper, a small loop that sweeps expired leases back to pending every thirty seconds:
func (s *PostgresStore) ReleaseExpiredLeases(ctx context.Context) (int, error) {
	const q = `
		UPDATE index_jobs
		SET status = 'pending', worker_id = '', lease_expires_at = NULL
		WHERE status = 'running'
		AND lease_expires_at IS NOT NULL
		AND lease_expires_at <= NOW()
	`
	res, err := s.db.ExecContext(ctx, q)
	// ... return rows affected ...
}
The reaper does not increment attempts. The claim does that, once per start. The invariant the whole system keeps is “attempts counts how many times a worker started this job,” which is what the retry ceiling needs to mean something. The crashed run was already counted when it was claimed, so the reaper adds nothing on top: a pod crash is not the job’s fault and should not be charged twice.
§Retries With Backoff, Then Surrender
Some failures are worth retrying: the embedder timed out, the database hiccuped, the network blipped. Some are not: the source row was deleted, no consumer is registered for the kind. The worker routes between them. Retryable failures get exponential backoff up to a ceiling; past the ceiling, the job moves to terminal failed.
func (w *Worker) retryOrFail(ctx context.Context, job *Job, errMsg string) {
	if job.Attempts >= MaxAttempts { // MaxAttempts = 5
		w.terminate(ctx, job, errMsg)
		return
	}
	if err := w.cfg.Store.Retry(ctx, job.ID, w.cfg.WorkerID, errMsg); err != nil {
		slog.Error("indexjobs: retry release failed", logKeyJobID, job.ID, logKeyError, err)
	}
}
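The routing between retryable and permanent failures is not shown above. One common way to do it in Go, sketched here as an assumption rather than the platform's actual code, is a pair of sentinel errors checked with errors.Is, so the classification survives wrapping. ErrSourceGone, ErrNoConsumer, isPermanent, and route are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical sentinels for the two permanent cases named in the text.
var (
	ErrSourceGone = errors.New("source row no longer exists")
	ErrNoConsumer = errors.New("no consumer registered for source kind")
)

// isPermanent reports whether a retry could not possibly help.
func isPermanent(err error) bool {
	return errors.Is(err, ErrSourceGone) || errors.Is(err, ErrNoConsumer)
}

// route returns "fail" for permanent errors and "retry" for the rest.
func route(err error) string {
	if isPermanent(err) {
		return "fail"
	}
	return "retry"
}

func main() {
	deleted := fmt.Errorf("memory sink: load record: %w", ErrSourceGone)
	timeout := fmt.Errorf("embed: %w", context.DeadlineExceeded)
	fmt.Println(route(deleted)) // fail
	fmt.Println(route(timeout)) // retry
}
```

Defaulting unknown errors to "retry" is the conservative choice in this sketch: a misclassified transient error costs a few attempts, while a misclassified permanent one is caught by the attempt ceiling anyway.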
Backoff is a pure function, which means it is unit-testable without a database. With a five-second base and five attempts, the schedule is five, ten, twenty, forty, eighty seconds: just over two and a half minutes of grace before a job gives up.
func computeBackoffSeconds(attempts int) int {
	if attempts > maxBackoffShift {
		attempts = maxBackoffShift
	}
	if attempts < 0 {
		attempts = 0
	}
	return int(retryBackoffBase/time.Second) * (1 << attempts)
}
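Because the function is pure, the schedule can be checked with nothing but a loop. The sketch below repeats the function with assumed constants: the five-second base comes from the text, while the value of maxBackoffShift is a guess. Inputs 0 through 4 reproduce the schedule quoted above; how the worker maps Job.Attempts onto the argument is not shown in this article, so treat the indices as illustrative.

```go
package main

import (
	"fmt"
	"time"
)

const (
	retryBackoffBase = 5 * time.Second // from the text
	maxBackoffShift  = 10              // assumed cap on the exponent
)

// computeBackoffSeconds doubles the base delay per attempt, with the
// exponent clamped to [0, maxBackoffShift].
func computeBackoffSeconds(attempts int) int {
	if attempts > maxBackoffShift {
		attempts = maxBackoffShift
	}
	if attempts < 0 {
		attempts = 0
	}
	return int(retryBackoffBase/time.Second) * (1 << attempts)
}

func main() {
	total := 0
	for attempts := 0; attempts < 5; attempts++ {
		s := computeBackoffSeconds(attempts)
		total += s
		fmt.Printf("attempts=%d backoff=%ds\n", attempts, s)
	}
	fmt.Printf("total=%ds\n", total) // 5+10+20+40+80 = 155
}
```

The clamp matters more than it looks: without it, a large attempts value would shift the delay into hours or overflow the integer.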
A terminal failed ends that job row, not the unit. The partial unique index only suppresses pending and running rows, not failed ones, which means the reconciler is free to enqueue a brand new job for the same unit later. A transient failure that exhausts its retries still self-heals on the next reconcile sweep. A permanent failure re-fails each cycle until someone fixes the cause, which is the correct behavior: it stays visible.
§The Reconciler Catches What the Producer Misses
The producer path, write a record and enqueue a job, is the fast path. It is also not trustworthy on its own. Records get written before the embedder is configured. Jobs fail terminally and nobody notices. A backup restore brings rows back without their vectors. Someone prunes the vector table while debugging. In every one of those cases, the producer never fired or its job is long gone, and a record sits there permanently unsearchable.
The reconciler is the backstop. Every five minutes, and once immediately on boot, it asks each consumer “which of your units are missing vectors?” and enqueues a job for each gap:
func (r *Reconciler) reconcileOnce() {
	ctx, cancel := context.WithTimeout(context.Background(), r.interval/2)
	defer cancel()
	var total int
	for _, sink := range r.registry.Sinks() {
		ids, err := sink.FindGaps(ctx)
		if err != nil {
			slog.Warn("indexjobs: reconciler FindGaps failed",
				logKeySourceKind, sink.Kind(), logKeyError, err)
			continue
		}
		for _, id := range ids {
			created, err := r.store.Enqueue(ctx, Key{SourceKind: sink.Kind(), SourceID: id}, TriggerReconciler)
			if err != nil {
				continue
			}
			if created {
				total++
			}
		}
	}
	if total > 0 {
		slog.Info("indexjobs: reconciler enqueued gap jobs", "count", total)
	}
}
This is where the idempotent enqueue pays off completely. The reconciler can run on every pod, in lockstep, enqueuing the same gaps, and the partial unique index collapses all the duplicates to single jobs. There is no leader election, no “only one reconciler” rule to enforce. They all run, they all converge on the same correct set of jobs, and the database deduplicates. The system tends toward fully indexed without anyone clicking anything.
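The convergence claim can be demonstrated without a database. In this sketch an in-memory map plays the role of the partial unique index, and several goroutines play the reconcilers on separate pods. memStore, reconcileAll, and the "memory" source kind value are stand-ins chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Key identifies one indexable unit.
type Key struct {
	SourceKind string
	SourceID   string
}

// memStore mimics the partial unique index: at most one open job per key.
type memStore struct {
	mu   sync.Mutex
	open map[Key]bool
}

func newMemStore() *memStore {
	return &memStore{open: make(map[Key]bool)}
}

// Enqueue reports true only when it created a new open job.
func (s *memStore) Enqueue(k Key) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.open[k] {
		return false // duplicate collapses to a no-op
	}
	s.open[k] = true
	return true
}

func (s *memStore) openJobs() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.open)
}

// reconcileAll runs one reconciler per pod, all enqueuing the same gaps
// concurrently, and returns how many enqueues actually created a job.
func reconcileAll(s *memStore, pods int, gaps []string) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	created := 0
	for p := 0; p < pods; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, id := range gaps {
				if s.Enqueue(Key{SourceKind: "memory", SourceID: id}) {
					mu.Lock()
					created++
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return created
}

func main() {
	s := newMemStore()
	created := reconcileAll(s, 3, []string{"a", "b", "c", "d"})
	fmt.Println("jobs created:", created, "open jobs:", s.openJobs())
}
```

Three reconcilers issue twelve enqueues between them and four jobs exist at the end, whichever goroutine happens to win each race.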
§Cheap Re-Embedding And Automatic Model Swaps
The slowest possible thing the pipeline could do is re-embed text that has not changed. The defense is a content hash. Each stored vector carries the SHA-256 of the exact text it was generated from, plus the model name that generated it. Before embedding, the worker plans which items actually need work:
func planVectors(items []Item, existing map[string]Vector, model string, dim int) (rows []Vector, toEmbedIdx []int, toEmbedTexts []string) {
	rows = make([]Vector, len(items))
	for i, item := range items {
		row := Vector{
			ItemID:   item.ItemID,
			TextHash: TextHash(item.Text),
			Model:    model,
			Dim:      dim,
		}
		if prev, ok := existing[item.ItemID]; ok &&
			bytes.Equal(prev.TextHash, row.TextHash) &&
			len(prev.Embedding) == dim &&
			prev.Model == model {
			row.Embedding = prev.Embedding // reuse; no provider call
		} else {
			toEmbedIdx = append(toEmbedIdx, i)
			toEmbedTexts = append(toEmbedTexts, item.Text)
		}
		rows[i] = row
	}
	return rows, toEmbedIdx, toEmbedTexts
}
The reuse condition checks three things: the text hash matches, the dimension matches, and the model name matches. If all three hold, the existing vector is reused and no model call happens. Re-running a job on an unchanged corpus is nearly free.
The third condition is the one that delivers a feature for free. Swap the embedding model, say from nomic-embed-text to something else, and suddenly every stored vector’s Model no longer equals the current model. The hash still matches, the text did not change, but the model did, so every item falls into the re-embed set. Combine that with the reconciler, which detects the now-stale vectors as gaps, and a model swap becomes: change one config value, restart, and walk away. The whole corpus re-embeds itself in the background over the next several reconcile cycles. The model name stamped on each row is the entire mechanism.
One caveat, and it is a real one: this is config-only when the new model produces vectors of the same width. The embedding column is declared vector(768) and the HNSW index is built for 768 dimensions, so a model with the same dimensionality drops in with no schema change. Swap to a model that outputs a different number of dimensions and you have an actual migration on your hands: alter the column width and rebuild the index, then let the reconciler re-embed. The free part is the re-embedding, not the reshaping.
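The reuse rule and the three ways it can fail are small enough to exercise directly. This is a sketch, assuming TextHash is a plain SHA-256 over the raw text with no normalization; the canReuse helper and the "other-model" name are hypothetical, and Vector is trimmed to the fields the check reads.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// TextHash returns the SHA-256 digest of the exact text that was embedded.
// A byte slice matches the bytes.Equal comparison in planVectors.
func TextHash(text string) []byte {
	sum := sha256.Sum256([]byte(text))
	return sum[:]
}

// Vector carries only the fields the reuse check reads.
type Vector struct {
	TextHash  []byte
	Model     string
	Embedding []float32
}

// canReuse restates the three-part condition: same text, same width,
// same model.
func canReuse(prev Vector, text, model string, dim int) bool {
	return bytes.Equal(prev.TextHash, TextHash(text)) &&
		len(prev.Embedding) == dim &&
		prev.Model == model
}

func main() {
	prev := Vector{
		TextHash:  TextHash("hello"),
		Model:     "nomic-embed-text",
		Embedding: make([]float32, 768),
	}
	fmt.Println(canReuse(prev, "hello", "nomic-embed-text", 768))  // true: nothing changed
	fmt.Println(canReuse(prev, "hello!", "nomic-embed-text", 768)) // false: text edited
	fmt.Println(canReuse(prev, "hello", "other-model", 768))       // false: model swapped
	fmt.Println(canReuse(prev, "hello", "nomic-embed-text", 1024)) // false: width changed
}
```

The last case is the one the caveat is about: the check correctly refuses to reuse the old vector, but the column and index still have to be reshaped before a vector of the new width can be stored.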
§Talking To Ollama In Batches
The embedding provider itself is a thin HTTP client. The detail worth showing is the batch path, because it cuts the number of HTTP round trips, and because not every Ollama server supports it. What batching buys on CPU is narrow: the model still processes the texts roughly one at a time, so the win is fewer requests and less per-call overhead, not parallel inference. On a GPU the same batch runs in parallel. On CPU it is mostly a round-trip optimization.
Modern Ollama has a batch endpoint, /api/embed, that takes an array of texts and returns an array of vectors in one round trip. Older servers only have the singular /api/embeddings, which takes one prompt and returns one vector. The two even shape their JSON differently: the batch endpoint returns embeddings (plural, an array), the singular one returns embedding (singular), so the fallback is a different decode, not just a different URL. The client tries the batch endpoint and, when the server signals it does not have it, permanently falls back to looping the singular endpoint:
func (o *ollamaProvider) EmbedBatch(ctx context.Context, texts []string) ([][]float32, error) {
	if len(texts) == 0 {
		return nil, nil
	}
	if o.batchUnsupported.Load() {
		return o.embedBatchSequential(ctx, texts)
	}
	results, fallback, err := o.embedBatchOnce(ctx, texts)
	if fallback { // got a 404 from /api/embed; remember it and loop singular
		o.batchUnsupported.Store(true)
		return o.embedBatchSequential(ctx, texts)
	}
	if err != nil {
		return nil, err
	}
	return results, nil
}
The batchUnsupported flag is an atomic boolean so the worker can call EmbedBatch from concurrent goroutines without a mutex. The first call against an old server eats a 404, sets the flag, and every call after that skips straight to the sequential path. The worst case is a handful of redundant 404s before the flag settles, which is fine.
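The "different decode" point is easiest to see with the two payload shapes side by side. The struct and function names below are hypothetical; the JSON field names (input and embeddings for /api/embed, prompt and embedding for /api/embeddings) follow the Ollama API as described above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchRequest and batchResponse model /api/embed.
type batchRequest struct {
	Model string   `json:"model"`
	Input []string `json:"input"`
}

type batchResponse struct {
	Embeddings [][]float32 `json:"embeddings"`
}

// singleRequest and singleResponse model the older /api/embeddings.
type singleRequest struct {
	Model  string `json:"model"`
	Prompt string `json:"prompt"`
}

type singleResponse struct {
	Embedding []float32 `json:"embedding"`
}

// decodeBatch extracts one vector per input text.
func decodeBatch(body []byte) ([][]float32, error) {
	var r batchResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, fmt.Errorf("decode batch response: %w", err)
	}
	return r.Embeddings, nil
}

// decodeSingle extracts the single vector of the legacy response.
func decodeSingle(body []byte) ([]float32, error) {
	var r singleResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, fmt.Errorf("decode single response: %w", err)
	}
	return r.Embedding, nil
}

func main() {
	req, _ := json.Marshal(batchRequest{Model: "nomic-embed-text", Input: []string{"a", "b"}})
	fmt.Println(string(req))

	batch, _ := decodeBatch([]byte(`{"embeddings":[[0.5,0.25],[1,2]]}`))
	single, _ := decodeSingle([]byte(`{"embedding":[0.5,0.25]}`))
	fmt.Println(len(batch), len(single))
}
```

Decoding a batch body with the singular struct does not error. It silently yields an empty vector, which is why the fallback has to switch decoders along with the URL.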
Batch size defaults to thirty-two. That number is a deliberate trade against CPU latency. A bigger batch amortizes per-call overhead better, but if a batch times out you lose the whole batch’s work and retry it. Thirty-two keeps the lost-work window small on a slow embedder while still cutting the number of round trips by a factor of thirty-two. Each chunk is persisted as soon as it returns, so a job that dies on chunk five keeps chunks one through four for the next attempt’s dedup pass. Progress is never thrown away.
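The chunk-then-persist behavior can be sketched in a few lines. The chunk and embedInChunks functions, the callback signatures, and errTimeout are assumptions for illustration; the point is only the ordering, where each chunk is persisted before the next one is attempted.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout stands in for a provider timeout in this sketch.
var errTimeout = errors.New("embedder timed out")

// chunk splits texts into consecutive slices of at most size items.
func chunk(texts []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for start := 0; start < len(texts); start += size {
		end := start + size
		if end > len(texts) {
			end = len(texts)
		}
		out = append(out, texts[start:end])
	}
	return out
}

// embedInChunks embeds chunk by chunk and persists each chunk as soon as
// it returns. It reports how many texts were persisted before any failure.
func embedInChunks(texts []string, size int, embed func([]string) error, persist func([]string)) (int, error) {
	done := 0
	for _, c := range chunk(texts, size) {
		if err := embed(c); err != nil {
			return done, err // earlier chunks are already saved
		}
		persist(c)
		done += len(c)
	}
	return done, nil
}

func main() {
	texts := make([]string, 70) // chunks of 32, 32, and 6
	calls := 0
	// Simulated embedder that times out on the third chunk.
	embed := func([]string) error {
		calls++
		if calls == 3 {
			return errTimeout
		}
		return nil
	}
	done, err := embedInChunks(texts, 32, embed, func([]string) {})
	fmt.Println(done, err) // 64 embedder timed out
}
```

On the retry, the dedup pass finds those sixty-four vectors already stored with matching hashes, so only the last six texts go back to the provider.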
§What Holds It Together
There is no broker in this picture. The durable queue is a Postgres table. Low-latency wake-up is LISTEN/NOTIFY, with a poll as the safety net so a lost notification costs latency, not correctness. Concurrent claiming across every pod is FOR UPDATE SKIP LOCKED, with no lock manager and no leader. Crash recovery is a lease, a heartbeat, and a reaper. Self-healing is a reconciler that trusts the table over the producer and an idempotent enqueue that lets every replica run it at once. Cheap iteration and free model swaps come from a content hash and a stamped model name.
Every one of those is a Postgres feature or a few lines of Go. The same database that stores the vectors runs the pipeline that fills them. The only thing outside Postgres is the embedding model, and Part 1 already showed search holding up when even that is gone.
What is left is making it real on a cluster: running a CPU-only Ollama under Kubernetes without a GPU, sizing pgvector, tuning the worker’s timeouts and lease and batch size for slow inference, and operating the failure-triage surface when a job does go terminal. That is Part 3.
§References
- txn2/mcp-data-platform: the open-source MCP server this series is built from
- Plexara: the commercial data platform built on it
- PostgreSQL LISTEN and NOTIFY: the publish-subscribe primitives
- SELECT FOR UPDATE SKIP LOCKED: lock-free row claiming
- Ollama embedding API: the embedding provider used here
Vector Search Without the Vector Database | Part 2 of 3. Previous: Why Data MCPs Need Vector Search. Next: Running It on Kubernetes deploys Ollama on CPU, pgvector, and the worker pool, and covers tuning and operations.