astrXbian/.install/.kodi/addons/script.module.inputstreamhe.../lib/inputstreamhelper/kodiutils.py

400 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# MIT License (see LICENSE.txt or https://opensource.org/licenses/MIT)
"""Implements Kodi Helper functions"""
from __future__ import absolute_import, division, unicode_literals
from contextlib import contextmanager
import xbmc
import xbmcaddon
from xbmcgui import DialogProgress, DialogProgressBG
try: # Kodi v19 or newer
from xbmcvfs import translatePath
except ImportError: # Kodi v18 and older
# pylint: disable=ungrouped-imports
from xbmc import translatePath
from .unicodes import from_unicode, to_unicode
# NOTE: We need to explicitly add the add-on id here!
ADDON = xbmcaddon.Addon('script.module.inputstreamhelper')
class progress_dialog(DialogProgress, object): # pylint: disable=invalid-name,useless-object-inheritance
"""Show Kodi's Progress dialog"""
def __init__(self):
"""Initialize a new progress dialog"""
# Wait for previous Progress dialog to close
# Progress dialog Window ID is 10101: https://kodi.wiki/view/Window_IDs
while get_current_window_id() == 10101:
xbmc.sleep(100)
super(progress_dialog, self).__init__()
def create(self, heading, message=''): # pylint: disable=arguments-differ
"""Create and show a progress dialog"""
if kodi_version_major() < 19:
lines = message.split('\n', 2)
line1, line2, line3 = (lines + [None] * (3 - len(lines)))
return super(progress_dialog, self).create(heading, line1=line1, line2=line2, line3=line3)
return super(progress_dialog, self).create(heading, message=message)
def update(self, percent, message=''): # pylint: disable=arguments-differ
"""Update the progress dialog"""
if kodi_version_major() < 19:
lines = message.split('\n', 2)
line1, line2, line3 = (lines + [None] * (3 - len(lines)))
return super(progress_dialog, self).update(percent, line1=line1, line2=line2, line3=line3)
return super(progress_dialog, self).update(percent, message=message)
class SafeDict(dict):
"""A safe dictionary implementation that does not break down on missing keys"""
def __missing__(self, key):
"""Replace missing keys with the original placeholder"""
return '{' + key + '}'
def kodi_version():
"""Returns full Kodi version as string"""
return xbmc.getInfoLabel('System.BuildVersion').split(' ')[0]
def kodi_version_major():
"""Returns major Kodi version as integer"""
return int(kodi_version().split('.')[0])
def translate_path(path):
"""Translate special xbmc paths"""
return to_unicode(translatePath(from_unicode(path)))
def get_addon_info(key):
"""Return addon information"""
return to_unicode(ADDON.getAddonInfo(key))
def addon_id():
"""Cache and return add-on ID"""
return get_addon_info('id')
def addon_profile():
"""Cache and return add-on profile"""
return translate_path(get_addon_info('profile'))
def addon_version():
"""Cache and return add-on version"""
return get_addon_info('version')
def browsesingle(type, heading, shares='', mask='', useThumbs=False, treatAsFolder=False, defaultt=None): # pylint: disable=invalid-name,redefined-builtin
"""Show a Kodi browseSingle dialog"""
from xbmcgui import Dialog
if not heading:
heading = ADDON.getAddonInfo('name')
return to_unicode(Dialog().browseSingle(type=type, heading=heading, shares=shares, mask=mask, useThumbs=useThumbs,
treatAsFolder=treatAsFolder, defaultt=defaultt))
def notification(heading='', message='', icon='info', time=4000):
"""Show a Kodi notification"""
from xbmcgui import Dialog
if not heading:
heading = ADDON.getAddonInfo('name')
return Dialog().notification(heading=heading, message=message, icon=icon, time=time)
def ok_dialog(heading='', message=''):
"""Show Kodi's OK dialog"""
from xbmcgui import Dialog
if not heading:
heading = ADDON.getAddonInfo('name')
if kodi_version_major() < 19:
return Dialog().ok(heading=heading, line1=message)
return Dialog().ok(heading=heading, message=message)
def select_dialog(heading='', opt_list=None, autoclose=0, preselect=-1, useDetails=False): # pylint: disable=invalid-name
"""Show Kodi's Select dialog"""
from xbmcgui import Dialog
if not heading:
heading = ADDON.getAddonInfo('name')
return Dialog().select(heading, opt_list, autoclose=autoclose, preselect=preselect, useDetails=useDetails)
def textviewer(heading='', text='', usemono=False):
"""Show a Kodi textviewer dialog"""
from xbmcgui import Dialog
if not heading:
heading = ADDON.getAddonInfo('name')
if kodi_version_major() < 18:
return Dialog().textviewer(heading=heading, text=text)
return Dialog().textviewer(heading=heading, text=text, usemono=usemono)
def yesno_dialog(heading='', message='', nolabel=None, yeslabel=None, autoclose=0):
"""Show Kodi's Yes/No dialog"""
from xbmcgui import Dialog
if not heading:
heading = ADDON.getAddonInfo('name')
if kodi_version_major() < 19:
return Dialog().yesno(heading=heading, line1=message, nolabel=nolabel, yeslabel=yeslabel, autoclose=autoclose)
return Dialog().yesno(heading=heading, message=message, nolabel=nolabel, yeslabel=yeslabel, autoclose=autoclose)
def localize(string_id, **kwargs):
"""Return the translated string from the .po language files, optionally translating variables"""
if kwargs:
from string import Formatter
return Formatter().vformat(ADDON.getLocalizedString(string_id), (), SafeDict(**kwargs))
return ADDON.getLocalizedString(string_id)
def get_setting(key, default=None):
"""Get an add-on setting as string"""
try:
value = to_unicode(ADDON.getSetting(key))
except RuntimeError: # Occurs when the add-on is disabled
return default
if value == '' and default is not None:
return default
return value
def get_setting_bool(key, default=None):
"""Get an add-on setting as boolean"""
try:
return ADDON.getSettingBool(key)
except (AttributeError, TypeError): # On Krypton or older, or when not a boolean
value = get_setting(key, default)
if value not in ('false', 'true'):
return default
return bool(value == 'true')
except RuntimeError: # Occurs when the add-on is disabled
return default
def get_setting_int(key, default=None):
"""Get an add-on setting as integer"""
try:
return ADDON.getSettingInt(key)
except (AttributeError, TypeError): # On Krypton or older, or when not an integer
value = get_setting(key, default)
try:
return int(value)
except ValueError:
return default
except RuntimeError: # Occurs when the add-on is disabled
return default
def get_setting_float(key, default=None):
"""Get an add-on setting as float"""
value = get_setting(key, default)
try:
return float(value)
except ValueError:
return default
except RuntimeError: # Occurs when the add-on is disabled
return default
def set_setting(key, value):
"""Set an add-on setting"""
return ADDON.setSetting(key, from_unicode(str(value)))
def set_setting_bool(key, value):
"""Set an add-on setting as boolean"""
try:
return ADDON.setSettingBool(key, value)
except (AttributeError, TypeError): # On Krypton or older, or when not a boolean
if value in ['false', 'true']:
return set_setting(key, value)
if value:
return set_setting(key, 'true')
return set_setting(key, 'false')
def get_global_setting(key):
"""Get a Kodi setting"""
result = jsonrpc(method='Settings.GetSettingValue', params=dict(setting=key))
return result.get('result', {}).get('value')
def get_current_window_id():
"""Get current window id"""
result = jsonrpc(method='GUI.GetProperties', params=dict(properties=['currentwindow']))
if result.get('error'):
return None
return result.get('result', {}).get('currentwindow').get('id')
def has_socks():
"""Test if socks is installed, and use a static variable to remember"""
if hasattr(has_socks, 'cached'):
return getattr(has_socks, 'cached')
try:
import socks # noqa: F401; pylint: disable=unused-variable,unused-import,useless-suppression
except ImportError:
has_socks.cached = False
return None # Detect if this is the first run
has_socks.cached = True
return True
def get_proxies():
"""Return a usable proxies dictionary from Kodi proxy settings"""
usehttpproxy = get_global_setting('network.usehttpproxy')
if usehttpproxy is not True:
return None
try:
httpproxytype = int(get_global_setting('network.httpproxytype'))
except ValueError:
httpproxytype = 0
socks_supported = has_socks()
if httpproxytype != 0 and not socks_supported:
# Only open the dialog the first time (to avoid multiple popups)
if socks_supported is None:
ok_dialog('', localize(30042)) # Requires PySocks
return None
proxy_types = ['http', 'socks4', 'socks4a', 'socks5', 'socks5h']
proxy = dict(
scheme=proxy_types[httpproxytype] if 0 <= httpproxytype < 5 else 'http',
server=get_global_setting('network.httpproxyserver'),
port=get_global_setting('network.httpproxyport'),
username=get_global_setting('network.httpproxyusername'),
password=get_global_setting('network.httpproxypassword'),
)
if proxy.get('username') and proxy.get('password') and proxy.get('server') and proxy.get('port'):
proxy_address = '{scheme}://{username}:{password}@{server}:{port}'.format(**proxy)
elif proxy.get('username') and proxy.get('server') and proxy.get('port'):
proxy_address = '{scheme}://{username}@{server}:{port}'.format(**proxy)
elif proxy.get('server') and proxy.get('port'):
proxy_address = '{scheme}://{server}:{port}'.format(**proxy)
elif proxy.get('server'):
proxy_address = '{scheme}://{server}'.format(**proxy)
else:
return None
return dict(http=proxy_address, https=proxy_address)
def log(level=0, message='', **kwargs):
"""Log info messages to Kodi"""
if kwargs:
from string import Formatter
message = Formatter().vformat(message, (), SafeDict(**kwargs))
message = '[{addon}] {message}'.format(addon=addon_id(), message=message)
xbmc.log(from_unicode(message), level)
def jsonrpc(*args, **kwargs):
"""Perform JSONRPC calls"""
from json import dumps, loads
# We do not accept both args and kwargs
if args and kwargs:
log(4, 'ERROR: Wrong use of jsonrpc()')
return None
# Process a list of actions
if args:
for (idx, cmd) in enumerate(args):
if cmd.get('id') is None:
cmd.update(id=idx)
if cmd.get('jsonrpc') is None:
cmd.update(jsonrpc='2.0')
return loads(xbmc.executeJSONRPC(dumps(args)))
# Process a single action
if kwargs.get('id') is None:
kwargs.update(id=0)
if kwargs.get('jsonrpc') is None:
kwargs.update(jsonrpc='2.0')
return loads(xbmc.executeJSONRPC(dumps(kwargs)))
def kodi_to_ascii(string):
"""Convert Kodi format tags to ascii"""
if string is None:
return None
string = string.replace('[B]', '')
string = string.replace('[/B]', '')
string = string.replace('[I]', '')
string = string.replace('[/I]', '')
string = string.replace('[COLOR gray]', '')
string = string.replace('[COLOR yellow]', '')
string = string.replace('[/COLOR]', '')
return string
@contextmanager
def open_file(path, flags='r'):
"""Open a file (using xbmcvfs)"""
from xbmcvfs import File
fdesc = File(path, flags)
yield fdesc
fdesc.close()
def copy(src, dest):
"""Copy a file (using xbmcvfs)"""
from xbmcvfs import copy as vfscopy
log(2, "Copy file '{src}' to '{dest}'.", src=src, dest=dest)
return vfscopy(from_unicode(src), from_unicode(dest))
def delete(path):
"""Remove a file (using xbmcvfs)"""
from xbmcvfs import delete as vfsdelete
log(2, "Delete file '{path}'.", path=path)
return vfsdelete(from_unicode(path))
def exists(path):
"""Whether the path exists (using xbmcvfs)"""
# File or folder (folder must end with slash or backslash)
from xbmcvfs import exists as vfsexists
return vfsexists(from_unicode(path))
def listdir(path):
"""Return all files in a directory (using xbmcvfs)"""
from xbmcvfs import listdir as vfslistdir
dirs, files = vfslistdir(from_unicode(path))
return [to_unicode(item) for items in (dirs, files) for item in items]
def mkdir(path):
"""Create a directory (using xbmcvfs)"""
from xbmcvfs import mkdir as vfsmkdir
log(2, "Create directory '{path}'.", path=path)
return vfsmkdir(from_unicode(path))
def mkdirs(path):
"""Create directory including parents (using xbmcvfs)"""
from xbmcvfs import mkdirs as vfsmkdirs
log(2, "Recursively create directory '{path}'.", path=path)
return vfsmkdirs(from_unicode(path))
def stat_file(path):
"""Return information about a file (using xbmcvfs)"""
from xbmcvfs import Stat
return Stat(from_unicode(path))
def bg_progress_dialog():
"""Show Kodi's Background Progress dialog"""
return DialogProgressBG()