513 lines
16 KiB
Python
513 lines
16 KiB
Python
|
# coding: utf-8
|
|||
|
#
|
|||
|
# Copyright © 2020 melmorabity
|
|||
|
#
|
|||
|
# This program is free software; you can redistribute it and/or modify it under
|
|||
|
# the terms of the GNU General Public License as published by the Free Software
|
|||
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|||
|
# version.
|
|||
|
#
|
|||
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|||
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|||
|
# details.
|
|||
|
#
|
|||
|
# You should have received a copy of the GNU General Public License along with
|
|||
|
# this program; if not, write to the Free Software Foundation, Inc., 51
|
|||
|
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|||
|
|
|||
|
from __future__ import unicode_literals
|
|||
|
import logging
|
|||
|
from os.path import dirname
|
|||
|
from os.path import join
|
|||
|
import re
|
|||
|
|
|||
|
import time
|
|||
|
|
|||
|
try:
|
|||
|
from itertools import zip_longest # type: ignore
|
|||
|
except ImportError:
|
|||
|
from itertools import izip_longest as zip_longest
|
|||
|
|
|||
|
# Type aliases are only needed by static checkers; when the ``typing`` module
# is unavailable, fall back to an untyped ``ParsedItem`` named tuple.
try:
    from typing import (  # noqa: F401
        Any,
        Dict,
        Generator,
        List,
        NamedTuple,
        Optional,
        Text,
        Union,
    )
except ImportError:
    from collections import namedtuple

    ParsedItem = namedtuple(  # type: ignore
        "ParsedItem", ["label", "url", "info", "art", "properties"]
    )
else:
    # A single API entry and a list of such entries
    Item = Dict[Text, Any]
    Collection = List[Item]

    # Artwork (values may be missing) and URL parameters of a parsed item
    Art = Dict[Text, Optional[Text]]  # pylint: disable=unsubscriptable-object
    Url = Dict[Text, Text]

    _PARSED_ITEM_FIELDS = [
        ("label", Text),
        ("url", Url),
        ("info", Dict[Text, Any]),
        ("art", Art),
        ("properties", Dict[Text, Text]),
    ]

    # Result of parsing one API entry
    ParsedItem = NamedTuple("ParsedItem", _PARSED_ITEM_FIELDS)
|
|||
|
|
|||
|
from requests import Response
|
|||
|
from requests import Session
|
|||
|
from requests.exceptions import HTTPError
|
|||
|
|
|||
|
from resources.lib.utils import capitalize
|
|||
|
from resources.lib.utils import html_to_text
|
|||
|
from resources.lib.utils import update_url_params
|
|||
|
|
|||
|
|
|||
|
# Logger named after this module
_LOGGER = logging.getLogger(__name__)

# Maps the image types found in API entries to the artwork keys used for
# parsed items
_IMAGE_TYPE_MAPPING = dict(
    [
        ("background_16x9", "fanart"),
        ("carre", "thumb"),
        ("vignette_16x9", "fanart"),
        ("vignette_3x4", "poster"),
        ("hero", "clearart"),
        ("hero_plein", "characterart"),
        ("logo", "clearlogo"),
    ]
)  # type: Dict[Optional[Text], Text]

# Directory holding the bundled images, resolved relatively to this file
_MEDIA_DIR = join(dirname(__file__), "..", "media")

# Bundled icon of each known channel, keyed by channel identifier; every
# icon file is named "<channel identifier>.png"
_CHANNEL_ICONS = {
    channel: join(_MEDIA_DIR, channel + ".png")
    for channel in (
        "france-2",
        "france-3",
        "france-4",
        "france-5",
        "france-o",
        "la1ere",
        "franceinfo",
        "slash",
        "okoo",
    )
}  # type: Dict[Optional[Text], Text]

# Icons of the extra entries appended to collections
_ALL_TV_SHOWS_ICON, _ALL_VIDEOS_ICON, _NEXT_PAGE_ICON = (
    join(_MEDIA_DIR, filename)
    for filename in ("all-tv-shows.png", "all-videos.png", "next-page.png")
)
|
|||
|
|
|||
|
|
|||
|
class FranceTVException(Exception):
    """Error reported by the france.tv API in the body of an HTTP error."""
|
|||
|
|
|||
|
|
|||
|
class FranceTV:
    """Client for the france.tv mobile API.

    API collections are converted into ``ParsedItem`` tuples (label, url,
    info, art, properties). Instances can be used as context managers: the
    underlying HTTP session is closed when the ``with`` block exits.
    """

    _API_URL = "https://api-mobile.yatta.francetv.fr"

    # Seconds to wait for the API to answer; without a timeout a request may
    # block forever on an unresponsive server
    _REQUEST_TIMEOUT = 30

    def __init__(self):
        # type: () -> None
        """Create the HTTP session and install the status-checking hook."""

        self._session = Session()
        self._session.hooks = {"response": [self._requests_raise_status]}

    def __enter__(self):
        # type: () -> FranceTV
        """Return the client itself."""

        return self

    def __exit__(self, *args):
        # type: (*Any) -> None
        """Close the HTTP session."""

        if self._session:
            self._session.close()

    @staticmethod
    def _requests_raise_status(response, *_args, **_kwargs):
        # type: (Response, Any, Any) -> None
        """Response hook raising an exception on HTTP error statuses.

        Raises:
            FranceTVException: the error response has a JSON body. The
                exception arguments are the original ``HTTPError`` and the
                value of the ``error`` key of the body (``None`` when the
                body is not a JSON object or has no such key).
            HTTPError: the body of the error response is not valid JSON.
        """

        try:
            response.raise_for_status()
        except HTTPError as ex:
            try:
                payload = ex.response.json()
            except ValueError:
                raise ex

            # A valid JSON body is not necessarily an object
            if isinstance(payload, dict):
                error = payload.get("error")
            else:
                error = None

            raise FranceTVException(ex, error)

    def _query_api(self, path):
        # type: (Text) -> Union[Item, Collection]
        """Query the API at the given path and return the decoded JSON."""

        # Some paths come with a leading slash, strip it to avoid requesting
        # "<API URL>//<path>"
        return self._session.get(
            "{}/{}".format(self._API_URL, path.lstrip("/")),
            params={"platform": "apps"},
            timeout=self._REQUEST_TIMEOUT,
        ).json()

    @staticmethod
    def _get_channel_id(item):
        # type: (Item) -> Optional[Text]
        """Return the channel identifier of an item, or None if unknown.

        The identifier is looked up in the "channel" key (either a dict with
        a "channel_path" key or a plain value), then in the "channel_path"
        and "region_path" keys. Only the part before the first "_" or "/" is
        returned.
        """

        channel = item.get("channel")
        if isinstance(channel, dict):
            channel_id = channel.get("channel_path")
        else:
            channel_id = (
                channel or item.get("channel_path") or item.get("region_path")
            )

        if not channel_id:
            return None

        return re.split("[_/]", channel_id)[0]

    @staticmethod
    def _is_live(item, parent_item):
        # type: (Item, Item) -> bool
        """Tell whether an item is a live stream."""

        # "is_live" item key can be "false", even for "real" lives
        if parent_item.get("type") in ["live", "live_channel", "current_live"]:
            return True

        return bool(item.get("is_live"))

    @staticmethod
    def _parse_item_art(item, parent_item):
        # type: (Item, Item) -> Art
        """Build the artwork dict of an item.

        The "icon" key is always present (None when the channel has no
        bundled icon). Other keys come from the item images (highest quality
        URL of each supported image type), then from the item program for
        whatever is still missing.
        """

        art = {}  # type: Art

        def _fill(key, value):
            # type: (Text, Optional[Text]) -> None
            # First real value wins; a None placeholder does not prevent a
            # later source from providing the artwork
            if art.get(key) is None:
                art[key] = value

        def _quality(entry):
            # type: (Any) -> int
            # Images are ranked by the number following the first ":" of
            # their key; malformed keys rank lowest instead of aborting
            try:
                return int(entry[0].split(":")[1])
            except (AttributeError, IndexError, ValueError):
                return -1

        channel_icon = _CHANNEL_ICONS.get(FranceTV._get_channel_id(item))
        _fill("icon", channel_icon)

        # Use channel logo as thumb for live videos
        if FranceTV._is_live(item, parent_item):
            _fill("thumb", channel_icon)

        item_type = item.get("type")

        # Artwork provided by the france.tv API is really bad for channels
        if item_type == "channel":
            return art

        for image in item.get("images") or []:
            image_type = _IMAGE_TYPE_MAPPING.get(image.get("type"))

            if not image_type or not image.get("urls"):
                continue

            # Category fanarts provided by the france.tv API are
            # low-resolution
            if item_type == "categorie" and image_type == "fanart":
                continue

            # Sort images by quality and keep the best one
            image_urls = sorted(image["urls"].items(), key=_quality)
            _fill(image_type, image_urls[-1][1])

        # Complete missing artwork with item program
        if item.get("program"):
            program_art = FranceTV._parse_item_art(item["program"], {})
            for key, value in program_art.items():
                _fill(key, value)

        return art

    @staticmethod
    # pylint: disable=too-many-branches
    def _get_item_url(
        item,  # type: Item
        path,  # type: Text
        level,  # type: Optional[int]
        parent_item,  # type: Item
    ):
        # type: (...) -> Optional[Url]
        """Build the URL parameters used to open an item.

        Args:
            item: API entry to build the URL for.
            path: API path of the collection the entry comes from.
            level: index of the entry in that collection.
            parent_item: entry or response containing ``item``.

        Returns:
            ``{"mode": "watch", "id": ...}`` for an item with a video
            identifier, ``{"mode": "collection", "path": ...}`` (plus a
            "level" key for nested item lists) for a folder, or None when the
            item is unmanaged or requires user authentication.
        """

        video_id = None
        if FranceTV._is_live(item, parent_item):
            # For lives, prefer the identifier of the channel stream. The
            # "channel" key may hold a plain identifier instead of a dict.
            channel = item.get("channel")
            if isinstance(channel, dict):
                video_id = channel.get("si_id")
        if not video_id:
            video_id = item.get("si_id")

        if video_id:
            return {
                "mode": "watch",
                "id": video_id,
            }

        url = {
            "mode": "collection",
        }  # type: Dict[Text, Any]

        item_type = item.get("type")

        if item.get("url_complete"):
            if item_type == "sous_categorie":
                url["path"] = "apps/sub-categories/{}".format(
                    item["url_complete"]
                )
            else:
                url["path"] = "apps/{}s/{}".format(
                    item_type, item["url_complete"]
                )
        elif item_type == "collection" and item.get("id"):
            url["path"] = "generic/collections/{}".format(item["id"])
        elif item.get("program_path"):
            url["path"] = "apps/program/{}".format(item["program_path"])
        elif item.get("link"):
            url["path"] = item["link"]
        elif item.get("region_path"):
            # No leading slash, as for every other path built here
            url["path"] = "apps/regions/{}/{}".format(
                item["region_path"], path.split("/")[-1]
            )
        elif item.get("channel_path"):
            if item.get("channel_url") == "la1ere":
                url["path"] = "apps/regions/outre-mer"
            else:
                url["path"] = "apps/channels/{}".format(item["channel_path"])
        elif "items" in item:
            # Nested list of items: same path, located by its index
            url["path"] = path
            url["level"] = level
        else:
            _LOGGER.warning("Item %s in path %s is unmanaged", item, path)
            return None

        # Ignore items based on user authentication
        if ":userId" in url["path"] or ":userUId" in url["path"]:
            return None

        return url

    @staticmethod
    # pylint: disable=too-many-branches,too-many-statements
    def _parse_item(
        item,  # type: Item
        path,  # type: Text
        level,  # type: Optional[int]
        parent_item,  # type: Item
    ):
        # type: (...) -> Optional[ParsedItem]
        """Convert an API entry into a ``ParsedItem``.

        Args:
            item: API entry to convert.
            path: API path of the collection the entry comes from.
            level: index of the entry in that collection.
            parent_item: entry or response containing ``item``.

        Returns:
            The parsed item, or None when no URL can be built for the entry.
            Folders only get the "title" and "plot" info; playable items
            ("watch" mode) get the full metadata and the "isPlayable"
            property.
        """

        url = FranceTV._get_item_url(item, path, level, parent_item)
        if not url:
            return None

        info = {}  # type: Dict[Text, Any]
        art = FranceTV._parse_item_art(item, parent_item)
        properties = {}  # type: Dict[Text, Text]

        item_type = item.get("type")

        title = capitalize(item.get("label") or item.get("title"))
        program = capitalize((item.get("program") or {}).get("label"))

        if not title:
            if item_type == "categories":
                title = "Catégories"
            elif (
                parent_item.get("type") == "program"
                and item_type == "playlist_program"
            ):
                title = "À regarder également"
            elif program:
                title = program
            else:
                _LOGGER.warning("No title in item %s in path %s", item, path)
                title = "Inconnu"

        if title == program and item.get("episode_title"):
            title = capitalize(item["episode_title"])

        # Do not repeat the program name when it is also the title
        if title == program:
            label_parts = [program]
        else:
            label_parts = [program, title]
        label = " – ".join([i for i in label_parts if i])

        # The dicts below are filled in after the tuple creation
        result = ParsedItem(label, url, info, art, properties)

        info["title"] = title
        info["plot"] = (
            html_to_text(item.get("description") or item.get("synopsis"))
            or title
        )

        # No need to parse more item metadata for folders
        if url["mode"] != "watch":
            return result

        info["genre"] = capitalize((item.get("category") or {}).get("label"))
        info["year"] = item.get("production_year") or item.get("year")

        if item.get("episode"):
            info["episode"] = item["episode"]

        if item.get("season"):
            info["season"] = item["season"]

        if FranceTV._is_live(item, parent_item):
            # Don't mark live streams as read once played
            info["playcount"] = 0

        if item.get("casting"):
            cast = item["casting"].split(", ")

            if item.get("characters"):
                # Pair each actor with its character
                info["cast"] = list(
                    zip_longest(
                        cast, item["characters"].replace("\n", "").split(", ")
                    )
                )
            else:
                info["cast"] = cast
        elif item.get("presenter"):
            presenters = (
                item["presenter"]
                .replace("Présenté par ", "")
                .rstrip(".")
                .split(",")
            )
            # Names are separated by ", ": drop the surrounding whitespace
            # and the empty names
            info["cast"] = [
                (name.strip(), "Présentateur")
                for name in presenters
                if name.strip()
            ]

        if item.get("director"):
            info["director"] = item["director"].split(", ")

        info["mpaa"] = item.get("rating_csa_code")
        info["plotoutline"] = item.get("headline_title") or item.get(
            "subtitle"
        )
        info["duration"] = item.get("duration")

        if program:
            info["tvshowtitle"] = program

        if not info["year"] and item.get("broadcast_begin_date"):
            info["aired"] = time.strftime(
                "%Y-%m-%d",  # type: ignore
                time.localtime(item["broadcast_begin_date"]),
            )

        if item.get("begin_date"):
            info["dateadded"] = time.strftime(
                "%Y-%m-%d %H:%M:%S",  # type: ignore
                time.localtime(item["begin_date"]),
            )

        if (
            info.get("tvshowtitle")
            or info.get("episode")
            or info.get("season")
        ):
            info["mediatype"] = "episode"
        else:
            info["mediatype"] = "movie"

        properties["isPlayable"] = "true"

        return result

    def get_collection(self, path, level=None):
        # type: (Text, Optional[int]) -> Generator[ParsedItem, None, None]
        """Yield the parsed items of an API collection.

        Args:
            path: API path of the collection to query.
            level: optional index of an entry of the response; when it is a
                valid index, the items nested in that entry are listed
                instead of the top-level ones.

        Yields:
            The parsed entries of the collection, followed by extra items:
            the "Okoo" channel for the channel list, "all TV shows"/"all
            videos" folders for categories, channels, programs and regions,
            and a "next page" folder for paginated collections (only when
            ``level`` is None).
        """

        data = self._query_api(path)

        if isinstance(data, dict):
            cursor = data.get("cursor")
            parent_item = data.get("item") or data
            collection = data.get("collections") or data.get("items") or []
        else:
            cursor = None
            parent_item = {}
            collection = data

        if level is not None and level < len(collection):
            cursor = None
            parent_item = collection[level]
            collection = collection[level].get("items") or []

        # Sub-category items only provide an incomplete list of programs and
        # videos, use our own extra items instead
        if parent_item.get("type") != "sous_categorie":
            for index, item in enumerate(collection):
                parsed_item = self._parse_item(item, path, index, parent_item)
                if parsed_item:
                    yield parsed_item

        if path == "generic/channels":
            # Add "virtual" Okoo channel in the channel collection, as done on
            # the france.tv website
            yield ParsedItem(
                "Okoo",
                {"mode": "collection", "path": "apps/categories/enfants"},
                {"plot": "Okoo"},
                {"icon": _CHANNEL_ICONS["okoo"]},
                {},
            )

        # Extra items

        parent_item_type = parent_item.get("type")

        if parent_item_type in [
            "categorie",
            "channel",
            "program",
            "region",
            "sous_categorie",
        ]:
            collection_id = (
                parent_item.get("channel_path")
                or parent_item.get("program_path")
                or parent_item.get("region_path")
                or parent_item.get("url_complete")
            )

            # Without identifier, the extra items would point to invalid
            # ".../None/..." paths
            if collection_id:
                # Add "all TV shows"/"all programs" item
                if parent_item_type != "program":
                    yield ParsedItem(
                        "$LOCALIZE[30101]",
                        {
                            "mode": "collection",
                            "path": "apps/regions/{}/programs".format(
                                collection_id
                            ),
                        },
                        {"plot": ""},
                        {"icon": _ALL_TV_SHOWS_ICON},
                        {"SpecialSort": "bottom"},
                    )

                # Add "all videos" item
                yield ParsedItem(
                    "$LOCALIZE[30102]",
                    {
                        "mode": "collection",
                        "path": "generic/taxonomy/{}/contents".format(
                            collection_id
                        ),
                    },
                    {"plot": ""},
                    {"icon": _ALL_VIDEOS_ICON},
                    {"SpecialSort": "bottom"},
                )

        if (
            level is None
            and cursor
            and cursor.get("next")
            and cursor.get("last")
        ):
            # Add "next page" item
            label = "$LOCALIZE[30103] ({}/{})".format(
                cursor["next"] + 1, cursor["last"] + 1
            )
            yield ParsedItem(
                label,
                {
                    "mode": "collection",
                    "path": update_url_params(path, page=cursor["next"]),
                },
                {"plot": ""},
                {"icon": _NEXT_PAGE_ICON},
                {"SpecialSort": "bottom"},
            )
|