Finalising the queries and the rest

This commit is contained in:
ProgrammGamer
2026-04-21 22:41:43 +02:00
parent 2b3dffb62f
commit e5784bc1b8
3 changed files with 372 additions and 304 deletions

View File

@@ -4,30 +4,13 @@ from __future__ import annotations
import json
import urllib.error
import urllib.request
from datetime import date
from datetime import date, timedelta
ANILIST_URL = "https://graphql.anilist.co"
def normalize_title(text: str) -> str:
    """Return *text* lowercased with non-alphanumeric runs collapsed to one space.

    Every character that is not alphanumeric is treated as a word separator.
    Leading and trailing separators are dropped. A falsy ``text`` (``None``
    or ``""``) yields an empty string.
    """
    pieces = []
    for char in text or "":
        # Letters and digits are kept (lowercased); everything else separates words.
        pieces.append(char.lower() if char.isalnum() else " ")
    # split() without arguments trims the ends and merges repeated separators.
    words = "".join(pieces).split()
    return " ".join(words)
def safe_date(sd: dict | None) -> date | None:
    """Build a ``date`` from a mapping with ``year``/``month``/``day`` keys.

    Args:
        sd: Mapping that may contain ``"year"``, ``"month"`` and ``"day"``.
            Missing or falsy parts count as unknown. Parts are coerced with
            ``int()``, so numeric strings are accepted.

    Returns:
        The corresponding ``date``. An unknown day defaults to the first of
        the month. ``None`` is returned when ``sd`` is falsy, when year or
        month is unknown, when a part cannot be converted to an integer, or
        when the parts do not form a valid calendar date.
    """
    if not sd:
        return None

    try:
        year = int(sd.get("year") or 0)
        month = int(sd.get("month") or 0)
        day = int(sd.get("day") or 0)
    except (TypeError, ValueError):
        # A malformed part (e.g. "n/a" or a list) means "no usable date",
        # the same outcome as an out-of-range value below.
        return None

    if year <= 0 or month <= 0:
        return None
    if day <= 0:
        # Day precision is optional; fall back to the first of the month.
        day = 1

    try:
        return date(year, month, day)
    except (ValueError, OverflowError):
        return None
def yyyymmdd(value: date) -> int:
    """Encode *value* as the integer ``YYYYMMDD`` (2024-05-17 -> 20240517)."""
    year_and_month = value.year * 100 + value.month
    return year_and_month * 100 + value.day
def post_graphql(query: str, variables: dict) -> dict:
@@ -46,78 +29,69 @@ def post_graphql(query: str, variables: dict) -> dict:
raise RuntimeError(f"HTTP {exc.code}: {body}") from exc
def pick_best_title(title: dict) -> str:
    """Return the preferred display title from a title mapping.

    The ``"english"``, ``"romaji"`` and ``"native"`` entries are tried in
    that order. The first one that is non-empty after stripping whitespace
    is returned. An empty string is returned when none qualifies.
    """
    for variant in ("english", "romaji", "native"):
        candidate = (title.get(variant) or "").strip()
        if candidate:
            return candidate
    return ""
def map_anilist_media(media: dict | None) -> dict:
def map_anilist_movie(media: dict | None) -> dict:
media = media or {}
title = media.get("title") or {}
start_date = media.get("startDate") or {}
studios = ((media.get("studios") or {}).get("nodes") or [])
studio_names = [((node or {}).get("name") or "").strip() for node in studios]
studio_names = [name for name in studio_names if name]
genres = [str(item).strip() for item in (media.get("genres") or [])]
genres = [g for g in genres if g]
studio_names = []
for node in studios:
name = ((node or {}).get("name") or "").strip()
if name:
studio_names.append(name)
genres = []
for entry in media.get("genres") or []:
text = str(entry or "").strip()
if text:
genres.append(text)
tags_raw = media.get("tags") or []
tags = []
for tag in tags_raw:
for tag in media.get("tags") or []:
if not isinstance(tag, dict):
continue
name = (tag.get("name") or "").strip()
rank = int(tag.get("rank") or 0)
if name:
tags.append({"name": name, "rank": rank})
if name and rank >= 70:
tags.append(name)
mapped = {
"id": media.get("id"),
"title_best": pick_best_title(title),
return {
"anilist_id": int(media.get("id") or 0),
"title_english": (title.get("english") or "").strip(),
"title_romaji": (title.get("romaji") or "").strip(),
"title_native": (title.get("native") or "").strip(),
"start_date": safe_date(media.get("startDate")),
"format": (media.get("format") or "").strip(),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"source": str(media.get("source") or "").replace("_", " ").title(),
"description": (media.get("description") or "").strip(),
"genres": genres,
"genres_text": ", ".join(genres) if genres else "",
"tags": tags,
"tags_text": ", ".join(tag["name"] for tag in tags if tag["rank"] >= 70) or "",
"studio_names": studio_names,
"studio_text": ", ".join(studio_names) if studio_names else "",
"anilist_url": (media.get("siteUrl") or "").strip(),
"cover_image": (((media.get("coverImage") or {}).get("large")) or "").strip(),
"raw": media,
"anilist_url": (media.get("siteUrl") or "").strip(),
"start_year": int(start_date.get("year") or 0),
"format": (media.get("format") or "").strip() or "MOVIE",
"description": (media.get("description") or "").strip(),
"genres_text": ", ".join(genres),
"tags_text": ", ".join(tags),
"studio_text": ", ".join(studio_names),
}
return mapped
def fetch_anilist_movie_by_search(search_text: str, cache: dict[str, dict | None]) -> dict | None:
key = normalize_title(search_text)
if key in cache:
return cache[key]
def fetch_anilist_movie_candidates(today: date, years_window: int = 1) -> list[dict]:
start_date = today - timedelta(days=365 * years_window)
end_date = today + timedelta(days=365 * years_window)
query = """
query ($search: String, $perPage: Int) {
Page(page: 1, perPage: $perPage) {
media(type: ANIME, format: MOVIE, search: $search, sort: [SEARCH_MATCH, POPULARITY_DESC]) {
query ($page: Int, $perPage: Int, $start: FuzzyDateInt, $end: FuzzyDateInt) {
Page(page: $page, perPage: $perPage) {
pageInfo { hasNextPage }
media(
type: ANIME
format: MOVIE
countryOfOrigin: JP
sort: [POPULARITY_DESC, START_DATE_DESC]
startDate_greater: $start
startDate_lesser: $end
) {
id
title { english romaji native }
startDate { year month day }
startDate { year }
format
episodes
duration
source
description(asHtml: false)
genres
tags { name rank }
@@ -129,48 +103,39 @@ def fetch_anilist_movie_by_search(search_text: str, cache: dict[str, dict | None
}
"""
try:
data = post_graphql(query, {"search": search_text, "perPage": 5})
except Exception:
cache[key] = None
return None
page = 1
results = []
seen_ids = set()
if "errors" in data:
cache[key] = None
return None
while True:
payload = post_graphql(
query,
{
"page": page,
"perPage": 50,
"start": yyyymmdd(start_date) - 1,
"end": yyyymmdd(end_date) + 1,
},
)
candidates = data.get("data", {}).get("Page", {}).get("media", [])
if not candidates:
cache[key] = None
return None
if "errors" in payload:
raise RuntimeError(payload["errors"])
wanted = normalize_title(search_text)
best = None
best_score = -1
for media in candidates:
title = media.get("title") or {}
options = [title.get("english"), title.get("romaji"), title.get("native")]
score = 0
for option in options:
normalized = normalize_title(str(option or ""))
if not normalized:
page_data = payload.get("data", {}).get("Page", {})
for media in page_data.get("media", []):
mapped = map_anilist_movie(media)
anilist_id = mapped.get("anilist_id") or 0
if anilist_id <= 0 or anilist_id in seen_ids:
continue
if normalized == wanted:
score = max(score, 3)
elif wanted and (wanted in normalized or normalized in wanted):
score = max(score, 2)
elif normalized.split(" ")[:2] == wanted.split(" ")[:2]:
score = max(score, 1)
if score > best_score:
best_score = score
best = media
if score == 3:
break
seen_ids.add(anilist_id)
if best_score <= 0:
cache[key] = None
return None
if not mapped.get("title_english") and not mapped.get("title_romaji") and not mapped.get("title_native"):
continue
mapped = map_anilist_media(best) if best else None
cache[key] = mapped
return mapped
results.append(mapped)
if not page_data.get("pageInfo", {}).get("hasNextPage"):
break
page += 1
return results

View File

@@ -7,7 +7,6 @@ import urllib.request
from datetime import date
TMDB_API_BASE = "https://api.themoviedb.org/3"
TMDB_ANIME_KEYWORD_ID = "210024"
def parse_release_date(value: str) -> date | None:
@@ -21,7 +20,10 @@ def parse_release_date(value: str) -> date | None:
def tmdb_get_json(path: str, params: dict, token: str) -> dict:
url = TMDB_API_BASE + path + "?" + urllib.parse.urlencode(params)
url = TMDB_API_BASE + path
if params:
url += "?" + urllib.parse.urlencode(params)
req = urllib.request.Request(
url,
headers={
@@ -30,89 +32,74 @@ def tmdb_get_json(path: str, params: dict, token: str) -> dict:
"User-Agent": "anime-movies-script/1.0",
},
)
with urllib.request.urlopen(req, timeout=20) as resp:
return json.loads(resp.read().decode("utf-8"))
def poster_url(path: str) -> str:
    """Return the full w780 image URL for a TMDB poster path.

    Args:
        path: Poster file path, normally starting with ``/``. ``None`` or
            blank input is accepted.

    Returns:
        ``"https://image.tmdb.org/t/p/w780"`` followed by the path, or an
        empty string when ``path`` is empty after stripping whitespace.
    """
    text = (path or "").strip()
    if not text:
        return ""
    if not text.startswith("/"):
        # Without the separator the file name would fuse with the size
        # segment ("w780abc.jpg") and produce a URL that cannot resolve.
        text = "/" + text
    return "https://image.tmdb.org/t/p/w780" + text
def fetch_genre_map(token: str, language: str = "de-DE") -> dict[int, str]:
    """Fetch the movie genre list and return it as an id -> name mapping.

    Args:
        token: Credential forwarded unchanged to ``tmdb_get_json``.
        language: Value sent as the ``language`` query parameter.

    Returns:
        A dict keyed by integer genre id. Entries whose id cannot be
        converted to ``int`` or whose name is empty after stripping are
        omitted.
    """
    payload = tmdb_get_json("/genre/movie/list", {"language": language}, token)
    result: dict[int, str] = {}
    # `or []` also covers an explicit null "genres" value in the response.
    for entry in payload.get("genres") or []:
        if not isinstance(entry, dict):
            continue
        try:
            gid = int(entry.get("id"))
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures are expected here; anything else is
            # a real bug and should surface instead of being swallowed.
            continue
        name = (entry.get("name") or "").strip()
        if name:
            result[gid] = name
    return result
def fetch_tmdb_anime_movies(start_date: date, end_date: date, token: str, language: str = "de-DE") -> list[dict]:
if not token:
def search_tmdb_movies(title: str, token: str, language: str = "en-US") -> list[dict]:
query = (title or "").strip()
if not query:
return []
genre_map = fetch_genre_map(token, language=language)
page = 1
results = []
seen_ids = set()
while True:
params = {
payload = tmdb_get_json(
"/search/movie",
{
"query": query,
"include_adult": "false",
"include_video": "false",
"language": language,
"sort_by": "primary_release_date.desc",
"primary_release_date.gte": start_date.isoformat(),
"primary_release_date.lte": end_date.isoformat(),
"with_keywords": TMDB_ANIME_KEYWORD_ID,
"page": str(page),
}
payload = tmdb_get_json("/discover/movie", params, token)
page_results = payload.get("results", [])
if not page_results:
break
"page": "1",
},
token,
)
for movie in page_results:
movie_id = movie.get("id")
if movie_id in seen_ids:
continue
results = []
for item in payload.get("results", []):
movie_id = item.get("id")
if not movie_id:
continue
release_date = parse_release_date(str(movie.get("release_date") or ""))
if not release_date:
continue
seen_ids.add(movie_id)
genre_names = [genre_map.get(gid, "") for gid in (movie.get("genre_ids") or [])]
genre_names = [name for name in genre_names if name]
results.append(
{
"tmdb_id": movie_id,
"title": (movie.get("title") or "").strip(),
"original_title": (movie.get("original_title") or "").strip(),
"overview": (movie.get("overview") or "").strip(),
"release_date": release_date,
"genres": genre_names,
"genres_text": ", ".join(genre_names),
"poster_url": poster_url(str(movie.get("poster_path") or "")),
"popularity": movie.get("popularity"),
"vote_average": movie.get("vote_average"),
"vote_count": movie.get("vote_count"),
"raw": movie,
}
)
total_pages = int(payload.get("total_pages") or page)
if page >= total_pages:
break
page += 1
results.append(
{
"tmdb_id": int(movie_id),
"title": (item.get("title") or "").strip(),
"original_title": (item.get("original_title") or "").strip(),
"release_date": parse_release_date(str(item.get("release_date") or "")),
}
)
return results
def fetch_tmdb_release_dates(movie_id: int, token: str) -> dict:
    """Fetch the release-date listing for a single movie.

    Requests ``/movie/{movie_id}/release_dates`` through ``tmdb_get_json``
    with no query parameters and returns its result unchanged.
    """
    endpoint = f"/movie/{movie_id}/release_dates"
    no_params: dict = {}
    return tmdb_get_json(endpoint, no_params, token)
def extract_de_theatrical_dates(release_payload: dict) -> list[date]:
    """Collect the release dates of types 2 and 3 listed for country ``DE``.

    Args:
        release_payload: Mapping with a ``"results"`` list of per-country
            blocks. Each block carries ``"iso_3166_1"`` and a
            ``"release_dates"`` list of ``{"type", "release_date"}`` entries.

    Returns:
        The parsed dates in ascending order. The list is empty when there is
        no ``DE`` block or no matching entry parses to a date.
    """
    country_blocks = release_payload.get("results", [])
    # Only the first block whose country code is "DE" (case-insensitive) is used.
    germany = next(
        (
            block
            for block in country_blocks
            if str(block.get("iso_3166_1") or "").upper() == "DE"
        ),
        None,
    )
    if not germany:
        return []

    # NOTE(review): 2 and 3 are presumably TMDB's "Theatrical (limited)" and
    # "Theatrical" release types; confirm against the TMDB docs.
    wanted_types = {2, 3}
    found = []
    for item in germany.get("release_dates", []):
        if int(item.get("type") or 0) in wanted_types:
            parsed = parse_release_date(str(item.get("release_date") or ""))
            if parsed:
                found.append(parsed)
    found.sort()
    return found
def select_release_in_range(release_dates: list[date], start_date: date, end_date: date) -> date | None:
    """Return the earliest date within ``[start_date, end_date]``.

    Both bounds are inclusive. ``None`` is returned when no date falls
    inside the window.
    """
    in_window = (
        candidate
        for candidate in sorted(release_dates)
        if start_date <= candidate <= end_date
    )
    return next(in_window, None)