Files
Anime-Movies-Upcomming-Real…/data_sources/anilist_source.py
2026-04-22 10:04:59 +02:00

142 lines
4.4 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import json
import urllib.error
import urllib.request
from datetime import date, timedelta
# GraphQL endpoint that post_graphql() sends its requests to.
ANILIST_URL = "https://graphql.anilist.co"
def yyyymmdd(value: date) -> int:
    """Pack *value* into a single integer of the form YYYYMMDD.

    Example: ``date(2024, 3, 5)`` becomes ``20240305``.
    """
    year_and_month = value.year * 100 + value.month
    return year_and_month * 100 + value.day
def post_graphql(query: str, variables: dict) -> dict:
    """Send a GraphQL request to ``ANILIST_URL`` and return the decoded JSON reply.

    Args:
        query: GraphQL document text.
        variables: Values for the variables declared by *query*; must be
            JSON-serialisable.

    Returns:
        The response body parsed as JSON.

    Raises:
        RuntimeError: If ``urlopen`` raises ``HTTPError``; the message carries
            the status code and the response body. Any other failure
            (connection errors, invalid JSON, ...) propagates unchanged.
    """
    # Supplying ``data`` makes urllib issue a POST instead of a GET.
    request = urllib.request.Request(
        ANILIST_URL,
        data=json.dumps({"query": query, "variables": variables}).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "anime-movies-script/1.0",
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            raw_body = response.read()
    except urllib.error.HTTPError as exc:
        # Keep the server's error body: it usually explains the failure.
        detail = exc.read().decode("utf-8", "replace")
        raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
    return json.loads(raw_body.decode("utf-8"))
def map_anilist_movie(media: dict | None) -> dict:
    """Flatten one AniList media object into a dict of plain values.

    Missing or null fields fall back to ``""`` (text), ``0`` (numbers) or
    ``"MOVIE"`` (format). Text fields are stripped of surrounding whitespace.
    Genres, tags and studio names are each joined with ``", "``; only tags
    whose ``rank`` is at least 70 are kept.

    Args:
        media: Raw media object from the API response, or ``None``.

    Returns:
        A dict with the keys ``anilist_id``, ``title_english``,
        ``title_romaji``, ``title_native``, ``cover_image``, ``anilist_url``,
        ``start_year``, ``format``, ``description``, ``genres_text``,
        ``tags_text`` and ``studio_text``.
    """

    def clean(raw) -> str:
        # Treat None / empty as "" and trim surrounding whitespace.
        return (raw or "").strip()

    record = media or {}
    titles = record.get("title") or {}
    started = record.get("startDate") or {}
    studio_nodes = (record.get("studios") or {}).get("nodes") or []

    # Drop studios whose name is missing or blank.
    studio_names = [
        label
        for label in (clean((node or {}).get("name")) for node in studio_nodes)
        if label
    ]

    # Genres are coerced with str() before stripping; blank ones are dropped.
    genre_names = [
        label
        for label in (str(item or "").strip() for item in record.get("genres") or [])
        if label
    ]

    strong_tags = []
    for item in record.get("tags") or []:
        if isinstance(item, dict):
            label = clean(item.get("name"))
            weight = int(item.get("rank") or 0)
            if label and weight >= 70:
                strong_tags.append(label)

    cover = record.get("coverImage") or {}
    return {
        "anilist_id": int(record.get("id") or 0),
        "title_english": clean(titles.get("english")),
        "title_romaji": clean(titles.get("romaji")),
        "title_native": clean(titles.get("native")),
        "cover_image": clean(cover.get("large")),
        "anilist_url": clean(record.get("siteUrl")),
        "start_year": int(started.get("year") or 0),
        "format": clean(record.get("format")) or "MOVIE",
        "description": clean(record.get("description")),
        "genres_text": ", ".join(genre_names),
        "tags_text": ", ".join(strong_tags),
        "studio_text": ", ".join(studio_names),
    }
def fetch_anilist_movie_candidates(today: date, years_window: int = 1) -> list[dict]:
    """Collect Japanese anime movies from AniList that start around *today*.

    The search window reaches ``365 * years_window`` days before and after
    *today*. Result pages of 50 entries are requested one after another until
    the API reports that there is no further page. Every entry is passed
    through ``map_anilist_movie``; entries without a positive id, repeated ids
    and entries that have no title in any language are skipped.

    Args:
        today: Centre of the search window.
        years_window: Half-width of the window, in units of 365 days.

    Returns:
        The mapped entries, in the order the API returned them.

    Raises:
        RuntimeError: If a response contains an ``errors`` key, or if
            ``post_graphql`` reports an HTTP failure.
    """
    start_date = today - timedelta(days=365 * years_window)
    end_date = today + timedelta(days=365 * years_window)
    query = """
query ($page: Int, $perPage: Int, $start: FuzzyDateInt, $end: FuzzyDateInt) {
Page(page: $page, perPage: $perPage) {
pageInfo { hasNextPage }
media(
type: ANIME
format: MOVIE
countryOfOrigin: JP
sort: [POPULARITY_DESC, START_DATE_DESC]
startDate_greater: $start
startDate_lesser: $end
) {
id
title { english romaji native }
startDate { year }
format
description(asHtml: false)
genres
tags { name rank }
coverImage { large }
studios { nodes { name } }
siteUrl
}
}
}
"""
    # NOTE(review): the bounds are widened by one on each side, presumably
    # because the _greater/_lesser filters are exclusive - confirm against the
    # AniList API documentation.
    lower_bound = yyyymmdd(start_date) - 1
    upper_bound = yyyymmdd(end_date) + 1

    page = 1
    results = []
    seen_ids = set()
    while True:
        payload = post_graphql(
            query,
            {
                "page": page,
                "perPage": 50,
                "start": lower_bound,
                "end": upper_bound,
            },
        )
        if "errors" in payload:
            raise RuntimeError(payload["errors"])

        # Any level of the response may be JSON null, so fall back with
        # ``or`` instead of relying on ``.get`` defaults (which only cover
        # missing keys, not keys whose value is None).
        page_data = (payload.get("data") or {}).get("Page") or {}
        for media in page_data.get("media") or []:
            mapped = map_anilist_movie(media)
            anilist_id = mapped.get("anilist_id") or 0
            if anilist_id <= 0 or anilist_id in seen_ids:
                continue
            # The id is recorded before the title check, so an untitled entry
            # still blocks later entries with the same id.
            seen_ids.add(anilist_id)
            has_title = any(
                mapped.get(key)
                for key in ("title_english", "title_romaji", "title_native")
            )
            if has_title:
                results.append(mapped)

        if not (page_data.get("pageInfo") or {}).get("hasNextPage"):
            break
        page += 1
    return results