Upload beautify
This commit is contained in:
401
app/Jobs/AutoTagArtworkJob.php
Normal file
401
app/Jobs/AutoTagArtworkJob.php
Normal file
@@ -0,0 +1,401 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\Artwork;
|
||||
use App\Services\TagNormalizer;
|
||||
use App\Services\TagService;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Http;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Redis;
|
||||
use Illuminate\Support\Str;
|
||||
|
||||
final class AutoTagArtworkJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable;
|
||||
use InteractsWithQueue;
|
||||
use Queueable;
|
||||
use SerializesModels;
|
||||
|
||||
/**
|
||||
* Keep retries low; tagging must never block publish.
|
||||
*/
|
||||
public int $tries = 3;
|
||||
|
||||
/**
|
||||
* Hard timeout safety for queue workers.
|
||||
*/
|
||||
public int $timeout = 20;
|
||||
|
||||
/**
|
||||
* @param int $artworkId
|
||||
* @param string $hash File hash used to build public derivative URLs.
|
||||
*/
|
||||
public function __construct(
|
||||
private readonly int $artworkId,
|
||||
private readonly string $hash,
|
||||
) {
|
||||
$queue = (string) config('vision.queue', 'default');
|
||||
if ($queue !== '') {
|
||||
$this->onQueue($queue);
|
||||
}
|
||||
}
|
||||
|
||||
public function backoff(): array
|
||||
{
|
||||
return [2, 10, 30];
|
||||
}
|
||||
|
||||
public function handle(TagService $tagService, TagNormalizer $normalizer): void
|
||||
{
|
||||
if (! (bool) config('vision.enabled', true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$artwork = Artwork::query()->with(['categories.contentType'])->find($this->artworkId);
|
||||
if (! $artwork) {
|
||||
return;
|
||||
}
|
||||
|
||||
$imageUrl = $this->buildImageUrl($this->hash);
|
||||
if ($imageUrl === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
$processingKey = $this->processingKey($this->artworkId, $this->hash);
|
||||
if (! $this->acquireProcessingLock($processingKey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$ref = (string) Str::uuid();
|
||||
|
||||
try {
|
||||
$clipTags = $this->callClip($imageUrl, $ref);
|
||||
|
||||
$yoloTags = [];
|
||||
if ($this->shouldRunYolo($artwork)) {
|
||||
$yoloTags = $this->callYolo($imageUrl, $ref);
|
||||
}
|
||||
|
||||
$merged = $this->mergeTags($clipTags, $yoloTags);
|
||||
if ($merged === []) {
|
||||
$this->markProcessed($this->processedKey($this->artworkId, $this->hash));
|
||||
return;
|
||||
}
|
||||
|
||||
// Normalize explicitly (requirement), then attach via TagService (source=ai + confidence).
|
||||
$payload = [];
|
||||
foreach ($merged as $row) {
|
||||
$tag = $normalizer->normalize((string) ($row['tag'] ?? ''));
|
||||
if ($tag === '') {
|
||||
continue;
|
||||
}
|
||||
$payload[] = [
|
||||
'tag' => $tag,
|
||||
'confidence' => isset($row['confidence']) && is_numeric($row['confidence']) ? (float) $row['confidence'] : null,
|
||||
];
|
||||
}
|
||||
|
||||
$tagService->attachAiTags($artwork, $payload);
|
||||
|
||||
$this->markProcessed($this->processedKey($this->artworkId, $this->hash));
|
||||
} catch (\Throwable $e) {
|
||||
Log::error('AutoTagArtworkJob failed', [
|
||||
'ref' => $ref,
|
||||
'artwork_id' => $this->artworkId,
|
||||
'hash' => $this->hash,
|
||||
'attempt' => $this->attempts(),
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
|
||||
// Retry-safe: allow queue retry on transient failures.
|
||||
throw $e;
|
||||
} finally {
|
||||
$this->releaseProcessingLock($processingKey);
|
||||
}
|
||||
}
|
||||
|
||||
private function buildImageUrl(string $hash): ?string
|
||||
{
|
||||
$base = (string) config('cdn.files_url');
|
||||
$base = rtrim($base, '/');
|
||||
if ($base === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
$variant = (string) config('vision.image_variant', 'md');
|
||||
$variant = $variant !== '' ? $variant : 'md';
|
||||
|
||||
// Matches the upload public path layout used for derivatives (img/aa/bb/cc/variant.webp).
|
||||
$clean = strtolower((string) preg_replace('/[^a-z0-9]/', '', $hash));
|
||||
$clean = str_pad($clean, 6, '0');
|
||||
$segments = [substr($clean, 0, 2) ?: '00', substr($clean, 2, 2) ?: '00', substr($clean, 4, 2) ?: '00'];
|
||||
$path = 'img/' . implode('/', $segments) . '/' . $variant . '.webp';
|
||||
|
||||
return $base . '/' . $path;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array{tag: string, confidence?: float|int|null}>
|
||||
*/
|
||||
private function callClip(string $imageUrl, string $ref): array
|
||||
{
|
||||
$base = trim((string) config('vision.clip.base_url', ''));
|
||||
if ($base === '') {
|
||||
return [];
|
||||
}
|
||||
|
||||
$endpoint = (string) config('vision.clip.endpoint', '/analyze');
|
||||
$url = rtrim($base, '/') . '/' . ltrim($endpoint, '/');
|
||||
|
||||
$timeout = (int) config('vision.clip.timeout_seconds', 8);
|
||||
$connectTimeout = (int) config('vision.clip.connect_timeout_seconds', 2);
|
||||
$retries = (int) config('vision.clip.retries', 1);
|
||||
$delay = (int) config('vision.clip.retry_delay_ms', 200);
|
||||
|
||||
try {
|
||||
$response = Http::acceptJson()
|
||||
->connectTimeout(max(1, $connectTimeout))
|
||||
->timeout(max(1, $timeout))
|
||||
->retry(max(0, $retries), max(0, $delay), throw: false)
|
||||
->post($url, [
|
||||
'image_url' => $imageUrl,
|
||||
'artwork_id' => $this->artworkId,
|
||||
'hash' => $this->hash,
|
||||
]);
|
||||
} catch (\Throwable $e) {
|
||||
Log::warning('CLIP analyze request failed', ['ref' => $ref, 'artwork_id' => $this->artworkId, 'error' => $e->getMessage()]);
|
||||
throw $e;
|
||||
}
|
||||
|
||||
if ($response->serverError()) {
|
||||
Log::warning('CLIP analyze server error', ['ref' => $ref, 'status' => $response->status(), 'body' => $this->safeBody($response->body())]);
|
||||
throw new \RuntimeException('CLIP server error: ' . $response->status());
|
||||
}
|
||||
|
||||
if (! $response->ok()) {
|
||||
Log::warning('CLIP analyze non-ok response', ['ref' => $ref, 'status' => $response->status(), 'body' => $this->safeBody($response->body())]);
|
||||
return [];
|
||||
}
|
||||
|
||||
return $this->extractTagList($response->json());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array{tag: string, confidence?: float|int|null}>
|
||||
*/
|
||||
private function callYolo(string $imageUrl, string $ref): array
|
||||
{
|
||||
if (! (bool) config('vision.yolo.enabled', true)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$base = trim((string) config('vision.yolo.base_url', ''));
|
||||
if ($base === '') {
|
||||
return [];
|
||||
}
|
||||
|
||||
$endpoint = (string) config('vision.yolo.endpoint', '/analyze');
|
||||
$url = rtrim($base, '/') . '/' . ltrim($endpoint, '/');
|
||||
|
||||
$timeout = (int) config('vision.yolo.timeout_seconds', 8);
|
||||
$connectTimeout = (int) config('vision.yolo.connect_timeout_seconds', 2);
|
||||
$retries = (int) config('vision.yolo.retries', 1);
|
||||
$delay = (int) config('vision.yolo.retry_delay_ms', 200);
|
||||
|
||||
try {
|
||||
$response = Http::acceptJson()
|
||||
->connectTimeout(max(1, $connectTimeout))
|
||||
->timeout(max(1, $timeout))
|
||||
->retry(max(0, $retries), max(0, $delay), throw: false)
|
||||
->post($url, [
|
||||
'image_url' => $imageUrl,
|
||||
'artwork_id' => $this->artworkId,
|
||||
'hash' => $this->hash,
|
||||
]);
|
||||
} catch (\Throwable $e) {
|
||||
Log::warning('YOLO analyze request failed', ['ref' => $ref, 'artwork_id' => $this->artworkId, 'error' => $e->getMessage()]);
|
||||
throw $e;
|
||||
}
|
||||
|
||||
if ($response->serverError()) {
|
||||
Log::warning('YOLO analyze server error', ['ref' => $ref, 'status' => $response->status(), 'body' => $this->safeBody($response->body())]);
|
||||
throw new \RuntimeException('YOLO server error: ' . $response->status());
|
||||
}
|
||||
|
||||
if (! $response->ok()) {
|
||||
Log::warning('YOLO analyze non-ok response', ['ref' => $ref, 'status' => $response->status(), 'body' => $this->safeBody($response->body())]);
|
||||
return [];
|
||||
}
|
||||
|
||||
return $this->extractTagList($response->json());
|
||||
}
|
||||
|
||||
private function shouldRunYolo(Artwork $artwork): bool
|
||||
{
|
||||
if (! (bool) config('vision.yolo.enabled', true)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (! (bool) config('vision.yolo.photography_only', true)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
foreach ($artwork->categories as $category) {
|
||||
$slug = strtolower((string) ($category->contentType?->slug ?? ''));
|
||||
if ($slug === 'photography') {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $json
|
||||
* @return array<int, array{tag: string, confidence?: float|int|null}>
|
||||
*/
|
||||
private function extractTagList(mixed $json): array
|
||||
{
|
||||
if (is_array($json) && $this->isListOfTags($json)) {
|
||||
return $json;
|
||||
}
|
||||
|
||||
if (is_array($json) && isset($json['tags']) && is_array($json['tags']) && $this->isListOfTags($json['tags'])) {
|
||||
return $json['tags'];
|
||||
}
|
||||
|
||||
if (is_array($json) && isset($json['data']) && is_array($json['data']) && $this->isListOfTags($json['data'])) {
|
||||
return $json['data'];
|
||||
}
|
||||
|
||||
// Common YOLO-style response: objects: [{label, confidence}]
|
||||
if (is_array($json) && isset($json['objects']) && is_array($json['objects'])) {
|
||||
$out = [];
|
||||
foreach ($json['objects'] as $obj) {
|
||||
if (! is_array($obj)) {
|
||||
continue;
|
||||
}
|
||||
$label = (string) ($obj['label'] ?? $obj['tag'] ?? '');
|
||||
if ($label === '') {
|
||||
continue;
|
||||
}
|
||||
$out[] = ['tag' => $label, 'confidence' => $obj['confidence'] ?? null];
|
||||
}
|
||||
return $out;
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<mixed> $arr
|
||||
*/
|
||||
private function isListOfTags(array $arr): bool
|
||||
{
|
||||
if ($arr === []) {
|
||||
return true;
|
||||
}
|
||||
|
||||
foreach ($arr as $row) {
|
||||
if (! is_array($row)) {
|
||||
return false;
|
||||
}
|
||||
if (! array_key_exists('tag', $row)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, array{tag: string, confidence?: float|int|null}> $a
|
||||
* @param array<int, array{tag: string, confidence?: float|int|null}> $b
|
||||
* @return array<int, array{tag: string, confidence?: float|int|null}>
|
||||
*/
|
||||
private function mergeTags(array $a, array $b): array
|
||||
{
|
||||
$byTag = [];
|
||||
foreach (array_merge($a, $b) as $row) {
|
||||
$tag = (string) ($row['tag'] ?? '');
|
||||
if ($tag === '') {
|
||||
continue;
|
||||
}
|
||||
$conf = $row['confidence'] ?? null;
|
||||
$conf = is_numeric($conf) ? (float) $conf : null;
|
||||
|
||||
// Keep highest confidence for duplicates.
|
||||
if (! isset($byTag[$tag])) {
|
||||
$byTag[$tag] = ['tag' => $tag, 'confidence' => $conf];
|
||||
continue;
|
||||
}
|
||||
|
||||
$existing = $byTag[$tag]['confidence'];
|
||||
if ($existing === null || ($conf !== null && $conf > (float) $existing)) {
|
||||
$byTag[$tag]['confidence'] = $conf;
|
||||
}
|
||||
}
|
||||
|
||||
return array_values($byTag);
|
||||
}
|
||||
|
||||
private function processingKey(int $artworkId, string $hash): string
|
||||
{
|
||||
return 'autotag:processing:' . $artworkId . ':' . $hash;
|
||||
}
|
||||
|
||||
private function processedKey(int $artworkId, string $hash): string
|
||||
{
|
||||
return 'autotag:processed:' . $artworkId . ':' . $hash;
|
||||
}
|
||||
|
||||
private function acquireProcessingLock(string $key): bool
|
||||
{
|
||||
try {
|
||||
$didSet = Redis::setnx($key, 1);
|
||||
if ($didSet) {
|
||||
Redis::expire($key, 1800);
|
||||
}
|
||||
return (bool) $didSet;
|
||||
} catch (\Throwable $e) {
|
||||
// If Redis is unavailable, proceed without dedupe.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private function releaseProcessingLock(string $key): void
|
||||
{
|
||||
try {
|
||||
Redis::del($key);
|
||||
} catch (\Throwable $e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private function markProcessed(string $key): void
|
||||
{
|
||||
try {
|
||||
Redis::setex($key, 604800, 1); // 7 days
|
||||
} catch (\Throwable $e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private function safeBody(string $body): string
|
||||
{
|
||||
$body = trim($body);
|
||||
if ($body === '') {
|
||||
return '';
|
||||
}
|
||||
|
||||
return Str::limit($body, 800);
|
||||
}
|
||||
}
|
||||
61
app/Jobs/BackfillArtworkEmbeddingsJob.php
Normal file
61
app/Jobs/BackfillArtworkEmbeddingsJob.php
Normal file
@@ -0,0 +1,61 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\Artwork;
|
||||
use App\Jobs\GenerateArtworkEmbeddingJob;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
|
||||
final class BackfillArtworkEmbeddingsJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable;
|
||||
use InteractsWithQueue;
|
||||
use Queueable;
|
||||
use SerializesModels;
|
||||
|
||||
public int $tries = 1;
|
||||
|
||||
public int $timeout = 120;
|
||||
|
||||
public function __construct(
|
||||
private readonly int $afterId = 0,
|
||||
private readonly int $batchSize = 200,
|
||||
private readonly bool $force = false,
|
||||
) {
|
||||
$queue = (string) config('recommendations.queue', config('vision.queue', 'default'));
|
||||
if ($queue !== '') {
|
||||
$this->onQueue($queue);
|
||||
}
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
$batch = max(1, min($this->batchSize, 1000));
|
||||
|
||||
$artworks = Artwork::query()
|
||||
->where('id', '>', $this->afterId)
|
||||
->whereNotNull('hash')
|
||||
->orderBy('id')
|
||||
->limit($batch)
|
||||
->get(['id', 'hash']);
|
||||
|
||||
if ($artworks->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($artworks as $artwork) {
|
||||
GenerateArtworkEmbeddingJob::dispatch((int) $artwork->id, (string) $artwork->hash, $this->force);
|
||||
}
|
||||
|
||||
if ($artworks->count() === $batch) {
|
||||
$lastId = (int) $artworks->last()->id;
|
||||
self::dispatch($lastId, $batch, $this->force);
|
||||
}
|
||||
}
|
||||
}
|
||||
178
app/Jobs/GenerateArtworkEmbeddingJob.php
Normal file
178
app/Jobs/GenerateArtworkEmbeddingJob.php
Normal file
@@ -0,0 +1,178 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\Artwork;
|
||||
use App\Models\ArtworkEmbedding;
|
||||
use App\Services\Vision\ArtworkEmbeddingClient;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Redis;
|
||||
|
||||
final class GenerateArtworkEmbeddingJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable;
|
||||
use InteractsWithQueue;
|
||||
use Queueable;
|
||||
use SerializesModels;
|
||||
|
||||
public int $tries = 3;
|
||||
|
||||
public int $timeout = 20;
|
||||
|
||||
public function __construct(
|
||||
private readonly int $artworkId,
|
||||
private readonly ?string $sourceHash = null,
|
||||
private readonly bool $force = false,
|
||||
) {
|
||||
$queue = (string) config('recommendations.queue', config('vision.queue', 'default'));
|
||||
if ($queue !== '') {
|
||||
$this->onQueue($queue);
|
||||
}
|
||||
}
|
||||
|
||||
public function backoff(): array
|
||||
{
|
||||
return [2, 10, 30];
|
||||
}
|
||||
|
||||
public function handle(ArtworkEmbeddingClient $client): void
|
||||
{
|
||||
if (! (bool) config('recommendations.embedding.enabled', true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$artwork = Artwork::query()->find($this->artworkId);
|
||||
if (! $artwork) {
|
||||
return;
|
||||
}
|
||||
|
||||
$sourceHash = strtolower((string) preg_replace('/[^a-z0-9]/', '', (string) ($this->sourceHash ?? $artwork->hash ?? '')));
|
||||
if ($sourceHash === '') {
|
||||
return;
|
||||
}
|
||||
|
||||
$model = (string) config('recommendations.embedding.model', 'clip');
|
||||
$modelVersion = (string) config('recommendations.embedding.model_version', 'v1');
|
||||
$algoVersion = (string) config('recommendations.embedding.algo_version', 'clip-cosine-v1');
|
||||
|
||||
if (! $this->force) {
|
||||
$existing = ArtworkEmbedding::query()
|
||||
->where('artwork_id', $artwork->id)
|
||||
->where('model', $model)
|
||||
->where('model_version', $modelVersion)
|
||||
->first();
|
||||
|
||||
if ($existing && (string) ($existing->source_hash ?? '') === $sourceHash) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
$lockKey = $this->lockKey($artwork->id, $model, $modelVersion);
|
||||
if (! $this->acquireLock($lockKey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$imageUrl = $this->buildImageUrl($sourceHash);
|
||||
if ($imageUrl === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
$vector = $client->embed($imageUrl, (int) $artwork->id, $sourceHash);
|
||||
if ($vector === []) {
|
||||
return;
|
||||
}
|
||||
|
||||
$normalized = $this->normalize($vector);
|
||||
|
||||
ArtworkEmbedding::query()->updateOrCreate(
|
||||
[
|
||||
'artwork_id' => (int) $artwork->id,
|
||||
'model' => $model,
|
||||
'model_version' => $modelVersion,
|
||||
],
|
||||
[
|
||||
'algo_version' => $algoVersion,
|
||||
'dim' => count($normalized),
|
||||
'embedding_json' => json_encode($normalized, JSON_THROW_ON_ERROR),
|
||||
'source_hash' => $sourceHash,
|
||||
'is_normalized' => true,
|
||||
'generated_at' => now(),
|
||||
'meta' => [
|
||||
'source' => 'clip',
|
||||
'image_variant' => (string) config('vision.image_variant', 'md'),
|
||||
],
|
||||
]
|
||||
);
|
||||
} finally {
|
||||
$this->releaseLock($lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, float> $vector
|
||||
* @return array<int, float>
|
||||
*/
|
||||
private function normalize(array $vector): array
|
||||
{
|
||||
$sumSquares = 0.0;
|
||||
foreach ($vector as $value) {
|
||||
$sumSquares += ($value * $value);
|
||||
}
|
||||
|
||||
if ($sumSquares <= 0.0) {
|
||||
return $vector;
|
||||
}
|
||||
|
||||
$norm = sqrt($sumSquares);
|
||||
return array_map(static fn (float $value): float => $value / $norm, $vector);
|
||||
}
|
||||
|
||||
private function buildImageUrl(string $hash): ?string
|
||||
{
|
||||
$base = rtrim((string) config('cdn.files_url', ''), '/');
|
||||
if ($base === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
$variant = (string) config('vision.image_variant', 'md');
|
||||
$clean = strtolower((string) preg_replace('/[^a-z0-9]/', '', $hash));
|
||||
$clean = str_pad($clean, 6, '0');
|
||||
$segments = [substr($clean, 0, 2) ?: '00', substr($clean, 2, 2) ?: '00', substr($clean, 4, 2) ?: '00'];
|
||||
|
||||
return $base . '/img/' . implode('/', $segments) . '/' . $variant . '.webp';
|
||||
}
|
||||
|
||||
private function lockKey(int $artworkId, string $model, string $version): string
|
||||
{
|
||||
return 'artwork-embedding:lock:' . $artworkId . ':' . $model . ':' . $version;
|
||||
}
|
||||
|
||||
private function acquireLock(string $key): bool
|
||||
{
|
||||
try {
|
||||
$didSet = Redis::setnx($key, 1);
|
||||
if ($didSet) {
|
||||
Redis::expire($key, 1800);
|
||||
}
|
||||
return (bool) $didSet;
|
||||
} catch (\Throwable) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private function releaseLock(string $key): void
|
||||
{
|
||||
try {
|
||||
Redis::del($key);
|
||||
} catch (\Throwable) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
38
app/Jobs/GenerateDerivativesJob.php
Normal file
38
app/Jobs/GenerateDerivativesJob.php
Normal file
@@ -0,0 +1,38 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Services\Uploads\UploadPipelineService;
|
||||
use App\Jobs\AutoTagArtworkJob;
|
||||
use App\Jobs\GenerateArtworkEmbeddingJob;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
|
||||
final class GenerateDerivativesJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable;
|
||||
use InteractsWithQueue;
|
||||
use Queueable;
|
||||
use SerializesModels;
|
||||
|
||||
public function __construct(
|
||||
private readonly string $sessionId,
|
||||
private readonly string $hash,
|
||||
private readonly int $artworkId
|
||||
) {
|
||||
}
|
||||
|
||||
public function handle(UploadPipelineService $pipeline): void
|
||||
{
|
||||
$pipeline->processAndPublish($this->sessionId, $this->hash, $this->artworkId);
|
||||
|
||||
// Auto-tagging is async and must never block publish.
|
||||
AutoTagArtworkJob::dispatch($this->artworkId, $this->hash)->afterCommit();
|
||||
GenerateArtworkEmbeddingJob::dispatch($this->artworkId, $this->hash)->afterCommit();
|
||||
}
|
||||
}
|
||||
122
app/Jobs/IngestUserDiscoveryEventJob.php
Normal file
122
app/Jobs/IngestUserDiscoveryEventJob.php
Normal file
@@ -0,0 +1,122 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Services\Recommendations\UserInterestProfileService;
|
||||
use Carbon\CarbonImmutable;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Redis;
|
||||
use Illuminate\Support\Facades\Schema;
|
||||
|
||||
final class IngestUserDiscoveryEventJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable;
|
||||
use InteractsWithQueue;
|
||||
use Queueable;
|
||||
use SerializesModels;
|
||||
|
||||
public int $tries = 3;
|
||||
|
||||
/** @var array<int, int> */
|
||||
public array $backoff = [5, 30, 120];
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $meta
|
||||
*/
|
||||
public function __construct(
|
||||
public readonly string $eventId,
|
||||
public readonly int $userId,
|
||||
public readonly int $artworkId,
|
||||
public readonly string $eventType,
|
||||
public readonly string $algoVersion,
|
||||
public readonly string $occurredAt,
|
||||
public readonly array $meta = []
|
||||
) {
|
||||
}
|
||||
|
||||
public function handle(UserInterestProfileService $profileService): void
|
||||
{
|
||||
$idempotencyKey = sprintf('discovery:event:processed:%s', $this->eventId);
|
||||
|
||||
try {
|
||||
$didSet = false;
|
||||
try {
|
||||
$didSet = (bool) Redis::setnx($idempotencyKey, 1);
|
||||
if ($didSet) {
|
||||
Redis::expire($idempotencyKey, 86400 * 2);
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
Log::warning('Redis unavailable for discovery ingestion; proceeding without redis dedupe', [
|
||||
'event_id' => $this->eventId,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
$didSet = true;
|
||||
}
|
||||
|
||||
if (! $didSet) {
|
||||
return;
|
||||
}
|
||||
|
||||
$occurredAt = CarbonImmutable::parse($this->occurredAt);
|
||||
$eventVersion = (string) config('discovery.event_version', 'event-v1');
|
||||
$eventWeight = (float) ((array) config('discovery.weights', []))[$this->eventType] ?? 1.0;
|
||||
|
||||
$categoryId = DB::table('artwork_category')
|
||||
->where('artwork_id', $this->artworkId)
|
||||
->orderBy('category_id')
|
||||
->value('category_id');
|
||||
|
||||
$insertPayload = [
|
||||
'event_id' => $this->eventId,
|
||||
'user_id' => $this->userId,
|
||||
'artwork_id' => $this->artworkId,
|
||||
'category_id' => $categoryId !== null ? (int) $categoryId : null,
|
||||
'event_type' => $this->eventType,
|
||||
'event_version' => $eventVersion,
|
||||
'algo_version' => $this->algoVersion,
|
||||
'weight' => $eventWeight,
|
||||
'event_date' => $occurredAt->toDateString(),
|
||||
'occurred_at' => $occurredAt->toDateTimeString(),
|
||||
'created_at' => now(),
|
||||
'updated_at' => now(),
|
||||
];
|
||||
|
||||
if (Schema::hasColumn('user_discovery_events', 'meta')) {
|
||||
$insertPayload['meta'] = $this->meta;
|
||||
} elseif (Schema::hasColumn('user_discovery_events', 'metadata')) {
|
||||
$insertPayload['metadata'] = json_encode($this->meta, JSON_UNESCAPED_SLASHES);
|
||||
}
|
||||
|
||||
DB::table('user_discovery_events')->insertOrIgnore($insertPayload);
|
||||
|
||||
$profileService->applyEvent(
|
||||
userId: $this->userId,
|
||||
eventType: $this->eventType,
|
||||
artworkId: $this->artworkId,
|
||||
categoryId: $categoryId !== null ? (int) $categoryId : null,
|
||||
occurredAt: $occurredAt,
|
||||
eventId: $this->eventId,
|
||||
algoVersion: $this->algoVersion,
|
||||
eventMeta: $this->meta
|
||||
);
|
||||
} catch (\Throwable $e) {
|
||||
Log::error('IngestUserDiscoveryEventJob failed', [
|
||||
'event_id' => $this->eventId,
|
||||
'user_id' => $this->userId,
|
||||
'artwork_id' => $this->artworkId,
|
||||
'event_type' => $this->eventType,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
47
app/Jobs/RegenerateUserRecommendationCacheJob.php
Normal file
47
app/Jobs/RegenerateUserRecommendationCacheJob.php
Normal file
@@ -0,0 +1,47 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Services\Recommendations\PersonalizedFeedService;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
final class RegenerateUserRecommendationCacheJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable;
|
||||
use InteractsWithQueue;
|
||||
use Queueable;
|
||||
use SerializesModels;
|
||||
|
||||
public int $tries = 3;
|
||||
|
||||
/** @var array<int, int> */
|
||||
public array $backoff = [10, 60, 180];
|
||||
|
||||
public function __construct(
|
||||
public readonly int $userId,
|
||||
public readonly string $algoVersion
|
||||
) {
|
||||
}
|
||||
|
||||
public function handle(PersonalizedFeedService $feedService): void
|
||||
{
|
||||
try {
|
||||
$feedService->regenerateCacheForUser($this->userId, $this->algoVersion);
|
||||
} catch (\Throwable $e) {
|
||||
Log::error('RegenerateUserRecommendationCacheJob failed', [
|
||||
'user_id' => $this->userId,
|
||||
'algo_version' => $this->algoVersion,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user