astrXbian_UX-et-webUI/.install/.kodi/addons/plugin.video.vstream/resources/lib/librecaptcha/recaptcha.py

518 lines
16 KiB
Python

# Copyright (C) 2017, 2019 nickolas360 <contact@nickolas360.com>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <http://www.gnu.org/licenses/>.
from resources.lib.comaddon import VSlog # import du dialog progress
from .errors import UserError
from .extract_strings import extract_and_save
from threading import Thread
try:
from html.parser import HTMLParser
from urllib.parse import urlparse
except:
from HTMLParser import HTMLParser
from urlparse import urlparse
import requests
import base64
import json
import re
import time
import xbmcvfs
# Prefix that get_full_url() joins every relative endpoint name onto.
BASE_URL = "https://www.google.com/recaptcha/api2/"
# Script fetched by get_rc_version() to discover the current release id.
API_JS_URL = "https://www.google.com/recaptcha/api.js"
# URL of the release script; "{}" is filled with the release id in
# get_js_strings() before being handed to extract_and_save().
JS_URL_TEMPLATE = "https://www.gstatic.com/recaptcha/releases/{}/recaptcha__fr.js"
# First half of the "<STRINGS_VERSION>/<rc_version>" tag that get_js_strings()
# expects on the first line of the cached strings file.
STRINGS_VERSION = "0.1.0"
# Location opened through xbmcvfs.File() to read the cached strings.
STRINGS_PATH = 'special://home/userdata/addon_data/plugin.video.vstream'
# Minimum time DynamicSolver.get_timeout() enforces between a tile's
# "replaceimage" request and the moment the new tile image is reported.
DYNAMIC_SELECT_DELAY = 4.5 # seconds
# How many strings after a matching id ReCaptcha.find_challenge_goal() inspects.
FIND_GOAL_SEARCH_DISTANCE = 10
def get_full_url(url):
    """Return ``BASE_URL`` and *url* joined by exactly one slash.

    Trailing slashes of ``BASE_URL`` and leading slashes of *url* are
    stripped before joining, so ``"payload"`` and ``"/payload"`` yield the
    same result.
    """
    prefix = BASE_URL.rstrip("/")
    suffix = url.lstrip("/")
    return "/".join([prefix, suffix])
def get_rc_site_url(url):
    """Normalise a site URL to ``scheme://hostname:port``.

    Path, query and fragment are dropped. When the URL carries no explicit
    port, 80 is used for ``http`` and 443 for ``https``.

    Raises:
        UserError: if the URL has no hostname, no scheme, or a scheme other
            than ``http``/``https`` (checked in that order).
    """
    parts = urlparse(url)
    host = parts.hostname
    scheme = parts.scheme

    if not host:
        raise UserError("Error: Site URL has no hostname.")
    if not scheme:
        raise UserError("Error: Site URL has no scheme.")
    if scheme != "http" and scheme != "https":
        raise UserError(
            "Error: Site URL has invalid scheme: {}".format(scheme),
        )

    port = parts.port
    if port is None:
        # Only "http" and "https" can reach this point.
        port = 443 if scheme == "https" else 80
    return "{}://{}:{}".format(scheme, host, port)
def rc_base64(string):
    """Encode *string* with the URL-safe base64 variant used in requests.

    The alphabet replaces ``+`` and ``/`` with ``-`` and ``_``, and the ``=``
    padding characters are replaced by ``.``.

    Args:
        string: text or bytes. Text is encoded as UTF-8 first; bytes are
            used unchanged.

    Returns:
        The encoded value as text.
    """
    try:
        text_type = unicode  # noqa: F821 -- only defined on Python 2
    except NameError:
        text_type = str  # Python 3

    data = string
    if isinstance(string, text_type):
        # Explicit codec: Python 2's implicit default is ASCII, which fails
        # on any non-ASCII character.
        data = string.encode("utf-8")
    return base64.b64encode(data, b"-_").decode().replace("=", ".")
def load_rc_json(text):
    """Parse a response body whose first line must be skipped.

    Everything after the first newline is decoded as JSON.

    Raises:
        IndexError: if *text* contains no newline.
        ValueError: if the remainder is not valid JSON.
    """
    pieces = text.split("\n", 1)
    payload = pieces[1]
    return json.loads(payload)
def get_meta(pmeta, probable_index):
    """Pick the challenge metadata list out of *pmeta*.

    The element at *probable_index* is preferred when it is a non-empty
    list; otherwise the first non-empty list found anywhere in *pmeta* is
    returned.

    Raises:
        TypeError: if *pmeta* is not a list.
        RuntimeError: if *pmeta* contains no non-empty list.
    """
    if not isinstance(pmeta, list):
        raise TypeError("pmeta is not a list: {!r}".format(pmeta))

    def is_meta(candidate):
        return isinstance(candidate, list) and bool(candidate)

    # Probe the likely slot first, then fall back to a scan in order.
    candidates = []
    if probable_index < len(pmeta):
        candidates.append(pmeta[probable_index])
    candidates.extend(pmeta)

    found = next((item for item in candidates if is_meta(item)), None)
    if found is None:
        raise RuntimeError("Could not find meta; pmeta: {!r}".format(pmeta))
    return found
def get_rresp(uvresp):
    """Return the first child list of *uvresp* whose head is ``"rresp"``.

    Returns ``None`` when no such child exists.

    Raises:
        TypeError: if *uvresp* is not a list.
    """
    if not isinstance(uvresp, list):
        raise TypeError("uvresp is not a list: {!r}".format(uvresp))

    def is_rresp(entry):
        return isinstance(entry, list) and bool(entry) and entry[0] == "rresp"

    return next((entry for entry in uvresp if is_rresp(entry)), None)
def get_js_strings(user_agent, rc_version):
    """Return the strings extracted from the reCAPTCHA release script.

    The cached file at ``STRINGS_PATH`` is used when its first line equals
    ``"<STRINGS_VERSION>/<rc_version>"``; the rest of that file is decoded
    as JSON. On any cache miss (unreadable, malformed or stale file) the
    work is delegated to ``extract_and_save`` and its result is returned.

    Args:
        user_agent: forwarded to ``extract_and_save``.
        rc_version: release id used both for the cache tag and the script URL.
    """
    def load_cached():
        handle = xbmcvfs.File(STRINGS_PATH)
        try:
            contents = handle.read()
        finally:
            # Always release the handle, even when read() fails.
            handle.close()
        # Unpacking raises ValueError when the file has no newline (for
        # example when it is empty), which is treated as a cache miss.
        version, text = contents.split("\n", 1)
        if version != "{}/{}".format(STRINGS_VERSION, rc_version):
            raise OSError("Incorrect version: {}".format(version))
        return json.loads(text)

    try:
        return load_cached()
    except (OSError, ValueError):
        # Cache miss: fall through and rebuild the strings.
        pass

    return extract_and_save(
        JS_URL_TEMPLATE.format(rc_version), STRINGS_PATH, STRINGS_VERSION,
        rc_version, user_agent,
    )
def get_rc_version(user_agent):
    """Fetch ``API_JS_URL`` and return the release id embedded in it.

    The id is the path segment that follows ``/recaptcha/releases/`` in the
    downloaded script text.

    Raises:
        RuntimeError: if the script contains no such path.
    """
    response = requests.get(API_JS_URL, headers={"User-Agent": user_agent})
    found = re.search("/recaptcha/releases/(.+?)/", response.text)
    if found is None:
        raise RuntimeError("Could not extract version from api.js.")
    return found.group(1)
class Solver(object):
    """Base class for challenge solvers; keeps a handle on the ReCaptcha."""

    def __init__(self, recaptcha):
        # Object through which the solver issues requests and reads tokens.
        self.rc = recaptcha

    def on_solved(response, **kwargs):
        """Callback; set this attribute in the parent class."""
        # Placeholder only: it is meant to be replaced on the instance,
        # which is why it takes no ``self``.
        raise NotImplementedError
class HasGrid(object):
    """Mixin deriving grid sizes from ``self.dimensions``.

    The host class must provide ``dimensions`` as an indexable pair
    ``(rows, columns)``.
    """

    @property
    def num_rows(self):
        """Number of rows: the first entry of ``dimensions``."""
        dims = self.dimensions
        return dims[0]

    @property
    def num_columns(self):
        """Number of columns: the second entry of ``dimensions``."""
        dims = self.dimensions
        return dims[1]

    @property
    def num_tiles(self):
        """Total tile count: rows multiplied by columns."""
        rows = self.num_rows
        columns = self.num_columns
        return rows * columns
class DynamicSolver(Solver, HasGrid):
    """Solver for "dynamic" challenges, where each selected tile is replaced.

    State kept per grid position:
      * ``tile_index_map`` -- the index currently sent to the server for
        that position; it starts as the position itself and receives a
        fresh, ever-increasing value after each replacement.
      * ``last_request_map`` -- ``time.time()`` of the last "replaceimage"
        request for that position (0 when none was made).

    ``selections`` accumulates the server-side indices in the order the
    tiles were selected and is what ``finish`` reports to ``on_solved``.
    """

    def __init__(self, recaptcha, pmeta):
        super(DynamicSolver, self).__init__(recaptcha)
        self.selections = []
        meta = get_meta(pmeta, 1)
        self.meta = meta
        # Entries 3 and 4 of the metadata are used as rows and columns.
        self.dimensions = (meta[3], meta[4])
        self.tile_index_map = list(range(self.num_tiles))
        self.last_request_map = [0] * self.num_tiles
        self.latest_index = self.num_tiles - 1

    def on_initial_image(self, image, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def on_tile_image(self, index, image, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def run(self):
        """Announce the challenge goal, then deliver the initial image."""
        self.rc.show_challenge_goal(self.meta)
        self.first_payload()

    def finish(self, block=True):
        """Report the accumulated selections through ``on_solved``.

        When *block* is true, first sleep for ``final_timeout`` seconds so
        that every tile's post-replacement delay has elapsed.
        """
        if block:
            time.sleep(self.final_timeout)
        self.on_solved(self.selections)

    @property
    def final_timeout(self):
        """Longest remaining delay, in seconds, over all grid positions."""
        return max(self.get_timeout(i) for i in range(self.num_tiles))

    def get_timeout(self, index):
        """Seconds left until ``DYNAMIC_SELECT_DELAY`` has passed for a tile.

        Returns 0 when the delay already elapsed or the tile was never
        replaced. *index* may be an int or a numeric string.
        """
        # Normalised like in replace_tile() so both accept the same inputs.
        elapsed = time.time() - self.last_request_map[int(index)]
        return max(DYNAMIC_SELECT_DELAY - elapsed, 0)

    def first_payload(self):
        """Request the initial image and pass its URL to ``on_initial_image``."""
        image = self.rc.get("payload", api=False, params={
            "c": self.rc.current_token,
            "k": self.rc.api_key,
        }).url
        self.on_initial_image(image)

    def select_tile(self, index):
        """Select the tile at *index* and report its replacement image later.

        The replacement is requested synchronously; a daemon thread then
        waits out the tile's delay before calling ``on_tile_image`` with the
        *index* exactly as it was passed in and the new image URL.
        """
        image = self.replace_tile(index)

        def target():
            time.sleep(self.get_timeout(index))
            self.on_tile_image(index, image)

        worker = Thread(target=target)
        worker.daemon = True
        worker.start()

    def replace_tile(self, index):
        """Record the selection of a tile and fetch its replacement image URL.

        *index* may be an int or a numeric string. Updates ``selections``,
        ``last_request_map``, ``tile_index_map``, ``latest_index`` and the
        ReCaptcha's ``current_token``.
        """
        # Convert once up front: previously only the first lookup used
        # int(index), so a string index failed on the later list accesses,
        # after the request had already been sent.
        index = int(index)
        real_index = self.tile_index_map[index]
        self.selections.append(int(real_index))
        r = self.rc.post("replaceimage", data={
            "c": self.rc.current_token,
            "ds": "[{}]".format(real_index),
        })

        self.last_request_map[index] = time.time()
        data = load_rc_json(r.text)
        # The replacement tile gets the next unused server-side index.
        self.latest_index += 1
        self.tile_index_map[index] = self.latest_index

        self.rc.current_token = data[1]
        replacement_id = data[2][0]

        image = self.rc.get("payload", api=False, params={
            "c": self.rc.current_token,
            "k": self.rc.api_key,
            "id": replacement_id,
        }).url
        return image
class MultiCaptchaSolver(Solver, HasGrid):
    """Solver for "multicaptcha" challenges: a queue of successive images.

    ``metas`` holds the metadata of the challenges still to come; each call
    to ``next_challenge`` consumes one entry. ``selection_groups`` collects
    one sorted list of selected indices per answered image and is what gets
    reported to ``on_solved`` once the queue is empty.
    """

    def __init__(self, recaptcha, pmeta):
        super(MultiCaptchaSolver, self).__init__(recaptcha)
        self.selection_groups = []
        self.dimensions = None
        self.challenge_type = None
        self.previous_token = None
        self.previous_id = None
        self.id = "2"
        self.metas = list(get_meta(pmeta, 5)[0])
        self.next_challenge()

    def on_image(self, image, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def run(self):
        """Start solving by delivering the first image."""
        self.first_payload()

    def next_challenge(self):
        """Consume the next queued metadata, set the grid size, show the goal."""
        upcoming = self.metas.pop(0)
        rows, columns = upcoming[3], upcoming[4]
        self.dimensions = (rows, columns)
        self.rc.show_challenge_goal(upcoming)

    def select_indices(self, indices):
        """Record the answer for the current image and advance.

        If challenges remain queued the next image is requested; otherwise
        every recorded group is handed to ``on_solved``.
        """
        chosen = sorted(indices)
        self.selection_groups.append(chosen)
        VSlog("Reste a faire :" + str(len(self.metas)))
        if not self.metas:
            self.on_solved(self.selection_groups)
        else:
            self.replace_image()

    def first_payload(self):
        """Request the first image and pass its URL to ``on_image``."""
        query = {
            "c": self.rc.current_token,
            "k": self.rc.api_key,
        }
        response = self.rc.get("payload", api=False, params=query)
        self.on_image(response.url)

    def replace_image(self):
        """Submit the latest selections and move on to the next image.

        The new image is requested with the token and id that were current
        *before* this call; both are preserved in ``previous_token`` and
        ``previous_id`` prior to being overwritten.
        """
        latest = self.selection_groups[-1]
        form = {
            "c": self.rc.current_token,
            "ds": json.dumps([latest], separators=",:"),
        }
        reply = load_rc_json(self.rc.post("replaceimage", data=form).text)

        self.previous_token = self.rc.current_token
        self.rc.current_token = reply[1]

        # reply[2] may be empty/None, in which case there is no new id.
        new_id = (reply[2] or [None])[0]
        self.previous_id = self.id
        self.id = new_id

        self.next_challenge()
        query = {
            "c": self.previous_token,
            "k": self.rc.api_key,
            "id": self.previous_id,
        }
        response = self.rc.get("payload", api=False, params=query)
        self.on_image(response.url)
class ReCaptcha(object):
    """Drives one reCAPTCHA session: tokens, challenges and verification.

    The ``on_*`` attributes are callbacks. The versions defined here are
    placeholders that are meant to be replaced on the instance by the code
    that owns this object.
    """

    def __init__(self, api_key, site_url, user_agent, debug=False,
                 make_requests=True):
        """Set up the session state.

        Args:
            api_key: site key sent as the ``k`` parameter.
            site_url: URL of the protected site; normalised with
                ``get_rc_site_url`` and sent base64-encoded as ``co``.
            user_agent: default ``User-Agent`` header for all requests.
            debug: stored as ``_debug``; not otherwise used in this class.
            make_requests: when true, immediately fetch the release id and
                the script strings (this performs network I/O).
        """
        self.api_key = api_key
        self.site_url = get_rc_site_url(site_url)
        self._debug = debug
        self.co = rc_base64(self.site_url)

        self.first_token = None
        self.current_token = None
        self.user_agent = user_agent

        self.js_strings = None
        self.rc_version = None
        if make_requests:
            self.rc_version = get_rc_version(self.user_agent)
            self.js_strings = get_js_strings(self.user_agent, self.rc_version)

    def on_goal(goal, meta, **kwargs):
        """Callback; set this attribute in the parent class."""
        # ``raw`` is a required keyword argument (Python 2 has no
        # keyword-only parameters); a missing one raises KeyError.
        kwargs.pop("raw")
        raise NotImplementedError

    def on_token(token, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def on_challenge(type, **kwargs):
        """Callback (optional); set this attribute in the parent class."""
        pass

    def on_challenge_dynamic(solver, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def on_challenge_multicaptcha(solver, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def on_challenge_blocked(type, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def on_challenge_unknown(type, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    @staticmethod
    def _copy_mapping(mapping):
        """Return a dict the caller does not share, or *mapping* itself.

        Falsy values become a new empty dict and plain dicts are shallow
        copied, so later insertions never leak into the caller's object.
        Other truthy objects are returned unchanged.
        """
        if not mapping:
            return {}
        if isinstance(mapping, dict):
            return dict(mapping)
        return mapping

    def find_challenge_goal(self, id, raw=False):
        """Look up the goal text that belongs to challenge id *id*.

        Every occurrence of *id* in ``js_strings`` is located, and the
        ``FIND_GOAL_SEARCH_DISTANCE`` strings following it are searched for
        the phrase "select all" (whole words, case-insensitive). Among all
        hits the one closest to its id is chosen, with ties resolved by the
        earliest occurrence.

        Args:
            id: identifier to search for in ``js_strings``.
            raw: unused; kept for interface compatibility.

        Returns:
            ``(raw, plain)`` where *plain* is *raw* without ``<strong>``
            tags, or ``(None, None)`` when nothing matched.
        """
        # NOTE(review): the phrase is English while requests are sent with
        # hl=fr -- confirm the extracted strings actually contain it.
        matching_strings = []
        start = 0
        try:
            while True:
                # ValueError here means there are no further occurrences.
                index = self.js_strings.index(id, start)
                for offset in range(FIND_GOAL_SEARCH_DISTANCE):
                    # IndexError here means the end of the strings was hit.
                    candidate = self.js_strings[index + offset + 1]
                    # Raw string: in a normal literal "\b" is a backspace
                    # character, not a word boundary.
                    if re.search(r"\bselect all\b", candidate, re.I):
                        matching_strings.append((offset, index, candidate))
                # Advance past the window just inspected; without this the
                # same occurrence would be rescanned forever.
                start = index + FIND_GOAL_SEARCH_DISTANCE + 1
        except (ValueError, IndexError):
            pass

        if not matching_strings:
            return None, None
        goal = min(matching_strings)[2]
        plain = goal.replace("<strong>", "").replace("</strong>", "")
        return goal, plain

    def show_challenge_goal(self, meta):
        """Resolve the goal for ``meta[0]`` and pass it to ``on_goal``."""
        raw, goal = self.find_challenge_goal(meta[0])
        self.on_goal(goal, meta, raw=raw)

    def get_headers(self, headers):
        """Return *headers* completed with the default request headers.

        ``User-Agent`` and ``Accept-Language`` are added only when absent.
        A dict passed in is copied rather than modified.
        """
        headers = self._copy_mapping(headers)
        headers.setdefault("User-Agent", self.user_agent)
        headers.setdefault(
            "Accept-Language", "fr,fr-FR;q=0.8,en-US;q=0.5,en;q=0.3",
        )
        return headers

    def get(self, url, params=None, api=True, headers=None,
            allow_errors=None, **kwargs):
        """Send a GET request to an endpoint below ``BASE_URL``.

        Args:
            url: endpoint name, resolved with ``get_full_url``.
            params: query parameters; a dict passed in is not modified.
            api: when true, add the ``k``, ``v`` and ``hl`` parameters.
            headers: extra headers, completed by ``get_headers``.
            allow_errors: ``True`` to accept any status, or a container of
                status codes that must not raise.
            **kwargs: forwarded to ``requests.get``.

        Returns:
            The ``requests`` response object.
        """
        params = self._copy_mapping(params)
        if api:
            params["k"] = self.api_key
            params["v"] = self.rc_version
            params["hl"] = "fr"
        headers = self.get_headers(headers)

        r = requests.get(
            get_full_url(url), params=params, headers=headers,
            **kwargs
        )
        if not (allow_errors is True or r.status_code in (allow_errors or {})):
            r.raise_for_status()
        return r

    def post(self, url, params=None, data=None, api=True, headers=None,
             allow_errors=None, no_debug_response=False, **kwargs):
        """Send a POST request to an endpoint below ``BASE_URL``.

        Args:
            url: endpoint name, resolved with ``get_full_url``.
            params: query parameters; a dict passed in is not modified.
            data: form fields; a dict passed in is not modified.
            api: when true, add ``k`` and ``hl`` to the query and ``v`` to
                the form fields.
            headers: extra headers, completed by ``get_headers``.
            allow_errors: ``True`` to accept any status, or a container of
                status codes that must not raise.
            no_debug_response: unused; kept for interface compatibility.
            **kwargs: forwarded to ``requests.post``.

        Returns:
            The ``requests`` response object.
        """
        params = self._copy_mapping(params)
        data = self._copy_mapping(data)
        if api:
            params["k"] = self.api_key
            data["v"] = self.rc_version
            params["hl"] = "fr"
        headers = self.get_headers(headers)

        r = requests.post(
            get_full_url(url), params=params, data=data, headers=headers,
            **kwargs
        )
        if not (allow_errors is True or r.status_code in (allow_errors or {})):
            r.raise_for_status()
        return r

    def request_first_token(self):
        """Fetch the "anchor" page and store the token found in it.

        The token is the ``value`` attribute of the element whose ``id`` is
        ``recaptcha-token``; it becomes both ``first_token`` and
        ``current_token``.

        Raises:
            RuntimeError: if the page contains no such token.
        """
        class Parser(HTMLParser):
            def __init__(p_self):
                p_self.token = None
                HTMLParser.__init__(p_self)

            def handle_starttag(p_self, tag, attrs):
                attrs = dict(attrs)
                if attrs.get("id") == "recaptcha-token":
                    p_self.token = attrs.get("value")

        text = self.get("anchor", params={"co": self.co}).text
        parser = Parser()
        parser.feed(text)

        if not parser.token:
            raise RuntimeError(
                "Could not get first token. Response:\n{}".format(text),
            )
        self.first_token = parser.token
        self.current_token = self.first_token

    def verify(self, response):
        """Submit a solver's *response* to the "userverify" endpoint.

        Returns:
            ``(token, rresp)``: element 1 of the decoded reply, and the
            embedded ``rresp`` list if the reply contains one (else ``None``).
        """
        VSlog("reponse :" + str(response))
        response_text = json.dumps({"response": response}, separators=",:")
        response_b64 = rc_base64(response_text)

        r = self.post("userverify", data={
            "c": self.current_token,
            "response": response_b64,
        })

        uvresp = load_rc_json(r.text)
        rresp = get_rresp(uvresp)
        uvresp_token = uvresp[1]
        return (uvresp_token, rresp)

    def get_first_rresp(self):
        """Post to "reload" with the first token; return the decoded reply."""
        r = self.post("reload", data={"reason": "fi", "c": self.first_token})
        rresp = load_rc_json(r.text)
        return rresp

    def handle_solved(self, response, **kwargs):
        """Handle a solver's answer (installed as its ``on_solved``).

        A reply carrying a new ``rresp`` starts another challenge; otherwise
        the returned token is passed to ``on_token``.

        Raises:
            RuntimeError: if the reply has neither a new challenge nor a token.
        """
        uvtoken, rresp = self.verify(response)
        if rresp is not None:
            self.solve_challenge(rresp)
            return
        if not uvtoken:
            raise RuntimeError("Got neither uvtoken nor new rresp.")
        self.on_token(uvtoken)

    def solve_challenge(self, rresp):
        """Dispatch the challenge described by *rresp* to its callback.

        ``rresp[5]`` is the challenge type, ``rresp[4]`` the metadata given
        to the solver and ``rresp[1]`` the new current token. Types with a
        solver class get a solver instance; "default" and "doscaptcha" are
        reported through ``on_challenge_blocked``; any other type goes to
        ``on_challenge_unknown``.
        """
        challenge_type = rresp[5]
        pmeta = rresp[4]
        self.current_token = rresp[1]
        VSlog("Captcha type :" + str(challenge_type))

        solver_class = {
            "dynamic": DynamicSolver,
            "multicaptcha": MultiCaptchaSolver,
        }.get(challenge_type)

        handler = {
            "dynamic": self.on_challenge_dynamic,
            "multicaptcha": self.on_challenge_multicaptcha,
            "default": self.on_challenge_blocked,
            "doscaptcha": self.on_challenge_blocked,
        }.get(challenge_type)

        self.on_challenge(challenge_type)
        if handler is None:
            self.on_challenge_unknown(challenge_type)
            return
        if solver_class is None:
            # Blocked types have no solver; the handler gets the type name.
            handler(challenge_type)
            return

        solver = solver_class(self, pmeta)
        solver.on_solved = self.handle_solved
        handler(solver)

    def run(self):
        """Obtain the first token, load the first challenge and dispatch it."""
        self.request_first_token()
        rresp = self.get_first_rresp()
        self.solve_challenge(rresp)