400 lines
13 KiB
Python
400 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# MIT License (see LICENSE.txt or https://opensource.org/licenses/MIT)
|
|
"""Implements Kodi Helper functions"""
|
|
|
|
from __future__ import absolute_import, division, unicode_literals
|
|
from contextlib import contextmanager
|
|
import xbmc
|
|
import xbmcaddon
|
|
from xbmcgui import DialogProgress, DialogProgressBG
|
|
|
|
try: # Kodi v19 or newer
|
|
from xbmcvfs import translatePath
|
|
except ImportError: # Kodi v18 and older
|
|
# pylint: disable=ungrouped-imports
|
|
from xbmc import translatePath
|
|
|
|
from .unicodes import from_unicode, to_unicode
|
|
|
|
# NOTE: We need to explicitly add the add-on id here!
|
|
# Shared Addon handle used by the helpers in this module to read add-on info,
# settings and localized strings.
ADDON = xbmcaddon.Addon('script.module.inputstreamhelper')
|
|
|
|
|
|
class progress_dialog(DialogProgress, object):  # pylint: disable=invalid-name,useless-object-inheritance
    """Kodi Progress dialog that takes one multi-line message on every Kodi version"""

    def __init__(self):
        """Wait until no other Progress dialog is open, then initialize this one"""
        # Window ID 10101 is the Progress dialog: https://kodi.wiki/view/Window_IDs
        progress_window_id = 10101
        while get_current_window_id() == progress_window_id:
            xbmc.sleep(100)
        super(progress_dialog, self).__init__()

    @staticmethod
    def _as_three_lines(message):
        """Split a message on newlines into exactly three entries, padding with None"""
        parts = message.split('\n', 2)
        while len(parts) < 3:
            parts.append(None)
        return parts

    def create(self, heading, message=''):  # pylint: disable=arguments-differ
        """Create and show the progress dialog"""
        if kodi_version_major() >= 19:
            return super(progress_dialog, self).create(heading, message=message)
        # Before Kodi v19 the text is passed as up to three separate lines
        first, second, third = self._as_three_lines(message)
        return super(progress_dialog, self).create(heading, line1=first, line2=second, line3=third)

    def update(self, percent, message=''):  # pylint: disable=arguments-differ
        """Update the percentage and text of the progress dialog"""
        if kodi_version_major() >= 19:
            return super(progress_dialog, self).update(percent, message=message)
        # Before Kodi v19 the text is passed as up to three separate lines
        first, second, third = self._as_three_lines(message)
        return super(progress_dialog, self).update(percent, line1=first, line2=second, line3=third)
|
|
|
|
|
|
class SafeDict(dict):
    """Dictionary that returns the '{key}' placeholder instead of failing on a missing key"""

    def __missing__(self, key):
        """Rebuild the original format placeholder for a key that is not present"""
        return ''.join(('{', key, '}'))
|
|
|
|
|
|
def kodi_version():
    """Return the Kodi version string (the part of System.BuildVersion before the first space)"""
    build_label = xbmc.getInfoLabel('System.BuildVersion')
    version, _, _ = build_label.partition(' ')
    return version
|
|
|
|
|
|
def kodi_version_major():
    """Return the major Kodi version (the part before the first dot) as an integer"""
    major, _, _ = kodi_version().partition('.')
    return int(major)
|
|
|
|
|
|
def translate_path(path):
    """Translate a special xbmc path and return the result as unicode"""
    native_path = from_unicode(path)
    translated = translatePath(native_path)
    return to_unicode(translated)
|
|
|
|
|
|
def get_addon_info(key):
    """Return the add-on information stored under the given key, as unicode"""
    info = ADDON.getAddonInfo(key)
    return to_unicode(info)
|
|
|
|
|
|
def addon_id():
    """Return the add-on ID"""
    identifier = get_addon_info('id')
    return identifier
|
|
|
|
|
|
def addon_profile():
    """Return the translated path of the add-on profile"""
    profile = get_addon_info('profile')
    return translate_path(profile)
|
|
|
|
|
|
def addon_version():
    """Return the add-on version"""
    version = get_addon_info('version')
    return version
|
|
|
|
|
|
def browsesingle(type, heading, shares='', mask='', useThumbs=False, treatAsFolder=False, defaultt=None):  # pylint: disable=invalid-name,redefined-builtin
    """Show a Kodi browseSingle dialog and return the selection as unicode"""
    from xbmcgui import Dialog
    # Fall back to the add-on name when no heading is given
    title = heading or ADDON.getAddonInfo('name')
    selection = Dialog().browseSingle(
        type=type,
        heading=title,
        shares=shares,
        mask=mask,
        useThumbs=useThumbs,
        treatAsFolder=treatAsFolder,
        defaultt=defaultt,
    )
    return to_unicode(selection)
|
|
|
|
|
|
def notification(heading='', message='', icon='info', time=4000):
    """Show a Kodi notification"""
    from xbmcgui import Dialog
    # Fall back to the add-on name when no heading is given
    title = heading or ADDON.getAddonInfo('name')
    dialog = Dialog()
    return dialog.notification(heading=title, message=message, icon=icon, time=time)
|
|
|
|
|
|
def ok_dialog(heading='', message=''):
    """Show Kodi's OK dialog"""
    from xbmcgui import Dialog
    # Fall back to the add-on name when no heading is given
    title = heading or ADDON.getAddonInfo('name')
    if kodi_version_major() >= 19:
        return Dialog().ok(heading=title, message=message)
    # Before Kodi v19 the text is passed as 'line1'
    return Dialog().ok(heading=title, line1=message)
|
|
|
|
|
|
def select_dialog(heading='', opt_list=None, autoclose=0, preselect=-1, useDetails=False):  # pylint: disable=invalid-name
    """Show Kodi's Select dialog"""
    from xbmcgui import Dialog
    # Fall back to the add-on name when no heading is given
    title = heading or ADDON.getAddonInfo('name')
    dialog = Dialog()
    return dialog.select(title, opt_list, autoclose=autoclose, preselect=preselect, useDetails=useDetails)
|
|
|
|
|
|
def textviewer(heading='', text='', usemono=False):
    """Show a Kodi textviewer dialog"""
    from xbmcgui import Dialog
    # Fall back to the add-on name when no heading is given
    options = dict(heading=heading or ADDON.getAddonInfo('name'), text=text)
    # 'usemono' is only passed on Kodi v18 and newer
    if kodi_version_major() >= 18:
        options.update(usemono=usemono)
    return Dialog().textviewer(**options)
|
|
|
|
|
|
def yesno_dialog(heading='', message='', nolabel=None, yeslabel=None, autoclose=0):
    """Show Kodi's Yes/No dialog"""
    from xbmcgui import Dialog
    # Fall back to the add-on name when no heading is given
    options = dict(
        heading=heading or ADDON.getAddonInfo('name'),
        nolabel=nolabel,
        yeslabel=yeslabel,
        autoclose=autoclose,
    )
    if kodi_version_major() < 19:
        # Before Kodi v19 the text is passed as 'line1'
        options.update(line1=message)
    else:
        options.update(message=message)
    return Dialog().yesno(**options)
|
|
|
|
|
|
def localize(string_id, **kwargs):
    """Return the localized string for string_id, filling in any placeholders given as kwargs

    Placeholders without a matching keyword argument are left in the string as-is.
    """
    translated = ADDON.getLocalizedString(string_id)
    if not kwargs:
        return translated
    from string import Formatter
    return Formatter().vformat(translated, (), SafeDict(**kwargs))
|
|
|
|
|
|
def get_setting(key, default=None):
    """Return an add-on setting as a string, or the default when it is empty or unavailable"""
    try:
        raw_value = ADDON.getSetting(key)
        value = to_unicode(raw_value)
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
    # An empty value only yields the default when a default was actually given
    use_default = default is not None and value == ''
    return default if use_default else value
|
|
|
|
|
|
def get_setting_bool(key, default=None):
    """Return an add-on setting as a boolean, or the default when it cannot be read as one"""
    try:
        return ADDON.getSettingBool(key)
    except (AttributeError, TypeError):  # On Krypton or older, or when not a boolean
        text = get_setting(key, default)
        if text == 'true':
            return True
        if text == 'false':
            return False
        return default
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
|
|
|
|
|
|
def get_setting_int(key, default=None):
    """Get an add-on setting as integer

    Uses ADDON.getSettingInt() and falls back to parsing the string setting when
    that call is unavailable or fails with a type error.

    :param key: the setting id
    :param default: value returned when the setting cannot be read as an integer
    :return: the integer value, or ``default``
    """
    try:
        return ADDON.getSettingInt(key)
    except (AttributeError, TypeError):  # On Krypton or older, or when not an integer
        value = get_setting(key, default)
        try:
            return int(value)
        except (TypeError, ValueError):
            # TypeError: get_setting() returned a non-numeric default such as None
            # ValueError: the stored string is not an integer
            return default
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
|
|
|
|
|
|
def get_setting_float(key, default=None):
    """Get an add-on setting as float

    :param key: the setting id
    :param default: value returned when the setting cannot be read as a float
    :return: the float value, or ``default``
    """
    try:
        # The lookup is inside the try block so the RuntimeError handler can apply to it
        return float(get_setting(key, default))
    except (TypeError, ValueError):
        # TypeError: get_setting() returned a non-numeric default such as None
        # ValueError: the stored string is not a number
        return default
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
|
|
|
|
|
|
def set_setting(key, value):
    """Store an add-on setting, converting the value to a string first"""
    text = from_unicode(str(value))
    return ADDON.setSetting(key, text)
|
|
|
|
|
|
def set_setting_bool(key, value):
    """Store an add-on setting as a boolean"""
    try:
        return ADDON.setSettingBool(key, value)
    except (AttributeError, TypeError):  # On Krypton or older, or when not a boolean
        # The strings 'false' and 'true' are stored unchanged
        if value in ('false', 'true'):
            return set_setting(key, value)
        # Any other value is stored according to its truthiness
        return set_setting(key, 'true' if value else 'false')
|
|
|
|
|
|
def get_global_setting(key):
    """Return the value of a Kodi setting, queried with Settings.GetSettingValue"""
    response = jsonrpc(method='Settings.GetSettingValue', params={'setting': key})
    payload = response.get('result', {})
    return payload.get('value')
|
|
|
|
|
|
def get_current_window_id():
    """Get current window id

    Queries GUI.GetProperties for the 'currentwindow' property.

    :return: the id of the current window, or None when the call reports an error
             or the response carries no window information
    """
    result = jsonrpc(method='GUI.GetProperties', params=dict(properties=['currentwindow']))
    # jsonrpc() returns None on misuse; treat that like an error response
    if not result or result.get('error'):
        return None
    # Tolerate a missing or empty 'result' / 'currentwindow' instead of raising AttributeError
    window = (result.get('result') or {}).get('currentwindow') or {}
    return window.get('id')
|
|
|
|
|
|
def has_socks():
    """Report whether the socks module can be imported, remembering the outcome on the function

    The first call returns True when the import succeeds and None when it fails;
    later calls return the remembered True or False.
    """
    if hasattr(has_socks, 'cached'):
        return has_socks.cached
    try:
        import socks  # noqa: F401; pylint: disable=unused-variable,unused-import,useless-suppression
    except ImportError:
        available = False
    else:
        available = True
    has_socks.cached = available
    # None (instead of False) lets the caller detect that this is the first run
    return True if available else None
|
|
|
|
|
|
def get_proxies():
    """Return a usable proxies dictionary from Kodi proxy settings

    :return: a dict with identical 'http' and 'https' proxy URLs, or None when the
             proxy is not enabled, no proxy server is configured, or a non-HTTP
             proxy type is selected while the socks module is not available
    """
    usehttpproxy = get_global_setting('network.usehttpproxy')
    if usehttpproxy is not True:
        return None

    try:
        httpproxytype = int(get_global_setting('network.httpproxytype'))
    except (TypeError, ValueError):
        # TypeError: the setting value is missing (None)
        # ValueError: the setting value is not numeric
        httpproxytype = 0

    socks_supported = has_socks()
    if httpproxytype != 0 and not socks_supported:
        # Only open the dialog the first time (to avoid multiple popups)
        if socks_supported is None:
            ok_dialog('', localize(30042))  # Requires PySocks
        return None

    # The index in this list is the value of the 'network.httpproxytype' setting
    proxy_types = ['http', 'socks4', 'socks4a', 'socks5', 'socks5h']

    proxy = dict(
        # Unknown proxy types fall back to plain http
        scheme=proxy_types[httpproxytype] if 0 <= httpproxytype < len(proxy_types) else 'http',
        server=get_global_setting('network.httpproxyserver'),
        port=get_global_setting('network.httpproxyport'),
        username=get_global_setting('network.httpproxyusername'),
        password=get_global_setting('network.httpproxypassword'),
    )

    # Build the most specific proxy URL the available settings allow
    if proxy.get('username') and proxy.get('password') and proxy.get('server') and proxy.get('port'):
        proxy_address = '{scheme}://{username}:{password}@{server}:{port}'.format(**proxy)
    elif proxy.get('username') and proxy.get('server') and proxy.get('port'):
        proxy_address = '{scheme}://{username}@{server}:{port}'.format(**proxy)
    elif proxy.get('server') and proxy.get('port'):
        proxy_address = '{scheme}://{server}:{port}'.format(**proxy)
    elif proxy.get('server'):
        proxy_address = '{scheme}://{server}'.format(**proxy)
    else:
        return None

    return dict(http=proxy_address, https=proxy_address)
|
|
|
|
|
|
def log(level=0, message='', **kwargs):
    """Write a message to the Kodi log, prefixed with the add-on id

    Keyword arguments fill in the placeholders of the message; placeholders
    without a matching keyword argument are left as-is.
    """
    if kwargs:
        from string import Formatter
        message = Formatter().vformat(message, (), SafeDict(**kwargs))
    prefixed = '[{addon}] {message}'.format(addon=addon_id(), message=message)
    xbmc.log(from_unicode(prefixed), level)
|
|
|
|
|
|
def jsonrpc(*args, **kwargs):
    """Perform one JSON-RPC call (keyword arguments) or a batch of calls (positional dicts)

    Missing 'id' and 'jsonrpc' fields are filled in before the request is sent.
    Returns the decoded response, or None when both kinds of arguments are mixed.
    """
    from json import dumps, loads

    # We do not accept both args and kwargs
    if args and kwargs:
        log(4, 'ERROR: Wrong use of jsonrpc()')
        return None

    if args:
        # A list of actions: each one gets its position as default id
        for position, command in enumerate(args):
            for field, fallback in (('id', position), ('jsonrpc', '2.0')):
                if command.get(field) is None:
                    command[field] = fallback
        payload = args
    else:
        # A single action
        for field, fallback in (('id', 0), ('jsonrpc', '2.0')):
            if kwargs.get(field) is None:
                kwargs[field] = fallback
        payload = kwargs

    return loads(xbmc.executeJSONRPC(dumps(payload)))
|
|
|
|
|
|
def kodi_to_ascii(string):
    """Strip the Kodi bold, italic and gray/yellow color format tags from a string"""
    if string is None:
        return None
    # Each tag is removed in turn, in this order
    format_tags = (
        '[B]',
        '[/B]',
        '[I]',
        '[/I]',
        '[COLOR gray]',
        '[COLOR yellow]',
        '[/COLOR]',
    )
    for tag in format_tags:
        string = string.replace(tag, '')
    return string
|
|
|
|
|
|
@contextmanager
def open_file(path, flags='r'):
    """Open a file (using xbmcvfs)

    Context manager that yields an xbmcvfs File object and closes it when the
    ``with`` block is left, including when the block raises.

    :param path: path of the file to open
    :param flags: mode passed on to xbmcvfs.File
    """
    from xbmcvfs import File
    fdesc = File(path, flags)
    try:
        yield fdesc
    finally:
        # Close the handle even when the body of the with-statement raised
        fdesc.close()
|
|
|
|
|
|
def copy(src, dest):
    """Copy a file with xbmcvfs, logging the operation first"""
    from xbmcvfs import copy as vfscopy
    log(2, "Copy file '{src}' to '{dest}'.", src=src, dest=dest)
    source_path = from_unicode(src)
    target_path = from_unicode(dest)
    return vfscopy(source_path, target_path)
|
|
|
|
|
|
def delete(path):
    """Remove a file with xbmcvfs, logging the operation first"""
    from xbmcvfs import delete as vfsdelete
    log(2, "Delete file '{path}'.", path=path)
    native_path = from_unicode(path)
    return vfsdelete(native_path)
|
|
|
|
|
|
def exists(path):
    """Check with xbmcvfs whether the path exists

    The path may be a file or a folder (a folder must end with slash or backslash).
    """
    from xbmcvfs import exists as vfsexists
    native_path = from_unicode(path)
    return vfsexists(native_path)
|
|
|
|
|
|
def listdir(path):
    """Return the unicode names of all directories, then all files, in a directory (using xbmcvfs)"""
    from xbmcvfs import listdir as vfslistdir
    folders, files = vfslistdir(from_unicode(path))
    folder_names = [to_unicode(name) for name in folders]
    file_names = [to_unicode(name) for name in files]
    return folder_names + file_names
|
|
|
|
|
|
def mkdir(path):
    """Create a directory with xbmcvfs, logging the operation first"""
    from xbmcvfs import mkdir as vfsmkdir
    log(2, "Create directory '{path}'.", path=path)
    native_path = from_unicode(path)
    return vfsmkdir(native_path)
|
|
|
|
|
|
def mkdirs(path):
    """Create a directory and its parents with xbmcvfs, logging the operation first"""
    from xbmcvfs import mkdirs as vfsmkdirs
    log(2, "Recursively create directory '{path}'.", path=path)
    native_path = from_unicode(path)
    return vfsmkdirs(native_path)
|
|
|
|
|
|
def stat_file(path):
    """Return the xbmcvfs Stat object for a file"""
    from xbmcvfs import Stat
    native_path = from_unicode(path)
    return Stat(native_path)
|
|
|
|
|
|
def bg_progress_dialog():
    """Return a new instance of Kodi's Background Progress dialog"""
    dialog = DialogProgressBG()
    return dialog
|