Files
Anime-Movies-Upcomming-Real…/movie_pipeline.py
2026-04-22 10:04:59 +02:00

280 lines
11 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
from datetime import date, timedelta
from difflib import SequenceMatcher
from app_config import get_settings
from data_sources.anilist_source import fetch_anilist_movie_candidates
from data_sources.animeschedule_source import fetch_animeschedule_anime_by_title
from data_sources.tmdb_source import (
extract_de_theatrical_dates,
fetch_tmdb_release_dates,
search_tmdb_movies,
select_release_in_range,
)
FUZZY_MATCH_THRESHOLD = 0.65
SCHEDULE_MATCH_THRESHOLD = 2
def add_months(value: date, months: int) -> date:
year = value.year + ((value.month - 1 + months) // 12)
month = ((value.month - 1 + months) % 12) + 1
return date(year, month, 1)
def normalize_title(text: str) -> str:
cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or ""))
return " ".join(cleaned.split())
def fuzzy_ratio(left: str, right: str) -> float:
a = normalize_title(left)
b = normalize_title(right)
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
def format_date(value: date, locale: str) -> str:
if locale == "de-DE":
return f"{value.day:02d}.{value.month:02d}.{value.year}"
return value.isoformat()
def parse_record_release_date(record: dict) -> date:
text = str(record.get("releaseDate") or "").strip()
try:
return date.fromisoformat(text)
except ValueError:
return date.max
def is_year_match(anilist_year: int, tmdb_year: int) -> bool:
if anilist_year <= 0 or tmdb_year <= 0:
return False
return abs(anilist_year - tmdb_year) <= 1
def title_score(anilist_english: str, anilist_romaji: str, anilist_native: str, tmdb_title: str, tmdb_original: str) -> tuple[int, int, int, float]:
english_norm = normalize_title(anilist_english)
romaji_norm = normalize_title(anilist_romaji)
native_norm = normalize_title(anilist_native)
tmdb_options = [normalize_title(tmdb_title), normalize_title(tmdb_original)]
exact_english = 1 if english_norm and english_norm in tmdb_options else 0
exact_romaji = 1 if romaji_norm and romaji_norm in tmdb_options else 0
exact_native = 1 if native_norm and native_norm in tmdb_options else 0
best_ratio = 0.0
for option in (tmdb_title, tmdb_original):
best_ratio = max(
best_ratio,
fuzzy_ratio(anilist_english, option),
fuzzy_ratio(anilist_romaji, option),
fuzzy_ratio(anilist_native, option),
)
return exact_english, exact_romaji, exact_native, best_ratio
def collect_tmdb_candidates_for_anilist(anilist_entry: dict, tmdb_token: str, search_cache: dict[str, list[dict]]) -> list[dict]:
english = (anilist_entry.get("title_english") or "").strip()
romaji = (anilist_entry.get("title_romaji") or "").strip()
native = (anilist_entry.get("title_native") or "").strip()
queries = []
if english:
queries.append(english)
if romaji and normalize_title(romaji) != normalize_title(english):
queries.append(romaji)
if native and normalize_title(native) not in {normalize_title(english), normalize_title(romaji)}:
queries.append(native)
if not queries:
return []
candidates_by_id = {}
languages = ["de-DE", "en-US", "ja-JP"]
for query in queries:
normalized_query = normalize_title(query)
for language in languages:
cache_key = f"{language}:{normalized_query}"
if cache_key not in search_cache:
search_cache[cache_key] = search_tmdb_movies(query, tmdb_token, language=language)
for item in search_cache[cache_key]:
tmdb_id = int(item.get("tmdb_id") or 0)
if tmdb_id <= 0:
continue
existing = candidates_by_id.get(tmdb_id)
if not existing:
candidates_by_id[tmdb_id] = item
continue
# Prefer candidate carrying a release date if duplicate appears from different language queries.
if not existing.get("release_date") and item.get("release_date"):
candidates_by_id[tmdb_id] = item
return list(candidates_by_id.values())
def pick_best_tmdb_match(anilist_entry: dict, tmdb_candidates: list[dict]) -> dict | None:
english = (anilist_entry.get("title_english") or "").strip()
romaji = (anilist_entry.get("title_romaji") or "").strip()
native = (anilist_entry.get("title_native") or "").strip()
anilist_year = int(anilist_entry.get("start_year") or 0)
best = None
best_tuple = (-1, -1, -1, 0.0)
for candidate in tmdb_candidates:
exact_english, exact_romaji, exact_native, ratio = title_score(
english,
romaji,
native,
str(candidate.get("title") or ""),
str(candidate.get("original_title") or ""),
)
tmdb_release = candidate.get("release_date")
tmdb_year = tmdb_release.year if tmdb_release else 0
has_exact_match = exact_english == 1 or exact_romaji == 1 or exact_native == 1
if not has_exact_match and not is_year_match(anilist_year, tmdb_year):
continue
if ratio < FUZZY_MATCH_THRESHOLD and exact_english == 0 and exact_romaji == 0 and exact_native == 0:
continue
score_tuple = (exact_english, exact_romaji, exact_native, ratio)
if score_tuple > best_tuple:
best_tuple = score_tuple
best = candidate
return best
def resolve_titles(anilist_entry: dict, schedule_token: str, schedule_cache: dict[str, dict]) -> tuple[str, str]:
english = (anilist_entry.get("title_english") or "").strip()
romaji = (anilist_entry.get("title_romaji") or "").strip()
native = (anilist_entry.get("title_native") or "").strip()
schedule_english = ""
if not english and romaji:
schedule = fetch_animeschedule_anime_by_title(romaji, schedule_token, schedule_cache)
if int(schedule.get("match_score") or 0) >= SCHEDULE_MATCH_THRESHOLD:
schedule_english = (schedule.get("english_title") or "").strip()
preferred_title = schedule_english or english or romaji or native
return preferred_title, schedule_english
def build_record(anilist_entry: dict, tmdb_entry: dict, release_date: date, locale: str, title: str, schedule_english: str) -> dict:
return {
"title": title,
"title_english_anilist": (anilist_entry.get("title_english") or "").strip(),
"title_anilist": (anilist_entry.get("title_english") or "").strip() or (anilist_entry.get("title_romaji") or "").strip(),
"title_schedule_english": schedule_english,
"title_romaji": (anilist_entry.get("title_romaji") or "").strip(),
"title_native": (anilist_entry.get("title_native") or "").strip(),
"studio": (anilist_entry.get("studio_text") or "").strip(),
"genres": (anilist_entry.get("genres_text") or "").strip() or "n/a",
"tags": (anilist_entry.get("tags_text") or "").strip(),
"release": format_date(release_date, locale),
"releaseDate": release_date.isoformat(),
"anilist_url": (anilist_entry.get("anilist_url") or "").strip() or "n/a",
"format": (anilist_entry.get("format") or "").strip() or "MOVIE",
"cover_image": (anilist_entry.get("cover_image") or "").strip(),
"description": (anilist_entry.get("description") or "").strip(),
"tmdb_title": (tmdb_entry.get("title") or "").strip(),
"tmdb_id": int(tmdb_entry.get("tmdb_id") or 0),
"ids": {
"anilist": int(anilist_entry.get("anilist_id") or 0),
"tmdb": int(tmdb_entry.get("tmdb_id") or 0),
},
}
def sort_dedup_records(records: list[dict]) -> list[dict]:
unique = {}
for record in records:
ids = record.get("ids") or {}
key = (int(ids.get("anilist") or 0), int(ids.get("tmdb") or 0), str(record.get("releaseDate") or ""))
if key not in unique:
unique[key] = record
result = list(unique.values())
result.sort(key=lambda item: (parse_record_release_date(item), str(item.get("title") or "")))
return result
def get_upcoming_movie_records(
locale: str,
today: date | None = None,
anime_schedule_token: str | None = None,
tmdb_read_access_token: str | None = None,
) -> list[dict]:
settings = get_settings()
schedule_token = (anime_schedule_token or settings.animeschedule_api_token).strip()
tmdb_token = (tmdb_read_access_token or settings.tmdb_read_access_token).strip()
if not tmdb_token:
return []
current_day = today or date.today()
month_start = date(current_day.year, current_day.month, 1)
end_date = add_months(month_start, 2) - timedelta(days=1)
anilist_candidates = fetch_anilist_movie_candidates(current_day)
schedule_cache: dict[str, dict] = {}
tmdb_search_cache: dict[str, list[dict]] = {}
tmdb_release_cache: dict[int, dict] = {}
output_records = []
for anilist_entry in anilist_candidates:
tmdb_candidates = collect_tmdb_candidates_for_anilist(anilist_entry, tmdb_token, tmdb_search_cache)
if not tmdb_candidates:
continue
matched_tmdb = pick_best_tmdb_match(anilist_entry, tmdb_candidates)
if not matched_tmdb:
continue
tmdb_id = int(matched_tmdb.get("tmdb_id") or 0)
if tmdb_id <= 0:
continue
if tmdb_id not in tmdb_release_cache:
tmdb_release_cache[tmdb_id] = fetch_tmdb_release_dates(tmdb_id, tmdb_token)
release_payload = tmdb_release_cache[tmdb_id]
de_theatrical_dates = extract_de_theatrical_dates(release_payload)
release_date = select_release_in_range(de_theatrical_dates, current_day, end_date)
if not release_date:
candidate_release = matched_tmdb.get("release_date")
if isinstance(candidate_release, date) and current_day <= candidate_release <= end_date:
release_date = candidate_release
else:
continue
preferred_title, schedule_english = resolve_titles(anilist_entry, schedule_token, schedule_cache)
if not preferred_title:
continue
output_records.append(
build_record(
anilist_entry=anilist_entry,
tmdb_entry=matched_tmdb,
release_date=release_date,
locale=locale,
title=preferred_title,
schedule_english=schedule_english,
)
)
return sort_dedup_records(output_records)