#!/usr/bin/env python3 """ backfill_payloads.py — Repair missing payload fields for existing Qdrant points. WHY THIS EXISTS --------------- If artworks were initially upserted without the full payload (is_public, is_nsfw, category_id, content_type_id, is_deleted, status), those fields will have near-0% coverage in the payload index. This prevents filtered searches (e.g., is_public=true) from returning correct results. This script scrolls through all points in the collection, detects which ones are missing the required fields, and lets you supply a lookup function that fetches the correct values from your source-of-truth (database, API, CSV, etc.). HOW TO ADAPT ------------ 1. Fill in `fetch_payloads_for_ids()` to return a dict mapping qdrant-point-id -> payload patch for each missing ID. The simplest approach is a SQL query to your Skinbase database using the `_original_id` stored in the Qdrant payload. 2. Run the script directly (no app container needed, just qdrant-client installed): # Inside Docker network: docker exec -it vision-qdrant-svc-1 python /app/backfill_payloads.py # Or from host with qdrant-client installed: pip install qdrant-client QDRANT_HOST=localhost QDRANT_PORT=6333 python qdrant/backfill_payloads.py 3. The script is resumable: it prints the last-processed offset ID so you can restart from where you left off by setting RESUME_OFFSET env var. REQUIRED ENV VARS (all optional, sensible defaults for Docker Compose): QDRANT_HOST default: qdrant QDRANT_PORT default: 6333 COLLECTION_NAME default: images BATCH_SIZE default: 256 DRY_RUN default: 0 (set to 1 to only report, no writes) RESUME_OFFSET default: None (UUID or int of last seen point to skip to) FIELDS CHECKED -------------- user_id, is_public, is_nsfw, category_id, content_type_id, is_deleted, status """ from __future__ import annotations import os import sys import time import logging from typing import Any, Dict, List, Optional from qdrant_client import QdrantClient from qdrant_client.models import PointStruct logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("backfill") # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- QDRANT_HOST = os.getenv("QDRANT_HOST", "qdrant") QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333")) COLLECTION_NAME = os.getenv("COLLECTION_NAME", "images") BATCH_SIZE = int(os.getenv("BATCH_SIZE", "256")) DRY_RUN = os.getenv("DRY_RUN", "0") == "1" RESUME_OFFSET: Optional[str] = os.getenv("RESUME_OFFSET") # point id to continue from # Fields that MUST be present in every point payload for filtered search to work. REQUIRED_FIELDS = [ "user_id", "is_public", "is_nsfw", "category_id", "content_type_id", "is_deleted", "status", ] # --------------------------------------------------------------------------- # TODO: implement this function to fetch correct payload values from your DB. # --------------------------------------------------------------------------- def fetch_payloads_for_ids( missing_ids: List[Any], original_ids: Dict[Any, str], ) -> Dict[Any, Dict[str, Any]]: """Return a mapping of qdrant_point_id -> payload_patch for the given IDs. Parameters ---------- missing_ids: List of Qdrant point IDs (UUID strings or ints) that need patching. original_ids: Dict mapping qdrant_point_id -> original application ID (stored in `_original_id` payload field, or the point id itself if they match). Returns ------- Dict mapping each point id to a dict of fields to set. Only include the fields you want to SET — existing fields are not cleared. Example implementation (pseudo-code for your database): import psycopg2 conn = psycopg2.connect(os.environ["DATABASE_URL"]) cur = conn.cursor() orig_id_list = list(original_ids.values()) cur.execute( "SELECT id, user_id, is_public, is_nsfw, category_id, " " content_type_id, is_deleted, status " "FROM artworks WHERE id = ANY(%s)", (orig_id_list,) ) rows = cur.fetchall() by_orig = {str(r[0]): r for r in rows} result = {} for qdrant_id, orig_id in original_ids.items(): row = by_orig.get(str(orig_id)) if row: result[qdrant_id] = { "user_id": str(row[1]), "is_public": bool(row[2]), "is_nsfw": bool(row[3]), "category_id": int(row[4]) if row[4] is not None else None, "content_type_id": int(row[5]) if row[5] is not None else None, "is_deleted": bool(row[6]), "status": str(row[7]), } return result """ # ---- STUB: replace with your real implementation ---- log.warning( "fetch_payloads_for_ids() is a stub — no data will be patched.\n" "Edit qdrant/backfill_payloads.py and implement this function." ) return {} # --------------------------------------------------------------------------- # Core backfill logic # --------------------------------------------------------------------------- def run_backfill(): log.info( "backfill start collection=%s host=%s:%s dry_run=%s batch=%d", COLLECTION_NAME, QDRANT_HOST, QDRANT_PORT, DRY_RUN, BATCH_SIZE, ) qclient = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT) # Verify collection exists collections = [c.name for c in qclient.get_collections().collections] if COLLECTION_NAME not in collections: log.error("Collection '%s' not found. Existing: %s", COLLECTION_NAME, collections) sys.exit(1) info = qclient.get_collection(COLLECTION_NAME) total_points = info.points_count or 0 log.info("collection points_count=%d indexed_vectors=%d", total_points, info.indexed_vectors_count or 0) offset = RESUME_OFFSET scanned = 0 missing_count = 0 patched = 0 errors = 0 t_start = time.perf_counter() while True: points, next_offset = qclient.scroll( collection_name=COLLECTION_NAME, offset=offset, limit=BATCH_SIZE, with_payload=True, with_vectors=False, ) if not points: break scanned += len(points) # Find points missing any required field needs_patch: List[Any] = [] original_ids: Dict[Any, str] = {} for pt in points: payload = pt.payload or {} missing = [f for f in REQUIRED_FIELDS if f not in payload or payload[f] is None] if missing: needs_patch.append(pt.id) # Use _original_id if present (IDs that couldn't be stored as Qdrant IDs) original_ids[pt.id] = str(payload.get("_original_id", pt.id)) missing_count += 1 if needs_patch: patches = fetch_payloads_for_ids(needs_patch, original_ids) for pid, patch in patches.items(): if not patch: continue if DRY_RUN: log.info("[DRY RUN] would patch id=%s fields=%s", pid, list(patch.keys())) else: try: qclient.set_payload( collection_name=COLLECTION_NAME, payload=patch, points=[pid], ) patched += 1 except Exception as exc: log.error("failed to patch id=%s: %s", pid, exc) errors += 1 elapsed = time.perf_counter() - t_start rate = scanned / elapsed if elapsed > 0 else 0 log.info( "progress scanned=%d/%d missing=%d patched=%d errors=%d rate=%.0f/s offset=%s", scanned, total_points, missing_count, patched, errors, rate, next_offset, ) if next_offset is None: break offset = next_offset elapsed = time.perf_counter() - t_start log.info( "backfill complete scanned=%d missing=%d patched=%d errors=%d elapsed=%.1fs", scanned, missing_count, patched, errors, elapsed, ) if missing_count > 0 and patched == 0 and not DRY_RUN: log.warning( "%d points are missing payload fields but 0 were patched. " "Implement fetch_payloads_for_ids() in this script.", missing_count, ) if __name__ == "__main__": run_backfill()