diff --git a/.env.example b/.env.example index ec69e3f..c0cdb0b 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,6 @@ -DISCORD_BOT_TOKEN=your_discord_bot_token -DISCORD_GUILD_ID=your_discord_guild_id -ANIMESCHEDULE_APP_ID=your_animeschedule_app_id -ANIMESCHEDULE_API_TOKEN=your_animeschedule_api_token -TMDB_READ_ACCESS_TOKEN=your_tmdb_read_access_token -LOCALE=de-DE +DISCORD_BOT_TOKEN=your_discord_bot_token +DISCORD_GUILD_ID=your_discord_guild_id +ANIMESCHEDULE_APP_ID=your_animeschedule_app_id +ANIMESCHEDULE_API_TOKEN=your_animeschedule_api_token +TMDB_READ_ACCESS_TOKEN=your_tmdb_read_access_token +LOCALE=de-DE diff --git a/.gitignore b/.gitignore index 09355de..0a94dc2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,206 +1,206 @@ -# ---> Python -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control -.pdm.toml -.pdm-python -.pdm-build/ - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc - -# ---> VisualStudioCode -.vscode/* -!.vscode/settings.json -!.vscode/tasks.json -!.vscode/launch.json -!.vscode/extensions.json -!.vscode/*.code-snippets - -# Local History for Visual Studio Code -.history/ - -# Built Visual Studio Code Extensions -*.vsix - -# ---> VirtualEnv -# Virtualenv -# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ -.Python -[Bb]in -[Ii]nclude -[Ll]ib -[Ll]ib64 -[Ll]ocal -[Ss]cripts -pyvenv.cfg -.venv -pip-selfcheck.json - - -env/ +# ---> Python +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +# ---> VisualStudioCode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +# ---> VirtualEnv +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +.Python +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +.venv +pip-selfcheck.json + + +env/ diff --git a/README.md b/README.md index 635992a..76ff915 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# Anime-Movies-Upcomming-Realese-Bot - +# Anime-Movies-Upcomming-Realese-Bot + diff --git a/anime_movies_months.py b/anime_movies_months.py index 36a646b..0eff04e 100644 --- a/anime_movies_months.py +++ b/anime_movies_months.py @@ -1,77 +1,77 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import sys -from datetime import date - -from app_config import get_settings -from movie_pipeline import get_upcoming_movie_records - - -def truncate(text: str, max_len: int) -> str: - if len(text) <= max_len: - return text - if max_len <= 3: - return text[:max_len] - return text[: max_len - 3] + "..." - - -def format_table(headers: list[str], rows: list[list[str]]) -> str: - max_widths = [36, 24, 24, 12, 8, 60] - default_max_width = 40 - widths = [len(h) for h in headers] - for row in rows: - for i, cell in enumerate(row): - max_width = max_widths[i] if i < len(max_widths) else default_max_width - widths[i] = min(max(widths[i], len(cell)), max_width) - - lines = [] - header_line = " | ".join(truncate(h, widths[i]).ljust(widths[i]) for i, h in enumerate(headers)) - sep_line = "-+-".join("-" * w for w in widths) - lines.append(header_line) - lines.append(sep_line) - - for row in rows: - line = " | ".join(truncate(row[i], widths[i]).ljust(widths[i]) for i in range(len(headers))) - lines.append(line) - - return "\n".join(lines) - - -def get_upcoming_movie_rows(locale: str, today: date | None = None) -> list[list[str]]: - records = get_upcoming_movie_records(locale, today=today) - rows = [] - for item in records: - rows.append( - [ - item["title"], - item["studio"], - item["genres"], - item["release"], - item["anilist_url"], - ] - ) - return rows - - -def main() -> int: - settings = get_settings() - locale = settings.locale - - try: - rows = get_upcoming_movie_rows(locale) - except Exception as exc: - print(f"Fehler beim Datenabruf: {exc}", file=sys.stderr) - return 1 - - if not rows: - print("Keine Anime Filme fuer diesen und naechsten Monat gefunden.") - return 0 - - headers = ["Title", "Studio", "Genres", "Release", "AniList"] - print(format_table(headers, rows)) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) +#!/usr/bin/env python3 +from __future__ import annotations + +import sys +from datetime import date + +from app_config import get_settings +from movie_pipeline import get_upcoming_movie_records + + +def truncate(text: str, max_len: int) -> str: + if len(text) <= max_len: + return text + if max_len <= 3: + return text[:max_len] + return text[: max_len - 3] + "..." + + +def format_table(headers: list[str], rows: list[list[str]]) -> str: + max_widths = [36, 24, 24, 12, 8, 60] + default_max_width = 40 + widths = [len(h) for h in headers] + for row in rows: + for i, cell in enumerate(row): + max_width = max_widths[i] if i < len(max_widths) else default_max_width + widths[i] = min(max(widths[i], len(cell)), max_width) + + lines = [] + header_line = " | ".join(truncate(h, widths[i]).ljust(widths[i]) for i, h in enumerate(headers)) + sep_line = "-+-".join("-" * w for w in widths) + lines.append(header_line) + lines.append(sep_line) + + for row in rows: + line = " | ".join(truncate(row[i], widths[i]).ljust(widths[i]) for i in range(len(headers))) + lines.append(line) + + return "\n".join(lines) + + +def get_upcoming_movie_rows(locale: str, today: date | None = None) -> list[list[str]]: + records = get_upcoming_movie_records(locale, today=today) + rows = [] + for item in records: + rows.append( + [ + item["title"], + item["studio"], + item["genres"], + item["release"], + item["anilist_url"], + ] + ) + return rows + + +def main() -> int: + settings = get_settings() + locale = settings.locale + + try: + rows = get_upcoming_movie_rows(locale) + except Exception as exc: + print(f"Fehler beim Datenabruf: {exc}", file=sys.stderr) + return 1 + + if not rows: + print("Keine Anime Filme fuer diesen und naechsten Monat gefunden.") + return 0 + + headers = ["Title", "Studio", "Genres", "Release", "AniList"] + print(format_table(headers, rows)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/app_config.py b/app_config.py index fe0a164..4fd0f10 100644 --- a/app_config.py +++ b/app_config.py @@ -1,43 +1,43 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import os -from dataclasses import dataclass -from pathlib import Path - - -def load_dotenv(path: str = ".env") -> None: - dotenv_path = Path(path) - if not dotenv_path.exists(): - return - - for raw_line in dotenv_path.read_text(encoding="utf-8").splitlines(): - line = raw_line.strip() - if not line or line.startswith("#") or "=" not in line: - continue - - key, value = line.split("=", 1) - key = key.strip() - value = value.strip().strip('"').strip("'") - if key and key not in os.environ: - os.environ[key] = value - - -@dataclass(frozen=True) -class AppSettings: - discord_bot_token: str - discord_guild_id: str - locale: str - animeschedule_api_token: str - tmdb_read_access_token: str - - -def get_settings() -> AppSettings: - load_dotenv() - return AppSettings( - discord_bot_token=os.environ.get("DISCORD_BOT_TOKEN", "").strip(), - discord_guild_id=os.environ.get("DISCORD_GUILD_ID", "").strip(), - locale=os.environ.get("LOCALE", "de-DE").strip() or "de-DE", - animeschedule_api_token=os.environ.get("ANIMESCHEDULE_API_TOKEN", "").strip(), - tmdb_read_access_token=os.environ.get("TMDB_READ_ACCESS_TOKEN", "").strip(), - ) +#!/usr/bin/env python3 +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + + +def load_dotenv(path: str = ".env") -> None: + dotenv_path = Path(path) + if not dotenv_path.exists(): + return + + for raw_line in dotenv_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + if key and key not in os.environ: + os.environ[key] = value + + +@dataclass(frozen=True) +class AppSettings: + discord_bot_token: str + discord_guild_id: str + locale: str + animeschedule_api_token: str + tmdb_read_access_token: str + + +def get_settings() -> AppSettings: + load_dotenv() + return AppSettings( + discord_bot_token=os.environ.get("DISCORD_BOT_TOKEN", "").strip(), + discord_guild_id=os.environ.get("DISCORD_GUILD_ID", "").strip(), + locale=os.environ.get("LOCALE", "de-DE").strip() or "de-DE", + animeschedule_api_token=os.environ.get("ANIMESCHEDULE_API_TOKEN", "").strip(), + tmdb_read_access_token=os.environ.get("TMDB_READ_ACCESS_TOKEN", "").strip(), + ) diff --git a/data_sources/__init__.py b/data_sources/__init__.py index e5a0d9b..2ceba5b 100644 --- a/data_sources/__init__.py +++ b/data_sources/__init__.py @@ -1 +1 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 diff --git a/data_sources/anilist_source.py b/data_sources/anilist_source.py index e7555d1..4ceca85 100644 --- a/data_sources/anilist_source.py +++ b/data_sources/anilist_source.py @@ -1,141 +1,141 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import json -import urllib.error -import urllib.request -from datetime import date, timedelta - -ANILIST_URL = "https://graphql.anilist.co" - - -def yyyymmdd(value: date) -> int: - return value.year * 10000 + value.month * 100 + value.day - - -def post_graphql(query: str, variables: dict) -> dict: - payload = json.dumps({"query": query, "variables": variables}).encode("utf-8") - headers = { - "Content-Type": "application/json", - "Accept": "application/json", - "User-Agent": "anime-movies-script/1.0", - } - req = urllib.request.Request(ANILIST_URL, data=payload, headers=headers) - try: - with urllib.request.urlopen(req, timeout=30) as resp: - return json.loads(resp.read().decode("utf-8")) - except urllib.error.HTTPError as exc: - body = exc.read().decode("utf-8", "replace") - raise RuntimeError(f"HTTP {exc.code}: {body}") from exc - - -def map_anilist_movie(media: dict | None) -> dict: - media = media or {} - title = media.get("title") or {} - start_date = media.get("startDate") or {} - studios = ((media.get("studios") or {}).get("nodes") or []) - - studio_names = [] - for node in studios: - name = ((node or {}).get("name") or "").strip() - if name: - studio_names.append(name) - - genres = [] - for entry in media.get("genres") or []: - text = str(entry or "").strip() - if text: - genres.append(text) - - tags = [] - for tag in media.get("tags") or []: - if not isinstance(tag, dict): - continue - name = (tag.get("name") or "").strip() - rank = int(tag.get("rank") or 0) - if name and rank >= 70: - tags.append(name) - - return { - "anilist_id": int(media.get("id") or 0), - "title_english": (title.get("english") or "").strip(), - "title_romaji": (title.get("romaji") or "").strip(), - "title_native": (title.get("native") or "").strip(), - "cover_image": (((media.get("coverImage") or {}).get("large")) or "").strip(), - "anilist_url": (media.get("siteUrl") or "").strip(), - "start_year": int(start_date.get("year") or 0), - "format": (media.get("format") or "").strip() or "MOVIE", - "description": (media.get("description") or "").strip(), - "genres_text": ", ".join(genres), - "tags_text": ", ".join(tags), - "studio_text": ", ".join(studio_names), - } - - -def fetch_anilist_movie_candidates(today: date, years_window: int = 1) -> list[dict]: - start_date = today - timedelta(days=365 * years_window) - end_date = today + timedelta(days=365 * years_window) - - query = """ - query ($page: Int, $perPage: Int, $start: FuzzyDateInt, $end: FuzzyDateInt) { - Page(page: $page, perPage: $perPage) { - pageInfo { hasNextPage } - media( - type: ANIME - format: MOVIE - countryOfOrigin: JP - sort: [POPULARITY_DESC, START_DATE_DESC] - startDate_greater: $start - startDate_lesser: $end - ) { - id - title { english romaji native } - startDate { year } - format - description(asHtml: false) - genres - tags { name rank } - coverImage { large } - studios { nodes { name } } - siteUrl - } - } - } - """ - - page = 1 - results = [] - seen_ids = set() - - while True: - payload = post_graphql( - query, - { - "page": page, - "perPage": 50, - "start": yyyymmdd(start_date) - 1, - "end": yyyymmdd(end_date) + 1, - }, - ) - - if "errors" in payload: - raise RuntimeError(payload["errors"]) - - page_data = payload.get("data", {}).get("Page", {}) - for media in page_data.get("media", []): - mapped = map_anilist_movie(media) - anilist_id = mapped.get("anilist_id") or 0 - if anilist_id <= 0 or anilist_id in seen_ids: - continue - seen_ids.add(anilist_id) - - if not mapped.get("title_english") and not mapped.get("title_romaji") and not mapped.get("title_native"): - continue - - results.append(mapped) - - if not page_data.get("pageInfo", {}).get("hasNextPage"): - break - page += 1 - - return results +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import urllib.error +import urllib.request +from datetime import date, timedelta + +ANILIST_URL = "https://graphql.anilist.co" + + +def yyyymmdd(value: date) -> int: + return value.year * 10000 + value.month * 100 + value.day + + +def post_graphql(query: str, variables: dict) -> dict: + payload = json.dumps({"query": query, "variables": variables}).encode("utf-8") + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "anime-movies-script/1.0", + } + req = urllib.request.Request(ANILIST_URL, data=payload, headers=headers) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", "replace") + raise RuntimeError(f"HTTP {exc.code}: {body}") from exc + + +def map_anilist_movie(media: dict | None) -> dict: + media = media or {} + title = media.get("title") or {} + start_date = media.get("startDate") or {} + studios = ((media.get("studios") or {}).get("nodes") or []) + + studio_names = [] + for node in studios: + name = ((node or {}).get("name") or "").strip() + if name: + studio_names.append(name) + + genres = [] + for entry in media.get("genres") or []: + text = str(entry or "").strip() + if text: + genres.append(text) + + tags = [] + for tag in media.get("tags") or []: + if not isinstance(tag, dict): + continue + name = (tag.get("name") or "").strip() + rank = int(tag.get("rank") or 0) + if name and rank >= 70: + tags.append(name) + + return { + "anilist_id": int(media.get("id") or 0), + "title_english": (title.get("english") or "").strip(), + "title_romaji": (title.get("romaji") or "").strip(), + "title_native": (title.get("native") or "").strip(), + "cover_image": (((media.get("coverImage") or {}).get("large")) or "").strip(), + "anilist_url": (media.get("siteUrl") or "").strip(), + "start_year": int(start_date.get("year") or 0), + "format": (media.get("format") or "").strip() or "MOVIE", + "description": (media.get("description") or "").strip(), + "genres_text": ", ".join(genres), + "tags_text": ", ".join(tags), + "studio_text": ", ".join(studio_names), + } + + +def fetch_anilist_movie_candidates(today: date, years_window: int = 1) -> list[dict]: + start_date = today - timedelta(days=365 * years_window) + end_date = today + timedelta(days=365 * years_window) + + query = """ + query ($page: Int, $perPage: Int, $start: FuzzyDateInt, $end: FuzzyDateInt) { + Page(page: $page, perPage: $perPage) { + pageInfo { hasNextPage } + media( + type: ANIME + format: MOVIE + countryOfOrigin: JP + sort: [POPULARITY_DESC, START_DATE_DESC] + startDate_greater: $start + startDate_lesser: $end + ) { + id + title { english romaji native } + startDate { year } + format + description(asHtml: false) + genres + tags { name rank } + coverImage { large } + studios { nodes { name } } + siteUrl + } + } + } + """ + + page = 1 + results = [] + seen_ids = set() + + while True: + payload = post_graphql( + query, + { + "page": page, + "perPage": 50, + "start": yyyymmdd(start_date) - 1, + "end": yyyymmdd(end_date) + 1, + }, + ) + + if "errors" in payload: + raise RuntimeError(payload["errors"]) + + page_data = payload.get("data", {}).get("Page", {}) + for media in page_data.get("media", []): + mapped = map_anilist_movie(media) + anilist_id = mapped.get("anilist_id") or 0 + if anilist_id <= 0 or anilist_id in seen_ids: + continue + seen_ids.add(anilist_id) + + if not mapped.get("title_english") and not mapped.get("title_romaji") and not mapped.get("title_native"): + continue + + results.append(mapped) + + if not page_data.get("pageInfo", {}).get("hasNextPage"): + break + page += 1 + + return results diff --git a/data_sources/animeschedule_source.py b/data_sources/animeschedule_source.py index 71113ea..7117eaf 100644 --- a/data_sources/animeschedule_source.py +++ b/data_sources/animeschedule_source.py @@ -1,261 +1,261 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import json -import urllib.parse -import urllib.request - -ANIMESCHEDULE_API_BASE = "https://animeschedule.net/api/v3" - - -def normalize_title(text: str) -> str: - cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or "")) - return " ".join(cleaned.split()) - - -def flatten_items(payload: dict | list) -> list[dict]: - if isinstance(payload, list): - return [item for item in payload if isinstance(item, dict)] - if not isinstance(payload, dict): - return [] - - for key in ("anime", "items", "data", "results"): - value = payload.get(key) - if isinstance(value, list): - return [item for item in value if isinstance(item, dict)] - return [] - - -def extract_names(item: dict) -> list[str]: - names = [] - direct = [item.get("title"), item.get("name"), item.get("romaji"), item.get("english"), item.get("native")] - for candidate in direct: - text = (candidate or "").strip() - if text: - names.append(text) - - nested = item.get("names") or {} - if isinstance(nested, dict): - for candidate in nested.values(): - if isinstance(candidate, list): - for value in candidate: - text = (str(value) if value is not None else "").strip() - if text: - names.append(text) - continue - - text = (str(candidate) if candidate is not None else "").strip() - if text: - names.append(text) - - unique = [] - seen = set() - for name in names: - lowered = name.lower() - if lowered in seen: - continue - seen.add(lowered) - unique.append(name) - return unique - - -def extract_url(item: dict) -> str: - slug = (item.get("slug") or item.get("route") or item.get("id") or "").strip() - if slug: - return f"https://animeschedule.net/anime/{slug}" - - websites = item.get("websites") or item.get("links") or [] - if isinstance(websites, list): - for entry in websites: - if not isinstance(entry, dict): - continue - url = (entry.get("url") or "").strip() - if url: - return url - return "" - - -def extract_english_title(item: dict) -> str: - direct = [item.get("english"), item.get("titleEnglish"), item.get("englishTitle")] - for candidate in direct: - text = (candidate or "").strip() - if text: - return text - - names = item.get("names") or {} - if isinstance(names, dict): - for key, value in names.items(): - key_norm = str(key).strip().lower() - if key_norm in {"en", "eng", "english", "titleenglish"}: - text = (str(value) if value is not None else "").strip() - if text: - return text - return "" - - -def extract_list(item: dict, *keys: str) -> str: - for key in keys: - value = item.get(key) - if not value: - continue - if isinstance(value, str): - text = value.strip() - if text: - return text - if isinstance(value, list): - names = [] - for entry in value: - if isinstance(entry, str): - text = entry.strip() - elif isinstance(entry, dict): - text = (entry.get("name") or entry.get("title") or "").strip() - else: - text = "" - if text: - names.append(text) - if names: - return ", ".join(names) - return "" - - -def extract_format(item: dict) -> str: - for key in ("format", "mediaType", "type"): - value = item.get(key) - text = (str(value) if value is not None else "").strip() - if text: - return text - return "" - - -def extract_description(item: dict) -> str: - for key in ("description", "synopsis", "overview", "summary"): - value = item.get(key) - text = (str(value) if value is not None else "").strip() - if text: - return text - - details = item.get("details") or {} - if isinstance(details, dict): - for key in ("description", "synopsis", "overview", "summary"): - value = details.get(key) - text = (str(value) if value is not None else "").strip() - if text: - return text - return "" - - -def empty_result() -> dict: - return { - "url": "", - "title": "", - "english_title": "", - "format": "", - "genres": "", - "studios": "", - "description": "", - "names": [], - "match_score": 0, - "raw": {}, - } - - -def title_match_score(wanted: str, names: list[str]) -> int: - wanted_norm = normalize_title(wanted) - if not wanted_norm: - return 0 - - best = 0 - wanted_parts = wanted_norm.split(" ") - - for candidate in names: - normalized = normalize_title(candidate) - if not normalized: - continue - - if normalized == wanted_norm: - best = max(best, 4) - continue - - if wanted_norm in normalized or normalized in wanted_norm: - best = max(best, 3) - continue - - normalized_parts = normalized.split(" ") - if normalized_parts[:2] == wanted_parts[:2] and len(normalized_parts) >= 2 and len(wanted_parts) >= 2: - best = max(best, 2) - continue - - if set(normalized_parts) & set(wanted_parts): - best = max(best, 1) - - return best - - -def fetch_animeschedule_anime_by_title(title: str, token: str, cache: dict[str, dict]) -> dict: - key = normalize_title(title) - if key in cache: - return cache[key] - - if not token: - cache[key] = empty_result() - return cache[key] - - params = {"search": title, "take": "10"} - url = ANIMESCHEDULE_API_BASE + "/anime?" + urllib.parse.urlencode(params) - req = urllib.request.Request( - url, - headers={ - "Accept": "application/json", - "Authorization": f"Bearer {token}", - "User-Agent": "anime-movies-script/1.0", - }, - ) - - try: - with urllib.request.urlopen(req, timeout=20) as resp: - payload = json.loads(resp.read().decode("utf-8")) - except Exception: - cache[key] = empty_result() - return cache[key] - - candidates = flatten_items(payload) - if not candidates: - cache[key] = empty_result() - return cache[key] - - best = None - best_score = -1 - - for item in candidates: - names = extract_names(item) - if not names: - continue - - score = title_match_score(title, names) - if score <= 0: - continue - - if score > best_score: - best_score = score - best = item - if score == 4: - break - - if not best: - cache[key] = empty_result() - return cache[key] - - names = extract_names(best) - cache[key] = { - "url": extract_url(best), - "title": names[0] if names else "", - "english_title": extract_english_title(best), - "format": extract_format(best), - "genres": extract_list(best, "genres", "genre", "categories"), - "studios": extract_list(best, "studios", "studio"), - "description": extract_description(best), - "names": names, - "match_score": best_score, - "raw": best, - } - return cache[key] +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import urllib.parse +import urllib.request + +ANIMESCHEDULE_API_BASE = "https://animeschedule.net/api/v3" + + +def normalize_title(text: str) -> str: + cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or "")) + return " ".join(cleaned.split()) + + +def flatten_items(payload: dict | list) -> list[dict]: + if isinstance(payload, list): + return [item for item in payload if isinstance(item, dict)] + if not isinstance(payload, dict): + return [] + + for key in ("anime", "items", "data", "results"): + value = payload.get(key) + if isinstance(value, list): + return [item for item in value if isinstance(item, dict)] + return [] + + +def extract_names(item: dict) -> list[str]: + names = [] + direct = [item.get("title"), item.get("name"), item.get("romaji"), item.get("english"), item.get("native")] + for candidate in direct: + text = (candidate or "").strip() + if text: + names.append(text) + + nested = item.get("names") or {} + if isinstance(nested, dict): + for candidate in nested.values(): + if isinstance(candidate, list): + for value in candidate: + text = (str(value) if value is not None else "").strip() + if text: + names.append(text) + continue + + text = (str(candidate) if candidate is not None else "").strip() + if text: + names.append(text) + + unique = [] + seen = set() + for name in names: + lowered = name.lower() + if lowered in seen: + continue + seen.add(lowered) + unique.append(name) + return unique + + +def extract_url(item: dict) -> str: + slug = (item.get("slug") or item.get("route") or item.get("id") or "").strip() + if slug: + return f"https://animeschedule.net/anime/{slug}" + + websites = item.get("websites") or item.get("links") or [] + if isinstance(websites, list): + for entry in websites: + if not isinstance(entry, dict): + continue + url = (entry.get("url") or "").strip() + if url: + return url + return "" + + +def extract_english_title(item: dict) -> str: + direct = [item.get("english"), item.get("titleEnglish"), item.get("englishTitle")] + for candidate in direct: + text = (candidate or "").strip() + if text: + return text + + names = item.get("names") or {} + if isinstance(names, dict): + for key, value in names.items(): + key_norm = str(key).strip().lower() + if key_norm in {"en", "eng", "english", "titleenglish"}: + text = (str(value) if value is not None else "").strip() + if text: + return text + return "" + + +def extract_list(item: dict, *keys: str) -> str: + for key in keys: + value = item.get(key) + if not value: + continue + if isinstance(value, str): + text = value.strip() + if text: + return text + if isinstance(value, list): + names = [] + for entry in value: + if isinstance(entry, str): + text = entry.strip() + elif isinstance(entry, dict): + text = (entry.get("name") or entry.get("title") or "").strip() + else: + text = "" + if text: + names.append(text) + if names: + return ", ".join(names) + return "" + + +def extract_format(item: dict) -> str: + for key in ("format", "mediaType", "type"): + value = item.get(key) + text = (str(value) if value is not None else "").strip() + if text: + return text + return "" + + +def extract_description(item: dict) -> str: + for key in ("description", "synopsis", "overview", "summary"): + value = item.get(key) + text = (str(value) if value is not None else "").strip() + if text: + return text + + details = item.get("details") or {} + if isinstance(details, dict): + for key in ("description", "synopsis", "overview", "summary"): + value = details.get(key) + text = (str(value) if value is not None else "").strip() + if text: + return text + return "" + + +def empty_result() -> dict: + return { + "url": "", + "title": "", + "english_title": "", + "format": "", + "genres": "", + "studios": "", + "description": "", + "names": [], + "match_score": 0, + "raw": {}, + } + + +def title_match_score(wanted: str, names: list[str]) -> int: + wanted_norm = normalize_title(wanted) + if not wanted_norm: + return 0 + + best = 0 + wanted_parts = wanted_norm.split(" ") + + for candidate in names: + normalized = normalize_title(candidate) + if not normalized: + continue + + if normalized == wanted_norm: + best = max(best, 4) + continue + + if wanted_norm in normalized or normalized in wanted_norm: + best = max(best, 3) + continue + + normalized_parts = normalized.split(" ") + if normalized_parts[:2] == wanted_parts[:2] and len(normalized_parts) >= 2 and len(wanted_parts) >= 2: + best = max(best, 2) + continue + + if set(normalized_parts) & set(wanted_parts): + best = max(best, 1) + + return best + + +def fetch_animeschedule_anime_by_title(title: str, token: str, cache: dict[str, dict]) -> dict: + key = normalize_title(title) + if key in cache: + return cache[key] + + if not token: + cache[key] = empty_result() + return cache[key] + + params = {"search": title, "take": "10"} + url = ANIMESCHEDULE_API_BASE + "/anime?" + urllib.parse.urlencode(params) + req = urllib.request.Request( + url, + headers={ + "Accept": "application/json", + "Authorization": f"Bearer {token}", + "User-Agent": "anime-movies-script/1.0", + }, + ) + + try: + with urllib.request.urlopen(req, timeout=20) as resp: + payload = json.loads(resp.read().decode("utf-8")) + except Exception: + cache[key] = empty_result() + return cache[key] + + candidates = flatten_items(payload) + if not candidates: + cache[key] = empty_result() + return cache[key] + + best = None + best_score = -1 + + for item in candidates: + names = extract_names(item) + if not names: + continue + + score = title_match_score(title, names) + if score <= 0: + continue + + if score > best_score: + best_score = score + best = item + if score == 4: + break + + if not best: + cache[key] = empty_result() + return cache[key] + + names = extract_names(best) + cache[key] = { + "url": extract_url(best), + "title": names[0] if names else "", + "english_title": extract_english_title(best), + "format": extract_format(best), + "genres": extract_list(best, "genres", "genre", "categories"), + "studios": extract_list(best, "studios", "studio"), + "description": extract_description(best), + "names": names, + "match_score": best_score, + "raw": best, + } + return cache[key] diff --git a/data_sources/tmdb_source.py b/data_sources/tmdb_source.py index 819266c..63d1a72 100644 --- a/data_sources/tmdb_source.py +++ b/data_sources/tmdb_source.py @@ -1,105 +1,105 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import json -import urllib.parse -import urllib.request -from datetime import date - -TMDB_API_BASE = "https://api.themoviedb.org/3" - - -def parse_release_date(value: str) -> date | None: - text = (value or "").strip() - if not text: - return None - try: - return date.fromisoformat(text[:10]) - except ValueError: - return None - - -def tmdb_get_json(path: str, params: dict, token: str) -> dict: - url = TMDB_API_BASE + path - if params: - url += "?" + urllib.parse.urlencode(params) - - req = urllib.request.Request( - url, - headers={ - "Accept": "application/json", - "Authorization": f"Bearer {token}", - "User-Agent": "anime-movies-script/1.0", - }, - ) - - with urllib.request.urlopen(req, timeout=20) as resp: - return json.loads(resp.read().decode("utf-8")) - - -def search_tmdb_movies(title: str, token: str, language: str = "en-US") -> list[dict]: - query = (title or "").strip() - if not query: - return [] - - payload = tmdb_get_json( - "/search/movie", - { - "query": query, - "include_adult": "false", - "language": language, - "page": "1", - }, - token, - ) - - results = [] - for item in payload.get("results", []): - movie_id = item.get("id") - if not movie_id: - continue - - results.append( - { - "tmdb_id": int(movie_id), - "title": (item.get("title") or "").strip(), - "original_title": (item.get("original_title") or "").strip(), - "release_date": parse_release_date(str(item.get("release_date") or "")), - } - ) - - return results - - -def fetch_tmdb_release_dates(movie_id: int, token: str) -> dict: - return tmdb_get_json(f"/movie/{movie_id}/release_dates", {}, token) - - -def extract_de_theatrical_dates(release_payload: dict) -> list[date]: - german_block = None - for result in release_payload.get("results", []): - if str(result.get("iso_3166_1") or "").upper() == "DE": - german_block = result - break - - if not german_block: - return [] - - dates = [] - for entry in german_block.get("release_dates", []): - release_type = int(entry.get("type") or 0) - if release_type not in {2, 3}: - continue - - release_date = parse_release_date(str(entry.get("release_date") or "")) - if release_date: - dates.append(release_date) - - return sorted(dates) - - -def select_release_in_range(release_dates: list[date], start_date: date, end_date: date) -> date | None: - for release_date in sorted(release_dates): - if start_date <= release_date <= end_date: - return release_date - return None +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import urllib.parse +import urllib.request +from datetime import date + +TMDB_API_BASE = "https://api.themoviedb.org/3" + + +def parse_release_date(value: str) -> date | None: + text = (value or "").strip() + if not text: + return None + try: + return date.fromisoformat(text[:10]) + except ValueError: + return None + + +def tmdb_get_json(path: str, params: dict, token: str) -> dict: + url = TMDB_API_BASE + path + if params: + url += "?" + urllib.parse.urlencode(params) + + req = urllib.request.Request( + url, + headers={ + "Accept": "application/json", + "Authorization": f"Bearer {token}", + "User-Agent": "anime-movies-script/1.0", + }, + ) + + with urllib.request.urlopen(req, timeout=20) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def search_tmdb_movies(title: str, token: str, language: str = "en-US") -> list[dict]: + query = (title or "").strip() + if not query: + return [] + + payload = tmdb_get_json( + "/search/movie", + { + "query": query, + "include_adult": "false", + "language": language, + "page": "1", + }, + token, + ) + + results = [] + for item in payload.get("results", []): + movie_id = item.get("id") + if not movie_id: + continue + + results.append( + { + "tmdb_id": int(movie_id), + "title": (item.get("title") or "").strip(), + "original_title": (item.get("original_title") or "").strip(), + "release_date": parse_release_date(str(item.get("release_date") or "")), + } + ) + + return results + + +def fetch_tmdb_release_dates(movie_id: int, token: str) -> dict: + return tmdb_get_json(f"/movie/{movie_id}/release_dates", {}, token) + + +def extract_de_theatrical_dates(release_payload: dict) -> list[date]: + german_block = None + for result in release_payload.get("results", []): + if str(result.get("iso_3166_1") or "").upper() == "DE": + german_block = result + break + + if not german_block: + return [] + + dates = [] + for entry in german_block.get("release_dates", []): + release_type = int(entry.get("type") or 0) + if release_type not in {2, 3}: + continue + + release_date = parse_release_date(str(entry.get("release_date") or "")) + if release_date: + dates.append(release_date) + + return sorted(dates) + + +def select_release_in_range(release_dates: list[date], start_date: date, end_date: date) -> date | None: + for release_date in sorted(release_dates): + if start_date <= release_date <= end_date: + return release_date + return None diff --git a/discord_bot.py b/discord_bot.py index aae80ff..9e92baf 100644 --- a/discord_bot.py +++ b/discord_bot.py @@ -1,113 +1,113 @@ -#!/usr/bin/env python3 -import asyncio - -import discord -from discord import app_commands -from discord.ext import commands - -from app_config import get_settings -from movie_pipeline import get_upcoming_movie_records -from embed_builder import build_movie_embed, make_links_view - - -async def fetch_records(locale: str, tmdb_token: str, schedule_token: str) -> list[dict]: - return await asyncio.to_thread( - get_upcoming_movie_records, - locale, - None, - schedule_token, - tmdb_token, - ) - - -def create_bot() -> commands.Bot: - settings = get_settings() - intents = discord.Intents.default() - bot = commands.Bot(command_prefix="!", intents=intents) - guild_obj = discord.Object(id=int(settings.discord_guild_id)) if settings.discord_guild_id.isdigit() else None - - async def sync_commands() -> None: - if guild_obj: - try: - # Remove stale global commands to avoid duplicate /anime entries. - cleared_global = await bot.tree.sync() - cleared_names = ", ".join(cmd.name for cmd in cleared_global) if cleared_global else "keine" - print(f"Globale Slash Commands aktualisiert: {len(cleared_global)} ({cleared_names})") - - synced = await bot.tree.sync(guild=guild_obj) - names = ", ".join(cmd.name for cmd in synced) if synced else "keine" - print(f"Slash Commands fuer Guild synchronisiert: {len(synced)} ({names})") - return - except Exception as exc: - print(f"Guild-Sync fehlgeschlagen, nutze globalen Sync: {exc}") - - synced = await bot.tree.sync() - names = ", ".join(cmd.name for cmd in synced) if synced else "keine" - print(f"Globale Slash Commands synchronisiert: {len(synced)} ({names})") - print("Hinweis: Globale Slash Commands koennen bis zu 60 Minuten brauchen.") - print("Setze DISCORD_GUILD_ID fuer sofortige Verfuegbarkeit in deinem Server.") - - @bot.event - async def setup_hook() -> None: - await sync_commands() - - @bot.event - async def on_ready() -> None: - print(f"Bot online als {bot.user}") - - async def anime_handler(interaction: discord.Interaction, limit: app_commands.Range[int, 1, 25] = 10) -> None: - locale = settings.locale - safe_limit = int(limit) - - await interaction.response.defer(thinking=True) - try: - records = await fetch_records(locale, settings.tmdb_read_access_token, settings.animeschedule_api_token) - except Exception as exc: - await interaction.followup.send(f"Fehler beim Abruf: {exc}") - return - - if not records: - await interaction.followup.send("Keine Anime Filme fuer diesen und naechsten Monat gefunden.") - return - - selected = records[:safe_limit] - await interaction.followup.send(f"Gefunden: {len(selected)} Anime-Filme") - - for idx, item in enumerate(selected, start=1): - embed = build_movie_embed(item, idx) - view = make_links_view(str(item.get("anilist_url", ""))) - if view is None: - await interaction.followup.send(embed=embed) - else: - await interaction.followup.send(embed=embed, view=view) - - anime_handler = app_commands.describe(limit="Anzahl Ergebnisse (1-25)")(anime_handler) - if guild_obj: - bot.tree.command( - name="anime", - description="Zeigt kommende Anime-Filme", - guild=guild_obj, - )(anime_handler) - else: - bot.tree.command( - name="anime", - description="Zeigt kommende Anime-Filme", - )(anime_handler) - - return bot - - -def main() -> int: - settings = get_settings() - token = settings.discord_bot_token - if not token: - print("Fehler: DISCORD_BOT_TOKEN ist nicht gesetzt.") - return 1 - - bot = create_bot() - bot.run(token) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) +#!/usr/bin/env python3 +import asyncio + +import discord +from discord import app_commands +from discord.ext import commands + +from app_config import get_settings +from movie_pipeline import get_upcoming_movie_records +from embed_builder import build_movie_embed, make_links_view + + +async def fetch_records(locale: str, tmdb_token: str, schedule_token: str) -> list[dict]: + return await asyncio.to_thread( + get_upcoming_movie_records, + locale, + None, + schedule_token, + tmdb_token, + ) + + +def create_bot() -> commands.Bot: + settings = get_settings() + intents = discord.Intents.default() + bot = commands.Bot(command_prefix="!", intents=intents) + guild_obj = discord.Object(id=int(settings.discord_guild_id)) if settings.discord_guild_id.isdigit() else None + + async def sync_commands() -> None: + if guild_obj: + try: + # Remove stale global commands to avoid duplicate /anime entries. + cleared_global = await bot.tree.sync() + cleared_names = ", ".join(cmd.name for cmd in cleared_global) if cleared_global else "keine" + print(f"Globale Slash Commands aktualisiert: {len(cleared_global)} ({cleared_names})") + + synced = await bot.tree.sync(guild=guild_obj) + names = ", ".join(cmd.name for cmd in synced) if synced else "keine" + print(f"Slash Commands fuer Guild synchronisiert: {len(synced)} ({names})") + return + except Exception as exc: + print(f"Guild-Sync fehlgeschlagen, nutze globalen Sync: {exc}") + + synced = await bot.tree.sync() + names = ", ".join(cmd.name for cmd in synced) if synced else "keine" + print(f"Globale Slash Commands synchronisiert: {len(synced)} ({names})") + print("Hinweis: Globale Slash Commands koennen bis zu 60 Minuten brauchen.") + print("Setze DISCORD_GUILD_ID fuer sofortige Verfuegbarkeit in deinem Server.") + + @bot.event + async def setup_hook() -> None: + await sync_commands() + + @bot.event + async def on_ready() -> None: + print(f"Bot online als {bot.user}") + + async def anime_handler(interaction: discord.Interaction, limit: app_commands.Range[int, 1, 25] = 10) -> None: + locale = settings.locale + safe_limit = int(limit) + + await interaction.response.defer(thinking=True) + try: + records = await fetch_records(locale, settings.tmdb_read_access_token, settings.animeschedule_api_token) + except Exception as exc: + await interaction.followup.send(f"Fehler beim Abruf: {exc}") + return + + if not records: + await interaction.followup.send("Keine Anime Filme fuer diesen und naechsten Monat gefunden.") + return + + selected = records[:safe_limit] + await interaction.followup.send(f"Gefunden: {len(selected)} Anime-Filme") + + for idx, item in enumerate(selected, start=1): + embed = build_movie_embed(item, idx) + view = make_links_view(str(item.get("anilist_url", ""))) + if view is None: + await interaction.followup.send(embed=embed) + else: + await interaction.followup.send(embed=embed, view=view) + + anime_handler = app_commands.describe(limit="Anzahl Ergebnisse (1-25)")(anime_handler) + if guild_obj: + bot.tree.command( + name="anime", + description="Zeigt kommende Anime-Filme", + guild=guild_obj, + )(anime_handler) + else: + bot.tree.command( + name="anime", + description="Zeigt kommende Anime-Filme", + )(anime_handler) + + return bot + + +def main() -> int: + settings = get_settings() + token = settings.discord_bot_token + if not token: + print("Fehler: DISCORD_BOT_TOKEN ist nicht gesetzt.") + return 1 + + bot = create_bot() + bot.run(token) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/embed_builder.py b/embed_builder.py index 4886656..9ae788c 100644 --- a/embed_builder.py +++ b/embed_builder.py @@ -1,110 +1,110 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import html -import re - -import discord - - -def is_missing(value: str | None) -> bool: - text = (value or "").strip().lower() - return text in {"", "n/a", "none", "null"} - - -def fit_embed_value(text: str, max_len: int = 1000) -> str: - value = (text or "").strip() - if is_missing(value): - value = "Unbekannt" - if len(value) <= max_len: - return value - return value[: max_len - 3] + "..." - - -def fit_embed_description(text: str, max_len: int = 3800) -> str: - value = (text or "").strip() - if len(value) <= max_len: - return value - return value[: max_len - 3] + "..." - - -def format_tag_badges(tags_text: str) -> str: - if is_missing(tags_text): - return "Unbekannt" - parts = [part.strip() for part in tags_text.split(",") if part.strip()] - if not parts: - return "Unbekannt" - return " ".join(f"`{part}`" for part in parts) - - -def format_single_badge(text: str) -> str: - if is_missing(text): - return "Unbekannt" - return f"`{fit_embed_value(text, max_len=200)}`" - - -def compact_text(text: str) -> str: - raw = text or "" - raw = re.sub(r"<\s*br\s*/?\s*>", " ", raw, flags=re.IGNORECASE) - raw = re.sub(r"", " ", raw, flags=re.IGNORECASE) - raw = re.sub(r"<[^>]+>", "", raw) - raw = html.unescape(raw) - return " ".join(raw.replace("\n", " ").split()) - - -def add_line(lines: list[str], label: str, value: str) -> None: - if is_missing(value): - return - lines.append(f"**{label}:** {fit_embed_value(value)}") - - -def make_links_view(anilist_url: str) -> discord.ui.View | None: - if is_missing(anilist_url): - return None - - view = discord.ui.View(timeout=None) - view.add_item(discord.ui.Button(label="Zum Anime", style=discord.ButtonStyle.link, url=anilist_url)) - return view - - -def build_movie_embed(item: dict, index: int) -> discord.Embed: - title = fit_embed_value( - str(item.get("title_schedule_english") or item.get("title_english_anilist") or item.get("title", "")), - max_len=200, - ) - anilist_url = str(item.get("anilist_url", "")).strip() - embed = discord.Embed( - title=title, - url=anilist_url if not is_missing(anilist_url) else None, - color=discord.Color.from_rgb(30, 144, 255), - ) - - tags_badges = format_tag_badges(str(item.get("tags", ""))) - genres_badges = format_tag_badges(str(item.get("genres", ""))) - romaji_badge = format_single_badge(str(item.get("title_romaji", ""))) - native_badge = format_single_badge(str(item.get("title_native", ""))) - description_text = compact_text(str(item.get("description", ""))) - - lines: list[str] = [] - add_line(lines, "Anime Typ", str(item.get("format", ""))) - add_line(lines, "Genres", genres_badges) - add_line(lines, "Tags", tags_badges) - add_line(lines, "Release", str(item.get("release", ""))) - add_line(lines, "Studios", str(item.get("studio", ""))) - add_line(lines, "Romaji", romaji_badge) - add_line(lines, "Nativ", native_badge) - add_line(lines, "Beschreibung", description_text) - embed.description = fit_embed_description("\n".join(lines) if lines else "Keine Details verfuegbar") - - cover = (item.get("cover_image") or "").strip() - if cover: - embed.set_image(url=cover) - - embed.set_footer( - text=( - "Daten: AniList, AnimeSchedule, TMDB | " - "This product uses the TMDB API but is not endorsed or certified by TMDB. " - f"| Eintrag #{index}" - ) - ) - return embed +#!/usr/bin/env python3 +from __future__ import annotations + +import html +import re + +import discord + + +def is_missing(value: str | None) -> bool: + text = (value or "").strip().lower() + return text in {"", "n/a", "none", "null"} + + +def fit_embed_value(text: str, max_len: int = 1000) -> str: + value = (text or "").strip() + if is_missing(value): + value = "Unbekannt" + if len(value) <= max_len: + return value + return value[: max_len - 3] + "..." + + +def fit_embed_description(text: str, max_len: int = 3800) -> str: + value = (text or "").strip() + if len(value) <= max_len: + return value + return value[: max_len - 3] + "..." + + +def format_tag_badges(tags_text: str) -> str: + if is_missing(tags_text): + return "Unbekannt" + parts = [part.strip() for part in tags_text.split(",") if part.strip()] + if not parts: + return "Unbekannt" + return " ".join(f"`{part}`" for part in parts) + + +def format_single_badge(text: str) -> str: + if is_missing(text): + return "Unbekannt" + return f"`{fit_embed_value(text, max_len=200)}`" + + +def compact_text(text: str) -> str: + raw = text or "" + raw = re.sub(r"<\s*br\s*/?\s*>", " ", raw, flags=re.IGNORECASE) + raw = re.sub(r"", " ", raw, flags=re.IGNORECASE) + raw = re.sub(r"<[^>]+>", "", raw) + raw = html.unescape(raw) + return " ".join(raw.replace("\n", " ").split()) + + +def add_line(lines: list[str], label: str, value: str) -> None: + if is_missing(value): + return + lines.append(f"**{label}:** {fit_embed_value(value)}") + + +def make_links_view(anilist_url: str) -> discord.ui.View | None: + if is_missing(anilist_url): + return None + + view = discord.ui.View(timeout=None) + view.add_item(discord.ui.Button(label="Zum Anime", style=discord.ButtonStyle.link, url=anilist_url)) + return view + + +def build_movie_embed(item: dict, index: int) -> discord.Embed: + title = fit_embed_value( + str(item.get("title_schedule_english") or item.get("title_english_anilist") or item.get("title", "")), + max_len=200, + ) + anilist_url = str(item.get("anilist_url", "")).strip() + embed = discord.Embed( + title=title, + url=anilist_url if not is_missing(anilist_url) else None, + color=discord.Color.from_rgb(30, 144, 255), + ) + + tags_badges = format_tag_badges(str(item.get("tags", ""))) + genres_badges = format_tag_badges(str(item.get("genres", ""))) + romaji_badge = format_single_badge(str(item.get("title_romaji", ""))) + native_badge = format_single_badge(str(item.get("title_native", ""))) + description_text = compact_text(str(item.get("description", ""))) + + lines: list[str] = [] + add_line(lines, "Anime Typ", str(item.get("format", ""))) + add_line(lines, "Genres", genres_badges) + add_line(lines, "Tags", tags_badges) + add_line(lines, "Release", str(item.get("release", ""))) + add_line(lines, "Studios", str(item.get("studio", ""))) + add_line(lines, "Romaji", romaji_badge) + add_line(lines, "Nativ", native_badge) + add_line(lines, "Beschreibung", description_text) + embed.description = fit_embed_description("\n".join(lines) if lines else "Keine Details verfuegbar") + + cover = (item.get("cover_image") or "").strip() + if cover: + embed.set_image(url=cover) + + embed.set_footer( + text=( + "Daten: AniList, AnimeSchedule, TMDB | " + "This product uses the TMDB API but is not endorsed or certified by TMDB. " + f"| Eintrag #{index}" + ) + ) + return embed diff --git a/movie_pipeline.py b/movie_pipeline.py index 7bc01be..1bcb8ef 100644 --- a/movie_pipeline.py +++ b/movie_pipeline.py @@ -1,279 +1,279 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -from datetime import date, timedelta -from difflib import SequenceMatcher - -from app_config import get_settings -from data_sources.anilist_source import fetch_anilist_movie_candidates -from data_sources.animeschedule_source import fetch_animeschedule_anime_by_title -from data_sources.tmdb_source import ( - extract_de_theatrical_dates, - fetch_tmdb_release_dates, - search_tmdb_movies, - select_release_in_range, -) - -FUZZY_MATCH_THRESHOLD = 0.65 -SCHEDULE_MATCH_THRESHOLD = 2 - - -def add_months(value: date, months: int) -> date: - year = value.year + ((value.month - 1 + months) // 12) - month = ((value.month - 1 + months) % 12) + 1 - return date(year, month, 1) - - -def normalize_title(text: str) -> str: - cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or "")) - return " ".join(cleaned.split()) - - -def fuzzy_ratio(left: str, right: str) -> float: - a = normalize_title(left) - b = normalize_title(right) - if not a or not b: - return 0.0 - return SequenceMatcher(None, a, b).ratio() - - -def format_date(value: date, locale: str) -> str: - if locale == "de-DE": - return f"{value.day:02d}.{value.month:02d}.{value.year}" - return value.isoformat() - - -def parse_record_release_date(record: dict) -> date: - text = str(record.get("releaseDate") or "").strip() - try: - return date.fromisoformat(text) - except ValueError: - return date.max - - -def is_year_match(anilist_year: int, tmdb_year: int) -> bool: - if anilist_year <= 0 or tmdb_year <= 0: - return False - return abs(anilist_year - tmdb_year) <= 1 - - -def title_score(anilist_english: str, anilist_romaji: str, anilist_native: str, tmdb_title: str, tmdb_original: str) -> tuple[int, int, int, float]: - english_norm = normalize_title(anilist_english) - romaji_norm = normalize_title(anilist_romaji) - native_norm = normalize_title(anilist_native) - tmdb_options = [normalize_title(tmdb_title), normalize_title(tmdb_original)] - - exact_english = 1 if english_norm and english_norm in tmdb_options else 0 - exact_romaji = 1 if romaji_norm and romaji_norm in tmdb_options else 0 - exact_native = 1 if native_norm and native_norm in tmdb_options else 0 - - best_ratio = 0.0 - for option in (tmdb_title, tmdb_original): - best_ratio = max( - best_ratio, - fuzzy_ratio(anilist_english, option), - fuzzy_ratio(anilist_romaji, option), - fuzzy_ratio(anilist_native, option), - ) - - return exact_english, exact_romaji, exact_native, best_ratio - - -def collect_tmdb_candidates_for_anilist(anilist_entry: dict, tmdb_token: str, search_cache: dict[str, list[dict]]) -> list[dict]: - english = (anilist_entry.get("title_english") or "").strip() - romaji = (anilist_entry.get("title_romaji") or "").strip() - native = (anilist_entry.get("title_native") or "").strip() - - queries = [] - if english: - queries.append(english) - if romaji and normalize_title(romaji) != normalize_title(english): - queries.append(romaji) - if native and normalize_title(native) not in {normalize_title(english), normalize_title(romaji)}: - queries.append(native) - - if not queries: - return [] - - candidates_by_id = {} - languages = ["de-DE", "en-US", "ja-JP"] - - for query in queries: - normalized_query = normalize_title(query) - for language in languages: - cache_key = f"{language}:{normalized_query}" - if cache_key not in search_cache: - search_cache[cache_key] = search_tmdb_movies(query, tmdb_token, language=language) - - for item in search_cache[cache_key]: - tmdb_id = int(item.get("tmdb_id") or 0) - if tmdb_id <= 0: - continue - - existing = candidates_by_id.get(tmdb_id) - if not existing: - candidates_by_id[tmdb_id] = item - continue - - # Prefer candidate carrying a release date if duplicate appears from different language queries. - if not existing.get("release_date") and item.get("release_date"): - candidates_by_id[tmdb_id] = item - - return list(candidates_by_id.values()) - - -def pick_best_tmdb_match(anilist_entry: dict, tmdb_candidates: list[dict]) -> dict | None: - english = (anilist_entry.get("title_english") or "").strip() - romaji = (anilist_entry.get("title_romaji") or "").strip() - native = (anilist_entry.get("title_native") or "").strip() - anilist_year = int(anilist_entry.get("start_year") or 0) - - best = None - best_tuple = (-1, -1, -1, 0.0) - - for candidate in tmdb_candidates: - exact_english, exact_romaji, exact_native, ratio = title_score( - english, - romaji, - native, - str(candidate.get("title") or ""), - str(candidate.get("original_title") or ""), - ) - - tmdb_release = candidate.get("release_date") - tmdb_year = tmdb_release.year if tmdb_release else 0 - has_exact_match = exact_english == 1 or exact_romaji == 1 or exact_native == 1 - if not has_exact_match and not is_year_match(anilist_year, tmdb_year): - continue - - if ratio < FUZZY_MATCH_THRESHOLD and exact_english == 0 and exact_romaji == 0 and exact_native == 0: - continue - - score_tuple = (exact_english, exact_romaji, exact_native, ratio) - if score_tuple > best_tuple: - best_tuple = score_tuple - best = candidate - - return best - - -def resolve_titles(anilist_entry: dict, schedule_token: str, schedule_cache: dict[str, dict]) -> tuple[str, str]: - english = (anilist_entry.get("title_english") or "").strip() - romaji = (anilist_entry.get("title_romaji") or "").strip() - native = (anilist_entry.get("title_native") or "").strip() - schedule_english = "" - - if not english and romaji: - schedule = fetch_animeschedule_anime_by_title(romaji, schedule_token, schedule_cache) - if int(schedule.get("match_score") or 0) >= SCHEDULE_MATCH_THRESHOLD: - schedule_english = (schedule.get("english_title") or "").strip() - - preferred_title = schedule_english or english or romaji or native - return preferred_title, schedule_english - - -def build_record(anilist_entry: dict, tmdb_entry: dict, release_date: date, locale: str, title: str, schedule_english: str) -> dict: - return { - "title": title, - "title_english_anilist": (anilist_entry.get("title_english") or "").strip(), - "title_anilist": (anilist_entry.get("title_english") or "").strip() or (anilist_entry.get("title_romaji") or "").strip(), - "title_schedule_english": schedule_english, - "title_romaji": (anilist_entry.get("title_romaji") or "").strip(), - "title_native": (anilist_entry.get("title_native") or "").strip(), - "studio": (anilist_entry.get("studio_text") or "").strip(), - "genres": (anilist_entry.get("genres_text") or "").strip() or "n/a", - "tags": (anilist_entry.get("tags_text") or "").strip(), - "release": format_date(release_date, locale), - "releaseDate": release_date.isoformat(), - "anilist_url": (anilist_entry.get("anilist_url") or "").strip() or "n/a", - "format": (anilist_entry.get("format") or "").strip() or "MOVIE", - "cover_image": (anilist_entry.get("cover_image") or "").strip(), - "description": (anilist_entry.get("description") or "").strip(), - "tmdb_title": (tmdb_entry.get("title") or "").strip(), - "tmdb_id": int(tmdb_entry.get("tmdb_id") or 0), - "ids": { - "anilist": int(anilist_entry.get("anilist_id") or 0), - "tmdb": int(tmdb_entry.get("tmdb_id") or 0), - }, - } - - -def sort_dedup_records(records: list[dict]) -> list[dict]: - unique = {} - for record in records: - ids = record.get("ids") or {} - key = (int(ids.get("anilist") or 0), int(ids.get("tmdb") or 0), str(record.get("releaseDate") or "")) - if key not in unique: - unique[key] = record - - result = list(unique.values()) - result.sort(key=lambda item: (parse_record_release_date(item), str(item.get("title") or ""))) - return result - - -def get_upcoming_movie_records( - locale: str, - today: date | None = None, - anime_schedule_token: str | None = None, - tmdb_read_access_token: str | None = None, -) -> list[dict]: - settings = get_settings() - schedule_token = (anime_schedule_token or settings.animeschedule_api_token).strip() - tmdb_token = (tmdb_read_access_token or settings.tmdb_read_access_token).strip() - - if not tmdb_token: - return [] - - current_day = today or date.today() - month_start = date(current_day.year, current_day.month, 1) - end_date = add_months(month_start, 2) - timedelta(days=1) - - anilist_candidates = fetch_anilist_movie_candidates(current_day) - schedule_cache: dict[str, dict] = {} - tmdb_search_cache: dict[str, list[dict]] = {} - tmdb_release_cache: dict[int, dict] = {} - - output_records = [] - - for anilist_entry in anilist_candidates: - tmdb_candidates = collect_tmdb_candidates_for_anilist(anilist_entry, tmdb_token, tmdb_search_cache) - if not tmdb_candidates: - continue - - matched_tmdb = pick_best_tmdb_match(anilist_entry, tmdb_candidates) - if not matched_tmdb: - continue - - tmdb_id = int(matched_tmdb.get("tmdb_id") or 0) - if tmdb_id <= 0: - continue - - if tmdb_id not in tmdb_release_cache: - tmdb_release_cache[tmdb_id] = fetch_tmdb_release_dates(tmdb_id, tmdb_token) - - release_payload = tmdb_release_cache[tmdb_id] - de_theatrical_dates = extract_de_theatrical_dates(release_payload) - release_date = select_release_in_range(de_theatrical_dates, current_day, end_date) - if not release_date: - candidate_release = matched_tmdb.get("release_date") - if isinstance(candidate_release, date) and current_day <= candidate_release <= end_date: - release_date = candidate_release - else: - continue - - preferred_title, schedule_english = resolve_titles(anilist_entry, schedule_token, schedule_cache) - if not preferred_title: - continue - - output_records.append( - build_record( - anilist_entry=anilist_entry, - tmdb_entry=matched_tmdb, - release_date=release_date, - locale=locale, - title=preferred_title, - schedule_english=schedule_english, - ) - ) - - return sort_dedup_records(output_records) +#!/usr/bin/env python3 +from __future__ import annotations + +from datetime import date, timedelta +from difflib import SequenceMatcher + +from app_config import get_settings +from data_sources.anilist_source import fetch_anilist_movie_candidates +from data_sources.animeschedule_source import fetch_animeschedule_anime_by_title +from data_sources.tmdb_source import ( + extract_de_theatrical_dates, + fetch_tmdb_release_dates, + search_tmdb_movies, + select_release_in_range, +) + +FUZZY_MATCH_THRESHOLD = 0.65 +SCHEDULE_MATCH_THRESHOLD = 2 + + +def add_months(value: date, months: int) -> date: + year = value.year + ((value.month - 1 + months) // 12) + month = ((value.month - 1 + months) % 12) + 1 + return date(year, month, 1) + + +def normalize_title(text: str) -> str: + cleaned = "".join(ch.lower() if ch.isalnum() else " " for ch in (text or "")) + return " ".join(cleaned.split()) + + +def fuzzy_ratio(left: str, right: str) -> float: + a = normalize_title(left) + b = normalize_title(right) + if not a or not b: + return 0.0 + return SequenceMatcher(None, a, b).ratio() + + +def format_date(value: date, locale: str) -> str: + if locale == "de-DE": + return f"{value.day:02d}.{value.month:02d}.{value.year}" + return value.isoformat() + + +def parse_record_release_date(record: dict) -> date: + text = str(record.get("releaseDate") or "").strip() + try: + return date.fromisoformat(text) + except ValueError: + return date.max + + +def is_year_match(anilist_year: int, tmdb_year: int) -> bool: + if anilist_year <= 0 or tmdb_year <= 0: + return False + return abs(anilist_year - tmdb_year) <= 1 + + +def title_score(anilist_english: str, anilist_romaji: str, anilist_native: str, tmdb_title: str, tmdb_original: str) -> tuple[int, int, int, float]: + english_norm = normalize_title(anilist_english) + romaji_norm = normalize_title(anilist_romaji) + native_norm = normalize_title(anilist_native) + tmdb_options = [normalize_title(tmdb_title), normalize_title(tmdb_original)] + + exact_english = 1 if english_norm and english_norm in tmdb_options else 0 + exact_romaji = 1 if romaji_norm and romaji_norm in tmdb_options else 0 + exact_native = 1 if native_norm and native_norm in tmdb_options else 0 + + best_ratio = 0.0 + for option in (tmdb_title, tmdb_original): + best_ratio = max( + best_ratio, + fuzzy_ratio(anilist_english, option), + fuzzy_ratio(anilist_romaji, option), + fuzzy_ratio(anilist_native, option), + ) + + return exact_english, exact_romaji, exact_native, best_ratio + + +def collect_tmdb_candidates_for_anilist(anilist_entry: dict, tmdb_token: str, search_cache: dict[str, list[dict]]) -> list[dict]: + english = (anilist_entry.get("title_english") or "").strip() + romaji = (anilist_entry.get("title_romaji") or "").strip() + native = (anilist_entry.get("title_native") or "").strip() + + queries = [] + if english: + queries.append(english) + if romaji and normalize_title(romaji) != normalize_title(english): + queries.append(romaji) + if native and normalize_title(native) not in {normalize_title(english), normalize_title(romaji)}: + queries.append(native) + + if not queries: + return [] + + candidates_by_id = {} + languages = ["de-DE", "en-US", "ja-JP"] + + for query in queries: + normalized_query = normalize_title(query) + for language in languages: + cache_key = f"{language}:{normalized_query}" + if cache_key not in search_cache: + search_cache[cache_key] = search_tmdb_movies(query, tmdb_token, language=language) + + for item in search_cache[cache_key]: + tmdb_id = int(item.get("tmdb_id") or 0) + if tmdb_id <= 0: + continue + + existing = candidates_by_id.get(tmdb_id) + if not existing: + candidates_by_id[tmdb_id] = item + continue + + # Prefer candidate carrying a release date if duplicate appears from different language queries. + if not existing.get("release_date") and item.get("release_date"): + candidates_by_id[tmdb_id] = item + + return list(candidates_by_id.values()) + + +def pick_best_tmdb_match(anilist_entry: dict, tmdb_candidates: list[dict]) -> dict | None: + english = (anilist_entry.get("title_english") or "").strip() + romaji = (anilist_entry.get("title_romaji") or "").strip() + native = (anilist_entry.get("title_native") or "").strip() + anilist_year = int(anilist_entry.get("start_year") or 0) + + best = None + best_tuple = (-1, -1, -1, 0.0) + + for candidate in tmdb_candidates: + exact_english, exact_romaji, exact_native, ratio = title_score( + english, + romaji, + native, + str(candidate.get("title") or ""), + str(candidate.get("original_title") or ""), + ) + + tmdb_release = candidate.get("release_date") + tmdb_year = tmdb_release.year if tmdb_release else 0 + has_exact_match = exact_english == 1 or exact_romaji == 1 or exact_native == 1 + if not has_exact_match and not is_year_match(anilist_year, tmdb_year): + continue + + if ratio < FUZZY_MATCH_THRESHOLD and exact_english == 0 and exact_romaji == 0 and exact_native == 0: + continue + + score_tuple = (exact_english, exact_romaji, exact_native, ratio) + if score_tuple > best_tuple: + best_tuple = score_tuple + best = candidate + + return best + + +def resolve_titles(anilist_entry: dict, schedule_token: str, schedule_cache: dict[str, dict]) -> tuple[str, str]: + english = (anilist_entry.get("title_english") or "").strip() + romaji = (anilist_entry.get("title_romaji") or "").strip() + native = (anilist_entry.get("title_native") or "").strip() + schedule_english = "" + + if not english and romaji: + schedule = fetch_animeschedule_anime_by_title(romaji, schedule_token, schedule_cache) + if int(schedule.get("match_score") or 0) >= SCHEDULE_MATCH_THRESHOLD: + schedule_english = (schedule.get("english_title") or "").strip() + + preferred_title = schedule_english or english or romaji or native + return preferred_title, schedule_english + + +def build_record(anilist_entry: dict, tmdb_entry: dict, release_date: date, locale: str, title: str, schedule_english: str) -> dict: + return { + "title": title, + "title_english_anilist": (anilist_entry.get("title_english") or "").strip(), + "title_anilist": (anilist_entry.get("title_english") or "").strip() or (anilist_entry.get("title_romaji") or "").strip(), + "title_schedule_english": schedule_english, + "title_romaji": (anilist_entry.get("title_romaji") or "").strip(), + "title_native": (anilist_entry.get("title_native") or "").strip(), + "studio": (anilist_entry.get("studio_text") or "").strip(), + "genres": (anilist_entry.get("genres_text") or "").strip() or "n/a", + "tags": (anilist_entry.get("tags_text") or "").strip(), + "release": format_date(release_date, locale), + "releaseDate": release_date.isoformat(), + "anilist_url": (anilist_entry.get("anilist_url") or "").strip() or "n/a", + "format": (anilist_entry.get("format") or "").strip() or "MOVIE", + "cover_image": (anilist_entry.get("cover_image") or "").strip(), + "description": (anilist_entry.get("description") or "").strip(), + "tmdb_title": (tmdb_entry.get("title") or "").strip(), + "tmdb_id": int(tmdb_entry.get("tmdb_id") or 0), + "ids": { + "anilist": int(anilist_entry.get("anilist_id") or 0), + "tmdb": int(tmdb_entry.get("tmdb_id") or 0), + }, + } + + +def sort_dedup_records(records: list[dict]) -> list[dict]: + unique = {} + for record in records: + ids = record.get("ids") or {} + key = (int(ids.get("anilist") or 0), int(ids.get("tmdb") or 0), str(record.get("releaseDate") or "")) + if key not in unique: + unique[key] = record + + result = list(unique.values()) + result.sort(key=lambda item: (parse_record_release_date(item), str(item.get("title") or ""))) + return result + + +def get_upcoming_movie_records( + locale: str, + today: date | None = None, + anime_schedule_token: str | None = None, + tmdb_read_access_token: str | None = None, +) -> list[dict]: + settings = get_settings() + schedule_token = (anime_schedule_token or settings.animeschedule_api_token).strip() + tmdb_token = (tmdb_read_access_token or settings.tmdb_read_access_token).strip() + + if not tmdb_token: + return [] + + current_day = today or date.today() + month_start = date(current_day.year, current_day.month, 1) + end_date = add_months(month_start, 2) - timedelta(days=1) + + anilist_candidates = fetch_anilist_movie_candidates(current_day) + schedule_cache: dict[str, dict] = {} + tmdb_search_cache: dict[str, list[dict]] = {} + tmdb_release_cache: dict[int, dict] = {} + + output_records = [] + + for anilist_entry in anilist_candidates: + tmdb_candidates = collect_tmdb_candidates_for_anilist(anilist_entry, tmdb_token, tmdb_search_cache) + if not tmdb_candidates: + continue + + matched_tmdb = pick_best_tmdb_match(anilist_entry, tmdb_candidates) + if not matched_tmdb: + continue + + tmdb_id = int(matched_tmdb.get("tmdb_id") or 0) + if tmdb_id <= 0: + continue + + if tmdb_id not in tmdb_release_cache: + tmdb_release_cache[tmdb_id] = fetch_tmdb_release_dates(tmdb_id, tmdb_token) + + release_payload = tmdb_release_cache[tmdb_id] + de_theatrical_dates = extract_de_theatrical_dates(release_payload) + release_date = select_release_in_range(de_theatrical_dates, current_day, end_date) + if not release_date: + candidate_release = matched_tmdb.get("release_date") + if isinstance(candidate_release, date) and current_day <= candidate_release <= end_date: + release_date = candidate_release + else: + continue + + preferred_title, schedule_english = resolve_titles(anilist_entry, schedule_token, schedule_cache) + if not preferred_title: + continue + + output_records.append( + build_record( + anilist_entry=anilist_entry, + tmdb_entry=matched_tmdb, + release_date=release_date, + locale=locale, + title=preferred_title, + schedule_english=schedule_english, + ) + ) + + return sort_dedup_records(output_records) diff --git a/requirements.txt b/requirements.txt index d97d58e..10feb68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -discord.py>=2.4,<3.0 +discord.py>=2.4,<3.0 diff --git a/start-bot.bat b/start-bot.bat index d61242c..2c59f78 100644 --- a/start-bot.bat +++ b/start-bot.bat @@ -1,10 +1,10 @@ -@echo off -setlocal - -echo Starte Discord Bot... -if not exist ".env" ( - echo Hinweis: .env wurde nicht gefunden. Der Bot startet nur, wenn die benoetigten Variablen bereits als Umgebungsvariablen gesetzt sind. -) - -k:\dev\Animes\.venv\Scripts\python.exe k:\dev\Animes\discord_bot.py +@echo off +setlocal + +echo Starte Discord Bot... +if not exist ".env" ( + echo Hinweis: .env wurde nicht gefunden. Der Bot startet nur, wenn die benoetigten Variablen bereits als Umgebungsvariablen gesetzt sind. +) + +k:\dev\Animes\.venv\Scripts\python.exe k:\dev\Animes\discord_bot.py exit /b %errorlevel% \ No newline at end of file diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..dea8d83 --- /dev/null +++ b/start.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Ensure the script works regardless of the current working directory. +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +echo "Starte Discord Bot..." +if [[ ! -f ".env" ]]; then + echo "Hinweis: .env wurde nicht gefunden. Der Bot startet nur, wenn die benoetigten Variablen bereits als Umgebungsvariablen gesetzt sind." +fi + +PYTHON_BIN=".venv/bin/python" +if [[ ! -x "$PYTHON_BIN" ]]; then + if command -v python3 >/dev/null 2>&1; then + PYTHON_BIN="$(command -v python3)" + echo "Hinweis: .venv/bin/python nicht gefunden, nutze $PYTHON_BIN." + else + echo "Fehler: Kein Python-Interpreter gefunden. Lege zuerst ein venv an oder installiere python3." + exit 1 + fi +fi + +exec "$PYTHON_BIN" "$SCRIPT_DIR/discord_bot.py" \ No newline at end of file