from __future__ import annotations import argparse import os from typing import Tuple import numpy as np import torch from PIL import Image from common.image_io import fetch_url_bytes, bytes_to_pil, ImageLoadError try: import open_clip except Exception: open_clip = None try: from transformers import CLIPProcessor, CLIPModel except Exception: CLIPModel = None CLIPProcessor = None def load_openclip(model_name: str = "ViT-B-32", pretrained: str = "openai") -> Tuple: device = "cuda" if torch.cuda.is_available() else "cpu" if open_clip is None: raise RuntimeError("open_clip is not installed") model, _, preprocess = open_clip.create_model_and_transforms(model_name, pretrained=pretrained) model = model.to(device).eval() return model, preprocess, device def embed_openclip(model, preprocess, device, pil_image: Image.Image) -> np.ndarray: image_input = preprocess(pil_image).unsqueeze(0).to(device) with torch.no_grad(): image_features = model.encode_image(image_input) image_features = image_features / image_features.norm(dim=-1, keepdim=True) return image_features.cpu().numpy()[0] def load_hf_clip(model_name: str = "openai/clip-vit-base-patch32") -> Tuple: device = "cuda" if torch.cuda.is_available() else "cpu" if CLIPModel is None or CLIPProcessor is None: raise RuntimeError("transformers (CLIP) is not installed") model = CLIPModel.from_pretrained(model_name).to(device).eval() processor = CLIPProcessor.from_pretrained(model_name) return model, processor, device def embed_hf_clip(model, processor, device, pil_image: Image.Image) -> np.ndarray: inputs = processor(images=pil_image, return_tensors="pt") inputs = {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): feats = model.get_image_features(**inputs) feats = feats / feats.norm(dim=-1, keepdim=True) return feats.cpu().numpy()[0] def load_image(path_or_url: str) -> Image.Image: if path_or_url.startswith("http://") or path_or_url.startswith("https://"): data = fetch_url_bytes(path_or_url) return bytes_to_pil(data) else: return Image.open(path_or_url).convert("RGB") def main() -> None: parser = argparse.ArgumentParser(description="Vectorize an image using CLIP (open_clip or HuggingFace)") parser.add_argument("input", help="Path to image file or URL") parser.add_argument("--backend", choices=("openclip", "hf"), default="openclip") parser.add_argument("--model", default=None, help="Model name (backend-specific)") parser.add_argument("--pretrained", default="openai", help="open_clip pretrained source (openclip backend)") parser.add_argument("--out", default=None, help="Output .npy path (defaults to stdout)") args = parser.parse_args() try: img = load_image(args.input) except ImageLoadError as e: raise SystemExit(f"Failed to load image: {e}") if args.backend == "openclip": model_name = args.model or os.getenv("MODEL_NAME", "ViT-B-32") pretrained = args.pretrained model, preprocess, device = load_openclip(model_name, pretrained=pretrained) vec = embed_openclip(model, preprocess, device, img) else: model_name = args.model or "openai/clip-vit-base-patch32" model, processor, device = load_hf_clip(model_name) vec = embed_hf_clip(model, processor, device, img) vec = np.asarray(vec, dtype=np.float32) if args.out: np.save(args.out, vec) print(f"Saved vector shape={vec.shape} to {args.out}") else: # Print a short summary and the vector length. Full vector to stdout can be large. print(f"vector_shape={vec.shape}") print(np.array2string(vec, precision=6, separator=", ")) if __name__ == "__main__": main()