105 lines
3.8 KiB
Python
105 lines
3.8 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
from typing import Tuple
|
|
|
|
import numpy as np
|
|
import torch
|
|
from PIL import Image
|
|
|
|
from common.image_io import fetch_url_bytes, bytes_to_pil, ImageLoadError
|
|
|
|
try:
|
|
import open_clip
|
|
except Exception:
|
|
open_clip = None
|
|
|
|
try:
|
|
from transformers import CLIPProcessor, CLIPModel
|
|
except Exception:
|
|
CLIPModel = None
|
|
CLIPProcessor = None
|
|
|
|
|
|
def load_openclip(model_name: str = "ViT-B-32", pretrained: str = "openai") -> Tuple:
    """Create an open_clip model together with its image transform.

    Args:
        model_name: Architecture name forwarded to
            ``open_clip.create_model_and_transforms``.
        pretrained: Pretrained-weights tag forwarded to the same call.

    Returns:
        A ``(model, preprocess, device)`` tuple. ``model`` has been moved to
        ``device`` and switched to eval mode; ``preprocess`` is the third
        value returned by ``create_model_and_transforms``; ``device`` is
        ``"cuda"`` when CUDA is available and ``"cpu"`` otherwise.

    Raises:
        RuntimeError: If the optional ``open_clip`` import failed at module
            load time.
    """
    if torch.cuda.is_available():
        target_device = "cuda"
    else:
        target_device = "cpu"

    if open_clip is None:
        raise RuntimeError("open_clip is not installed")

    # The middle element of the returned triple is not needed here.
    clip_model, _unused_transform, image_transform = open_clip.create_model_and_transforms(
        model_name,
        pretrained=pretrained,
    )
    clip_model = clip_model.to(target_device)
    clip_model = clip_model.eval()
    return clip_model, image_transform, target_device
|
|
|
|
|
|
def embed_openclip(model, preprocess, device, pil_image: Image.Image) -> np.ndarray:
    """Compute a unit-length image embedding with an open_clip style model.

    Args:
        model: Object exposing ``encode_image(tensor)``.
        preprocess: Callable turning ``pil_image`` into a tensor.
        device: Device the input tensor is moved to before encoding.
        pil_image: The image to embed.

    Returns:
        The first row of the encoded batch, divided by its L2 norm along the
        last dimension, as a NumPy array.
    """
    # Add a leading batch dimension of size one before sending to the device.
    batch = preprocess(pil_image)
    batch = batch.unsqueeze(0)
    batch = batch.to(device)

    with torch.no_grad():
        encoded = model.encode_image(batch)
        lengths = encoded.norm(dim=-1, keepdim=True)
        unit = encoded / lengths

    as_array = unit.cpu().numpy()
    return as_array[0]
|
|
|
|
|
|
def load_hf_clip(model_name: str = "openai/clip-vit-base-patch32") -> Tuple:
    """Load a HuggingFace CLIP model and its matching processor.

    Args:
        model_name: Identifier passed to ``from_pretrained`` for both the
            model and the processor.

    Returns:
        A ``(model, processor, device)`` tuple. ``model`` has been moved to
        ``device`` and switched to eval mode; ``device`` is ``"cuda"`` when
        CUDA is available and ``"cpu"`` otherwise.

    Raises:
        RuntimeError: If the optional ``transformers`` import failed at
            module load time.
    """
    if torch.cuda.is_available():
        target_device = "cuda"
    else:
        target_device = "cpu"

    backend_missing = CLIPModel is None or CLIPProcessor is None
    if backend_missing:
        raise RuntimeError("transformers (CLIP) is not installed")

    clip_model = CLIPModel.from_pretrained(model_name)
    clip_model = clip_model.to(target_device).eval()
    clip_processor = CLIPProcessor.from_pretrained(model_name)
    return clip_model, clip_processor, target_device
|
|
|
|
|
|
def embed_hf_clip(model, processor, device, pil_image: Image.Image) -> np.ndarray:
    """Compute a unit-length image embedding with a HuggingFace style CLIP model.

    Args:
        model: Object exposing ``get_image_features(**inputs)``.
        processor: Callable invoked as
            ``processor(images=pil_image, return_tensors="pt")`` whose result
            supports ``.items()`` and yields tensors as values.
        device: Device every processor output is moved to.
        pil_image: The image to embed.

    Returns:
        The first row of the image features, divided by its L2 norm along
        the last dimension, as a NumPy array.
    """
    encoded_inputs = processor(images=pil_image, return_tensors="pt")

    # Move every tensor the processor produced onto the model's device.
    on_device = {}
    for key, tensor in encoded_inputs.items():
        on_device[key] = tensor.to(device)

    with torch.no_grad():
        raw = model.get_image_features(**on_device)
        lengths = raw.norm(dim=-1, keepdim=True)
        unit = raw / lengths

    as_array = unit.cpu().numpy()
    return as_array[0]
|
|
|
|
|
|
def load_image(path_or_url: str) -> Image.Image:
    """Load an image from an HTTP(S) URL or from a local file path.

    Args:
        path_or_url: A string starting with ``http://`` or ``https://`` is
            fetched with ``fetch_url_bytes`` and decoded with
            ``bytes_to_pil``; anything else is treated as a local path.

    Returns:
        For local paths, an RGB copy of the image. For URLs, whatever
        ``bytes_to_pil`` returns (NOTE(review): mode conversion for the URL
        branch is presumably handled inside ``bytes_to_pil`` - confirm).

    Raises:
        Whatever the helpers or ``Image.open`` raise; nothing is caught
        here. The caller in this file expects ``ImageLoadError`` from the
        URL helpers (assumption based on the import - verify in
        ``common.image_io``).
    """
    if path_or_url.startswith(("http://", "https://")):
        payload = fetch_url_bytes(path_or_url)
        return bytes_to_pil(payload)

    # Image.open is lazy and may keep the file handle open (notably for
    # multi-frame formats). convert() returns an independent in-memory
    # copy, so the source image can be closed as soon as it is produced.
    with Image.open(path_or_url) as source:
        return source.convert("RGB")
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: embed one image with CLIP and emit the vector.

    Parses the CLI arguments, loads the image from a path or URL, runs the
    selected backend (``openclip`` or ``hf``), casts the embedding to
    ``float32`` and either saves it with ``np.save`` (``--out``) or prints
    its shape and contents to stdout.

    For the ``openclip`` backend the model name falls back to the
    ``MODEL_NAME`` environment variable and then to ``"ViT-B-32"``; for the
    ``hf`` backend it falls back to ``"openai/clip-vit-base-patch32"``.

    Raises:
        SystemExit: If the image cannot be loaded.
    """
    parser = argparse.ArgumentParser(description="Vectorize an image using CLIP (open_clip or HuggingFace)")
    parser.add_argument("input", help="Path to image file or URL")
    parser.add_argument("--backend", choices=("openclip", "hf"), default="openclip")
    parser.add_argument("--model", default=None, help="Model name (backend-specific)")
    parser.add_argument("--pretrained", default="openai", help="open_clip pretrained source (openclip backend)")
    parser.add_argument("--out", default=None, help="Output .npy path (defaults to stdout)")
    args = parser.parse_args()

    try:
        img = load_image(args.input)
    except (ImageLoadError, OSError) as e:
        # Local paths go through PIL directly, which reports missing,
        # unreadable or non-image files as OSError subclasses rather than
        # ImageLoadError; give those the same clean exit as URL failures.
        raise SystemExit(f"Failed to load image: {e}") from e

    if args.backend == "openclip":
        model_name = args.model or os.getenv("MODEL_NAME", "ViT-B-32")
        pretrained = args.pretrained
        model, preprocess, device = load_openclip(model_name, pretrained=pretrained)
        vec = embed_openclip(model, preprocess, device, img)
    else:
        model_name = args.model or "openai/clip-vit-base-patch32"
        model, processor, device = load_hf_clip(model_name)
        vec = embed_hf_clip(model, processor, device, img)

    vec = np.asarray(vec, dtype=np.float32)

    if args.out:
        np.save(args.out, vec)
        print(f"Saved vector shape={vec.shape} to {args.out}")
    else:
        # Print the shape first, then the vector itself as formatted by
        # np.array2string with six digits of precision.
        print(f"vector_shape={vec.shape}")
        print(np.array2string(vec, precision=6, separator=", "))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|