Add modular TMDb-first movie pipeline and Discord bot

This commit is contained in:
ProgrammGamer
2026-04-21 21:43:35 +02:00
commit bbba110268
13 changed files with 1283 additions and 0 deletions

1
data_sources/__init__.py Normal file
View File

@@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import urllib.error
import urllib.request
from datetime import date
ANILIST_URL = "https://graphql.anilist.co"
def normalize_title(text: str) -> str:
cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or ""))
return " ".join(cleaned.split())
def safe_date(sd: dict | None) -> date | None:
if not sd:
return None
year = sd.get("year") or 0
month = sd.get("month") or 0
day = sd.get("day") or 0
if year <= 0 or month <= 0:
return None
if day <= 0:
day = 1
try:
return date(year, month, day)
except ValueError:
return None
def post_graphql(query: str, variables: dict) -> dict:
payload = json.dumps({"query": query, "variables": variables}).encode("utf-8")
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": "anime-movies-script/1.0",
}
req = urllib.request.Request(ANILIST_URL, data=payload, headers=headers)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", "replace")
raise RuntimeError(f"HTTP {exc.code}: {body}") from exc
def pick_best_title(title: dict) -> str:
english = (title.get("english") or "").strip()
if english:
return english
romaji = (title.get("romaji") or "").strip()
if romaji:
return romaji
native = (title.get("native") or "").strip()
return native or ""
def map_anilist_media(media: dict | None) -> dict:
media = media or {}
title = media.get("title") or {}
studios = ((media.get("studios") or {}).get("nodes") or [])
studio_names = [((node or {}).get("name") or "").strip() for node in studios]
studio_names = [name for name in studio_names if name]
genres = [str(item).strip() for item in (media.get("genres") or [])]
genres = [g for g in genres if g]
tags_raw = media.get("tags") or []
tags = []
for tag in tags_raw:
if not isinstance(tag, dict):
continue
name = (tag.get("name") or "").strip()
rank = int(tag.get("rank") or 0)
if name:
tags.append({"name": name, "rank": rank})
mapped = {
"id": media.get("id"),
"title_best": pick_best_title(title),
"title_english": (title.get("english") or "").strip(),
"title_romaji": (title.get("romaji") or "").strip(),
"title_native": (title.get("native") or "").strip(),
"start_date": safe_date(media.get("startDate")),
"format": (media.get("format") or "").strip(),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"source": str(media.get("source") or "").replace("_", " ").title(),
"description": (media.get("description") or "").strip(),
"genres": genres,
"genres_text": ", ".join(genres) if genres else "",
"tags": tags,
"tags_text": ", ".join(tag["name"] for tag in tags if tag["rank"] >= 70) or "",
"studio_names": studio_names,
"studio_text": ", ".join(studio_names) if studio_names else "",
"anilist_url": (media.get("siteUrl") or "").strip(),
"cover_image": (((media.get("coverImage") or {}).get("large")) or "").strip(),
"raw": media,
}
return mapped
def fetch_anilist_movie_by_search(search_text: str, cache: dict[str, dict | None]) -> dict | None:
key = normalize_title(search_text)
if key in cache:
return cache[key]
query = """
query ($search: String, $perPage: Int) {
Page(page: 1, perPage: $perPage) {
media(type: ANIME, format: MOVIE, search: $search, sort: [SEARCH_MATCH, POPULARITY_DESC]) {
id
title { english romaji native }
startDate { year month day }
format
episodes
duration
source
description(asHtml: false)
genres
tags { name rank }
coverImage { large }
studios { nodes { name } }
siteUrl
}
}
}
"""
try:
data = post_graphql(query, {"search": search_text, "perPage": 5})
except Exception:
cache[key] = None
return None
if "errors" in data:
cache[key] = None
return None
candidates = data.get("data", {}).get("Page", {}).get("media", [])
if not candidates:
cache[key] = None
return None
wanted = normalize_title(search_text)
best = None
best_score = -1
for media in candidates:
title = media.get("title") or {}
options = [title.get("english"), title.get("romaji"), title.get("native")]
score = 0
for option in options:
normalized = normalize_title(str(option or ""))
if not normalized:
continue
if normalized == wanted:
score = max(score, 3)
elif wanted and (wanted in normalized or normalized in wanted):
score = max(score, 2)
elif normalized.split(" ")[:2] == wanted.split(" ")[:2]:
score = max(score, 1)
if score > best_score:
best_score = score
best = media
if score == 3:
break
if best_score <= 0:
cache[key] = None
return None
mapped = map_anilist_media(best) if best else None
cache[key] = mapped
return mapped

View File

@@ -0,0 +1,261 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import urllib.parse
import urllib.request
ANIMESCHEDULE_API_BASE = "https://animeschedule.net/api/v3"
def normalize_title(text: str) -> str:
cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or ""))
return " ".join(cleaned.split())
def flatten_items(payload: dict | list) -> list[dict]:
if isinstance(payload, list):
return [item for item in payload if isinstance(item, dict)]
if not isinstance(payload, dict):
return []
for key in ("anime", "items", "data", "results"):
value = payload.get(key)
if isinstance(value, list):
return [item for item in value if isinstance(item, dict)]
return []
def extract_names(item: dict) -> list[str]:
names = []
direct = [item.get("title"), item.get("name"), item.get("romaji"), item.get("english"), item.get("native")]
for candidate in direct:
text = (candidate or "").strip()
if text:
names.append(text)
nested = item.get("names") or {}
if isinstance(nested, dict):
for candidate in nested.values():
if isinstance(candidate, list):
for value in candidate:
text = (str(value) if value is not None else "").strip()
if text:
names.append(text)
continue
text = (str(candidate) if candidate is not None else "").strip()
if text:
names.append(text)
unique = []
seen = set()
for name in names:
lowered = name.lower()
if lowered in seen:
continue
seen.add(lowered)
unique.append(name)
return unique
def extract_url(item: dict) -> str:
slug = (item.get("slug") or item.get("route") or item.get("id") or "").strip()
if slug:
return f"https://animeschedule.net/anime/{slug}"
websites = item.get("websites") or item.get("links") or []
if isinstance(websites, list):
for entry in websites:
if not isinstance(entry, dict):
continue
url = (entry.get("url") or "").strip()
if url:
return url
return ""
def extract_english_title(item: dict) -> str:
direct = [item.get("english"), item.get("titleEnglish"), item.get("englishTitle")]
for candidate in direct:
text = (candidate or "").strip()
if text:
return text
names = item.get("names") or {}
if isinstance(names, dict):
for key, value in names.items():
key_norm = str(key).strip().lower()
if key_norm in {"en", "eng", "english", "titleenglish"}:
text = (str(value) if value is not None else "").strip()
if text:
return text
return ""
def extract_list(item: dict, *keys: str) -> str:
for key in keys:
value = item.get(key)
if not value:
continue
if isinstance(value, str):
text = value.strip()
if text:
return text
if isinstance(value, list):
names = []
for entry in value:
if isinstance(entry, str):
text = entry.strip()
elif isinstance(entry, dict):
text = (entry.get("name") or entry.get("title") or "").strip()
else:
text = ""
if text:
names.append(text)
if names:
return ", ".join(names)
return ""
def extract_format(item: dict) -> str:
for key in ("format", "mediaType", "type"):
value = item.get(key)
text = (str(value) if value is not None else "").strip()
if text:
return text
return ""
def extract_description(item: dict) -> str:
for key in ("description", "synopsis", "overview", "summary"):
value = item.get(key)
text = (str(value) if value is not None else "").strip()
if text:
return text
details = item.get("details") or {}
if isinstance(details, dict):
for key in ("description", "synopsis", "overview", "summary"):
value = details.get(key)
text = (str(value) if value is not None else "").strip()
if text:
return text
return ""
def empty_result() -> dict:
return {
"url": "",
"title": "",
"english_title": "",
"format": "",
"genres": "",
"studios": "",
"description": "",
"names": [],
"match_score": 0,
"raw": {},
}
def title_match_score(wanted: str, names: list[str]) -> int:
wanted_norm = normalize_title(wanted)
if not wanted_norm:
return 0
best = 0
wanted_parts = wanted_norm.split(" ")
for candidate in names:
normalized = normalize_title(candidate)
if not normalized:
continue
if normalized == wanted_norm:
best = max(best, 4)
continue
if wanted_norm in normalized or normalized in wanted_norm:
best = max(best, 3)
continue
normalized_parts = normalized.split(" ")
if normalized_parts[:2] == wanted_parts[:2] and len(normalized_parts) >= 2 and len(wanted_parts) >= 2:
best = max(best, 2)
continue
if set(normalized_parts) & set(wanted_parts):
best = max(best, 1)
return best
def fetch_animeschedule_anime_by_title(title: str, token: str, cache: dict[str, dict]) -> dict:
key = normalize_title(title)
if key in cache:
return cache[key]
if not token:
cache[key] = empty_result()
return cache[key]
params = {"search": title, "take": "10"}
url = ANIMESCHEDULE_API_BASE + "/anime?" + urllib.parse.urlencode(params)
req = urllib.request.Request(
url,
headers={
"Accept": "application/json",
"Authorization": f"Bearer {token}",
"User-Agent": "anime-movies-script/1.0",
},
)
try:
with urllib.request.urlopen(req, timeout=20) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except Exception:
cache[key] = empty_result()
return cache[key]
candidates = flatten_items(payload)
if not candidates:
cache[key] = empty_result()
return cache[key]
best = None
best_score = -1
for item in candidates:
names = extract_names(item)
if not names:
continue
score = title_match_score(title, names)
if score <= 0:
continue
if score > best_score:
best_score = score
best = item
if score == 4:
break
if not best:
cache[key] = empty_result()
return cache[key]
names = extract_names(best)
cache[key] = {
"url": extract_url(best),
"title": names[0] if names else "",
"english_title": extract_english_title(best),
"format": extract_format(best),
"genres": extract_list(best, "genres", "genre", "categories"),
"studios": extract_list(best, "studios", "studio"),
"description": extract_description(best),
"names": names,
"match_score": best_score,
"raw": best,
}
return cache[key]

118
data_sources/tmdb_source.py Normal file
View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import urllib.parse
import urllib.request
from datetime import date
TMDB_API_BASE = "https://api.themoviedb.org/3"
TMDB_ANIME_KEYWORD_ID = "210024"
def parse_release_date(value: str) -> date | None:
text = (value or "").strip()
if not text:
return None
try:
return date.fromisoformat(text[:10])
except ValueError:
return None
def tmdb_get_json(path: str, params: dict, token: str) -> dict:
url = TMDB_API_BASE + path + "?" + urllib.parse.urlencode(params)
req = urllib.request.Request(
url,
headers={
"Accept": "application/json",
"Authorization": f"Bearer {token}",
"User-Agent": "anime-movies-script/1.0",
},
)
with urllib.request.urlopen(req, timeout=20) as resp:
return json.loads(resp.read().decode("utf-8"))
def poster_url(path: str) -> str:
text = (path or "").strip()
if not text:
return ""
return "https://image.tmdb.org/t/p/w780" + text
def fetch_genre_map(token: str, language: str = "de-DE") -> dict[int, str]:
payload = tmdb_get_json("/genre/movie/list", {"language": language}, token)
result = {}
for entry in payload.get("genres", []):
try:
gid = int(entry.get("id"))
except Exception:
continue
name = (entry.get("name") or "").strip()
if name:
result[gid] = name
return result
def fetch_tmdb_anime_movies(start_date: date, end_date: date, token: str, language: str = "de-DE") -> list[dict]:
if not token:
return []
genre_map = fetch_genre_map(token, language=language)
page = 1
results = []
seen_ids = set()
while True:
params = {
"include_adult": "false",
"include_video": "false",
"language": language,
"sort_by": "primary_release_date.desc",
"primary_release_date.gte": start_date.isoformat(),
"primary_release_date.lte": end_date.isoformat(),
"with_keywords": TMDB_ANIME_KEYWORD_ID,
"page": str(page),
}
payload = tmdb_get_json("/discover/movie", params, token)
page_results = payload.get("results", [])
if not page_results:
break
for movie in page_results:
movie_id = movie.get("id")
if movie_id in seen_ids:
continue
release_date = parse_release_date(str(movie.get("release_date") or ""))
if not release_date:
continue
seen_ids.add(movie_id)
genre_names = [genre_map.get(gid, "") for gid in (movie.get("genre_ids") or [])]
genre_names = [name for name in genre_names if name]
results.append(
{
"tmdb_id": movie_id,
"title": (movie.get("title") or "").strip(),
"original_title": (movie.get("original_title") or "").strip(),
"overview": (movie.get("overview") or "").strip(),
"release_date": release_date,
"genres": genre_names,
"genres_text": ", ".join(genre_names),
"poster_url": poster_url(str(movie.get("poster_path") or "")),
"popularity": movie.get("popularity"),
"vote_average": movie.get("vote_average"),
"vote_count": movie.get("vote_count"),
"raw": movie,
}
)
total_pages = int(payload.get("total_pages") or page)
if page >= total_pages:
break
page += 1
return results