# Copyright (C) 2017, 2019 nickolas360
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha.  If not, see <http://www.gnu.org/licenses/>.

"""reCAPTCHA v2 client: fetches challenges and drives solver callbacks."""

from resources.lib.comaddon import VSlog  # add-on logging helper
from .errors import UserError
from .extract_strings import extract_and_save

from threading import Thread

try:
    # Python 3
    from html.parser import HTMLParser
    from urllib.parse import urlparse
except ImportError:
    # Python 2
    from HTMLParser import HTMLParser
    from urlparse import urlparse

import requests
import base64
import json
import re
import time
import xbmcvfs

BASE_URL = "https://www.google.com/recaptcha/api2/"
API_JS_URL = "https://www.google.com/recaptcha/api.js"
JS_URL_TEMPLATE = "https://www.gstatic.com/recaptcha/releases/{}/recaptcha__fr.js"

STRINGS_VERSION = "0.1.0"
STRINGS_PATH = 'special://home/userdata/addon_data/plugin.video.vstream'

DYNAMIC_SELECT_DELAY = 4.5  # seconds
FIND_GOAL_SEARCH_DISTANCE = 10


def get_full_url(url):
    """Join a relative API path onto ``BASE_URL`` with exactly one slash."""
    return BASE_URL.rstrip("/") + "/" + url.lstrip("/")


def get_rc_site_url(url):
    """Normalize a site URL to ``scheme://hostname:port``.

    The port defaults to 80 for http and 443 for https when the URL does
    not carry one.

    Raises:
        UserError: if the URL has no hostname, no scheme, or a scheme other
            than http/https.
    """
    parsed = urlparse(url)
    if not parsed.hostname:
        raise UserError("Error: Site URL has no hostname.")
    if not parsed.scheme:
        raise UserError("Error: Site URL has no scheme.")
    if parsed.scheme not in ["http", "https"]:
        raise UserError(
            "Error: Site URL has invalid scheme: {}".format(parsed.scheme),
        )
    port = parsed.port
    if port is None:
        port = {"http": 80, "https": 443}[parsed.scheme]
    return "{}://{}:{}".format(parsed.scheme, parsed.hostname, port)


def rc_base64(string):
    """Encode text or bytes as URL-safe base64 with ``=`` replaced by ``.``.

    Text input is encoded as UTF-8 first; bytes are used unchanged.
    """
    try:
        text_type = unicode  # Python 2 text type
    except NameError:
        text_type = str  # Python 3
    data = string
    if isinstance(string, text_type):
        data = string.encode("utf-8")
    return base64.b64encode(data, b"-_").decode().replace("=", ".")


def load_rc_json(text):
    """Parse a response body as JSON, skipping its first line.

    Raises:
        IndexError: if ``text`` contains no newline.
        ValueError: if the remainder is not valid JSON.
    """
    return json.loads(text.split("\n", 1)[1])


def get_meta(pmeta, probable_index):
    """Return the challenge meta list found inside ``pmeta``.

    The element at ``probable_index`` is preferred when it is a non-empty
    list; otherwise the first non-empty list child is returned.

    Raises:
        TypeError: if ``pmeta`` is not a list.
        RuntimeError: if no non-empty list child exists.
    """
    if not isinstance(pmeta, list):
        raise TypeError("pmeta is not a list: {!r}".format(pmeta))

    def matches(meta):
        return meta and isinstance(meta, list)

    if probable_index < len(pmeta):
        meta = pmeta[probable_index]
        if matches(meta):
            return meta

    for child in pmeta:
        if matches(child):
            return child
    raise RuntimeError("Could not find meta; pmeta: {!r}".format(pmeta))


def get_rresp(uvresp):
    """Return the first list child of ``uvresp`` tagged ``"rresp"``, or None.

    Raises:
        TypeError: if ``uvresp`` is not a list.
    """
    if not isinstance(uvresp, list):
        raise TypeError("uvresp is not a list: {!r}".format(uvresp))
    for child in uvresp:
        if child and isinstance(child, list) and child[0] == "rresp":
            return child
    return None


def get_js_strings(user_agent, rc_version):
    """Return the JS strings for ``rc_version``, using the cache if valid.

    The cache at ``STRINGS_PATH`` holds a version line followed by JSON. If
    it cannot be read, is malformed, or was written for another version,
    the strings are re-extracted (and saved) via ``extract_and_save``.
    """
    def get_json():
        f = xbmcvfs.File(STRINGS_PATH)
        try:
            content = f.read()
        finally:
            # Always release the handle, even when reading fails.
            f.close()
        version, text = content.split("\n", 1)
        if version != "{}/{}".format(STRINGS_VERSION, rc_version):
            raise OSError("Incorrect version: {}".format(version))
        return json.loads(text)

    try:
        return get_json()
    except (OSError, ValueError):
        # Missing, malformed or outdated cache: fall through and rebuild.
        pass

    result = extract_and_save(
        JS_URL_TEMPLATE.format(rc_version), STRINGS_PATH, STRINGS_VERSION,
        rc_version, user_agent,
    )
    return result


def get_rc_version(user_agent):
    """Fetch ``api.js`` and return the release identifier found in it.

    Raises:
        RuntimeError: if no ``/recaptcha/releases/<version>/`` path is found.
    """
    response = requests.get(
        API_JS_URL, headers={
            "User-Agent": user_agent,
        },
    )
    match = re.search(r"/recaptcha/releases/(.+?)/", response.text)
    if match is None:
        raise RuntimeError("Could not extract version from api.js.")
    return match.group(1)


class Solver(object):
    """Base class for challenge solvers; holds the owning ReCaptcha."""

    def __init__(self, recaptcha):
        self.rc = recaptcha

    @staticmethod
    def on_solved(response, **kwargs):
        """Callback; set this attribute in the parent class."""
        # Static so that an un-overridden call raises NotImplementedError
        # rather than a TypeError caused by the implicit ``self`` argument.
        raise NotImplementedError


class HasGrid(object):
    """Mixin exposing grid sizes derived from ``self.dimensions``."""

    @property
    def num_rows(self):
        return self.dimensions[0]

    @property
    def num_columns(self):
        return self.dimensions[1]

    @property
    def num_tiles(self):
        return self.num_rows * self.num_columns


class DynamicSolver(Solver, HasGrid):
    """Solver for "dynamic" challenges, where selected tiles are replaced."""

    def __init__(self, recaptcha, pmeta):
        super(DynamicSolver, self).__init__(recaptcha)
        self.selections = []
        meta = get_meta(pmeta, 1)
        self.meta = meta
        self.dimensions = (meta[3], meta[4])
        # Maps each grid position to the index sent to the server for it.
        self.tile_index_map = list(range(self.num_tiles))
        # Time of the last replace request per grid position.
        self.last_request_map = [0] * self.num_tiles
        self.latest_index = self.num_tiles - 1

    def on_initial_image(self, image, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def on_tile_image(self, index, image, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def run(self):
        """Show the challenge goal, then request the initial image."""
        self.rc.show_challenge_goal(self.meta)
        self.first_payload()

    def finish(self, block=True):
        """Report the selections, optionally sleeping out the delay first."""
        if block:
            time.sleep(self.final_timeout)
        self.on_solved(self.selections)

    @property
    def final_timeout(self):
        """Longest remaining per-tile delay, in seconds."""
        return max(self.get_timeout(i) for i in range(self.num_tiles))

    def get_timeout(self, index):
        """Seconds left before ``DYNAMIC_SELECT_DELAY`` has elapsed for a tile."""
        elapsed = time.time() - self.last_request_map[index]
        duration = max(DYNAMIC_SELECT_DELAY - elapsed, 0)
        return duration

    def first_payload(self):
        """Build the initial image URL and pass it to ``on_initial_image``."""
        image = self.rc.get("payload", api=False, params={
            "c": self.rc.current_token,
            "k": self.rc.api_key,
        }).url
        self.on_initial_image(image)

    def select_tile(self, index):
        """Select a tile and deliver its replacement image after the delay.

        The replace request is made synchronously; ``on_tile_image`` is then
        invoked from a daemon thread once the tile's timeout has passed.
        """
        index = int(index)
        image = self.replace_tile(index)

        def target():
            time.sleep(self.get_timeout(index))
            self.on_tile_image(index, image)

        worker = Thread(target=target)
        worker.daemon = True
        worker.start()

    def replace_tile(self, index):
        """Record a tile selection and return the replacement image URL."""
        # Normalize once so every lookup below uses the same integer index.
        index = int(index)
        real_index = self.tile_index_map[index]
        self.selections.append(int(real_index))
        r = self.rc.post("replaceimage", data={
            "c": self.rc.current_token,
            "ds": "[{}]".format(real_index),
        })

        self.last_request_map[index] = time.time()
        data = load_rc_json(r.text)
        # The replacement tile gets the next unused index.
        self.latest_index += 1
        self.tile_index_map[index] = self.latest_index

        self.rc.current_token = data[1]
        replacement_id = data[2][0]

        image = self.rc.get("payload", api=False, params={
            "c": self.rc.current_token,
            "k": self.rc.api_key,
            "id": replacement_id,
        }).url
        return image


class MultiCaptchaSolver(Solver, HasGrid):
    """Solver for "multicaptcha" challenges: several images in sequence."""

    def __init__(self, recaptcha, pmeta):
        super(MultiCaptchaSolver, self).__init__(recaptcha)
        self.selection_groups = []
        self.dimensions = None
        self.challenge_type = None
        self.previous_token = None
        self.previous_id = None
        self.id = "2"
        self.metas = list(get_meta(pmeta, 5)[0])
        self.next_challenge()

    def on_image(self, image, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def run(self):
        """Request the first image."""
        self.first_payload()

    def next_challenge(self):
        """Pop the next meta, update the grid size and show its goal."""
        meta = self.metas.pop(0)
        self.dimensions = (meta[3], meta[4])
        self.rc.show_challenge_goal(meta)

    def select_indices(self, indices):
        """Record the selection for the current image and move on.

        When metas remain, the next image is requested; otherwise all
        selection groups are passed to ``on_solved``.
        """
        self.selection_groups.append(sorted(indices))
        VSlog("Reste a faire :" + str(len(self.metas)))
        if self.metas:
            self.replace_image()
            return
        self.on_solved(self.selection_groups)

    def first_payload(self):
        """Build the first image URL and pass it to ``on_image``."""
        image = self.rc.get("payload", api=False, params={
            "c": self.rc.current_token,
            "k": self.rc.api_key,
        }).url
        self.on_image(image)

    def replace_image(self):
        """Submit the latest selection and fetch the next image."""
        selections = self.selection_groups[-1]
        r = self.rc.post("replaceimage", data={
            "c": self.rc.current_token,
            "ds": json.dumps([selections], separators=(",", ":")),
        })

        data = load_rc_json(r.text)
        self.previous_token = self.rc.current_token
        self.rc.current_token = data[1]

        replacement_id = (data[2] or [None])[0]
        self.previous_id = self.id
        self.id = replacement_id
        self.next_challenge()

        # The payload request uses the token and id from before this update.
        image = self.rc.get("payload", api=False, params={
            "c": self.previous_token,
            "k": self.rc.api_key,
            "id": self.previous_id,
        }).url
        self.on_image(image)


class ReCaptcha(object):
    """Client for one reCAPTCHA widget, driven through ``on_*`` callbacks."""

    def __init__(self, api_key, site_url, user_agent, debug=False,
                 make_requests=True):
        self.api_key = api_key
        self.site_url = get_rc_site_url(site_url)
        self._debug = debug
        self.co = rc_base64(self.site_url)

        self.first_token = None
        self.current_token = None
        self.user_agent = user_agent

        self.js_strings = None
        self.rc_version = None
        if make_requests:
            self.rc_version = get_rc_version(self.user_agent)
            self.js_strings = get_js_strings(self.user_agent, self.rc_version)

    # The callbacks below are placeholders meant to be replaced by assigning
    # an attribute. They are static so that an un-overridden call behaves as
    # documented (no-op or NotImplementedError) instead of raising TypeError
    # because of the implicit ``self`` argument.

    @staticmethod
    def on_goal(goal, meta, **_3to2kwargs):
        """Callback; set this attribute in the parent class."""
        # Emulates a required keyword-only ``raw`` argument on Python 2.
        raw = _3to2kwargs.pop('raw')  # noqa: F841
        raise NotImplementedError

    @staticmethod
    def on_token(token, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    @staticmethod
    def on_challenge(type, **kwargs):
        """Callback (optional); set this attribute in the parent class."""
        pass

    @staticmethod
    def on_challenge_dynamic(solver, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    @staticmethod
    def on_challenge_multicaptcha(solver, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    @staticmethod
    def on_challenge_blocked(type, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    @staticmethod
    def on_challenge_unknown(type, **kwargs):
        """Callback; set this attribute in the parent class."""
        raise NotImplementedError

    def find_challenge_goal(self, id, raw=False):
        """Find the goal text associated with ``id`` in the JS strings.

        Each occurrence of ``id`` is followed by a scan of the next
        ``FIND_GOAL_SEARCH_DISTANCE`` strings for one containing
        "select all"; the match closest to its ``id`` wins.

        Args:
            id: String to look up in ``self.js_strings``.
            raw: Unused; kept for interface compatibility.

        Returns:
            ``(raw, plain)`` where ``plain`` has ``<strong>`` tags removed,
            or ``(None, None)`` when nothing matched.
        """
        goal_pattern = re.compile(r"\bselect all\b", re.I)
        matching_strings = []
        start = 0
        try:
            while True:
                index = self.js_strings.index(id, start)
                for i in range(FIND_GOAL_SEARCH_DISTANCE):
                    next_str = self.js_strings[index + i + 1]
                    if goal_pattern.search(next_str):
                        matching_strings.append((i, index, next_str))
                # Advance past this window so the search terminates.
                start = index + FIND_GOAL_SEARCH_DISTANCE + 1
        except (ValueError, IndexError):
            # ValueError: no further occurrence; IndexError: end of list.
            pass

        if not matching_strings:
            return None, None
        goal = min(matching_strings)[2]
        raw = goal
        plain = raw.replace("<strong>", "").replace("</strong>", "")
        return raw, plain

    def show_challenge_goal(self, meta):
        """Look up the goal for ``meta[0]`` and pass it to ``on_goal``."""
        raw, goal = self.find_challenge_goal(meta[0])
        self.on_goal(goal, meta, raw=raw)

    def get_headers(self, headers):
        """Return a copy of ``headers`` with default request headers added."""
        # Copy so the caller's dict is never modified.
        headers = dict(headers or {})
        headers.setdefault("User-Agent", self.user_agent)
        headers.setdefault(
            "Accept-Language", "fr,fr-FR;q=0.8,en-US;q=0.5,en;q=0.3")
        return headers

    def get(self, url, params=None, api=True, headers=None,
            allow_errors=None, **kwargs):
        """Send a GET request to a path under ``BASE_URL``.

        Args:
            url: Path relative to ``BASE_URL``.
            params: Query parameters; not modified.
            api: When true, add the ``k``, ``v`` and ``hl`` parameters.
            headers: Extra headers; defaults are filled in by ``get_headers``.
            allow_errors: ``True`` to accept any status, or a collection of
                status codes that must not raise.
            **kwargs: Passed through to ``requests.get``.

        Raises:
            requests.HTTPError: on an error status that is not allowed.
        """
        params = dict(params or {})
        if api:
            params["k"] = self.api_key
            params["v"] = self.rc_version
            params["hl"] = "fr"
        headers = self.get_headers(headers)

        r = requests.get(
            get_full_url(url), params=params, headers=headers,
            **kwargs
        )

        if not (allow_errors is True or r.status_code in (allow_errors or ())):
            r.raise_for_status()
        return r

    def post(self, url, params=None, data=None, api=True, headers=None,
             allow_errors=None, no_debug_response=False, **kwargs):
        """Send a POST request to a path under ``BASE_URL``.

        Args:
            url: Path relative to ``BASE_URL``.
            params: Query parameters; not modified.
            data: Form data; not modified.
            api: When true, add ``k``/``hl`` to the query and ``v`` to the
                form data.
            headers: Extra headers; defaults are filled in by ``get_headers``.
            allow_errors: ``True`` to accept any status, or a collection of
                status codes that must not raise.
            no_debug_response: Unused; kept for interface compatibility.
            **kwargs: Passed through to ``requests.post``.

        Raises:
            requests.HTTPError: on an error status that is not allowed.
        """
        params = dict(params or {})
        data = dict(data or {})
        if api:
            params["k"] = self.api_key
            data["v"] = self.rc_version
            params["hl"] = "fr"
        headers = self.get_headers(headers)

        r = requests.post(
            get_full_url(url), params=params, data=data, headers=headers,
            **kwargs
        )

        if not (allow_errors is True or r.status_code in (allow_errors or ())):
            r.raise_for_status()
        return r

    def request_first_token(self):
        """Fetch the anchor page and store its ``recaptcha-token`` value.

        Raises:
            RuntimeError: if the page has no such element or it is empty.
        """
        class Parser(HTMLParser):
            # ``p_self`` avoids shadowing the enclosing method's ``self``.
            def __init__(p_self):
                p_self.token = None
                HTMLParser.__init__(p_self)

            def handle_starttag(p_self, tag, attrs):
                attrs = dict(attrs)
                if attrs.get("id") == "recaptcha-token":
                    p_self.token = attrs.get("value")

        text = self.get("anchor", params={"co": self.co}).text
        parser = Parser()
        parser.feed(text)

        if not parser.token:
            raise RuntimeError(
                "Could not get first token. Response:\n{}".format(text),
            )
        self.first_token = parser.token
        self.current_token = self.first_token

    def verify(self, response):
        """Submit a solver response.

        Returns:
            ``(uvresp_token, rresp)``: element 1 of the reply and the nested
            "rresp" list, or ``None`` when the reply has none.
        """
        VSlog("reponse :" + str(response))
        response_text = json.dumps(
            {"response": response}, separators=(",", ":"))
        response_b64 = rc_base64(response_text)

        r = self.post("userverify", data={
            "c": self.current_token,
            "response": response_b64,
        })

        uvresp = load_rc_json(r.text)
        rresp = get_rresp(uvresp)
        uvresp_token = uvresp[1]
        return (uvresp_token, rresp)

    def get_first_rresp(self):
        """Request the first challenge and return the parsed reply."""
        r = self.post("reload", data={"reason": "fi", "c": self.first_token})
        rresp = load_rc_json(r.text)
        return rresp

    def handle_solved(self, response, **kwargs):
        """Verify a response; start the next challenge or emit the token.

        Raises:
            RuntimeError: if the reply has neither a token nor a challenge.
        """
        uvtoken, rresp = self.verify(response)
        if rresp is not None:
            self.solve_challenge(rresp)
            return
        if not uvtoken:
            raise RuntimeError("Got neither uvtoken nor new rresp.")
        self.on_token(uvtoken)

    def solve_challenge(self, rresp):
        """Dispatch a challenge to the handler matching its type.

        Known solver types get a solver instance whose ``on_solved`` is
        wired to ``handle_solved``; blocked types get the type name; any
        other type goes to ``on_challenge_unknown``.
        """
        challenge_type = rresp[5]
        pmeta = rresp[4]
        self.current_token = rresp[1]
        VSlog("Captcha type :" + str(challenge_type))

        solver_class = {
            "dynamic": DynamicSolver,
            "multicaptcha": MultiCaptchaSolver,
        }.get(challenge_type)

        handler = {
            "dynamic": self.on_challenge_dynamic,
            "multicaptcha": self.on_challenge_multicaptcha,
            "default": self.on_challenge_blocked,
            "doscaptcha": self.on_challenge_blocked,
        }.get(challenge_type)

        self.on_challenge(challenge_type)
        if handler is None:
            self.on_challenge_unknown(challenge_type)
            return
        if solver_class is None:
            handler(challenge_type)
            return
        solver = solver_class(self, pmeta)
        solver.on_solved = self.handle_solved
        handler(solver)

    def run(self):
        """Fetch the first token and challenge, then start solving."""
        self.request_first_token()
        rresp = self.get_first_rresp()
        self.solve_challenge(rresp)