Compare commits


1 Commit

Author SHA1 Message Date
poka fa272f340c Brut add selenium and co changes 2020-08-09 15:50:19 +02:00
45 changed files with 10106 additions and 448 deletions

.gitignore vendored

@ -3,3 +3,4 @@ __pycache__/
yggcrawl/__pycache__/
yggcrawl/__init__.pyc
login.py
yggcrawl/gecko/geckodriver.log


@ -16,49 +16,30 @@ import requests
import json
import sys
import os
import shutil
import subprocess
import login
import time
import re
from termcolor import colored
from optparse import OptionParser
from urllib.parse import unquote
# Load options
parser = OptionParser()
parser.add_option("-s", "--seed", action="store_false", dest="rmTracker", default=True,
help="Keep the tracker for this torrent. So ratio is supported.")
parser.add_option("-q", "--quiet",
action="store_false", dest="verbose", default=True,
help="don't print status messages to stdout")
(options, args) = parser.parse_args()
# Load scraper
from yggcrawl import YggTorrentScraper
scraper = YggTorrentScraper(requests.session())
from yggtorrentscraper import set_yggtorrent_tld
set_yggtorrent_tld("si")
from yggcrawl import set_yggtorrent_tld
set_yggtorrent_tld("se")
name = ' '.join(sys.argv[1:])
# Search torrent name
if ("https://" not in name):
name = re.sub(r'\w*-\w*', '', name)
research = os.popen('./lib/scrabash.sh search --best=true ' + name).read()
research = os.popen('./lib/scrabash.sh search --best=true ' + name).read()
try:
research.index("No torrent found")
except ValueError:
True
else:
print(colored('No torrent named "' + name + '" on YggTorrent', 'blue'))
sys.exit(1)
try:
research.index("No torrent found")
except ValueError:
True
else:
name = re.sub(r'\w*--seed\w*', '', name)
research = unquote(name, errors='strict')
sys.exit(1)
# Allow only one torrent downloading at a time, and remove the oldest torrent if the disk is full.
# Rolling Files
def rollingFiles():
def isDL():
downloading = os.popen('./trans-ctl.sh downloading').read()
@ -121,11 +102,15 @@ def rollingFiles():
# Download Torrent
def downloadTorrent():
# Download torrent file
if len(os.listdir('data/tmp/torrents') ) != 0:
shutil.rmtree('data/tmp/torrents', ignore_errors=True)
os.mkdir("data/tmp/torrents")
scraper.download_from_torrent_url(research)
os.popen(f'cd data/tmp/torrents/ && mv *.torrent ../../torrents/{idTorrent.strip()}.torrent')
if(scraper.login(login.user, login.passwd)):
print(colored("Login success", 'green'))
subprocess.Popen('[[ $(ls data/tmp/torrents/) ]] && rm data/tmp/torrents/*', executable='/bin/bash', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
scraper.download_from_torrent_url(research)
# os.popen(f'cd data/tmp/torrents/ && mv *.torrent {idTorrent.strip()}.torrent && mv {idTorrent.strip()}.torrent ../../torrents/').read()
os.popen('cd data/tmp/torrents/ && mv *.torrent ../../torrents/')
else:
print(colored("Login failed", 'red'))
sys.exit(1)
# Remove tracker
def removeTracker():
@ -136,18 +121,11 @@ def removeTracker():
time.sleep(tkdelay)
os.popen('./trans-ctl.sh rmtracker ' + name)
os.popen('./trans-ctl.sh rmtracker ' + higherid)
os.replace(f'data/torrents/{idTorrent.strip()}.torrent.added', f'data/meta/{idTorrent.strip()}/{idTorrent.strip()}.torrent')
if(scraper.login(login.user, login.passwd)): #Check if user can login
print(colored("Login success", 'green'))
rollingFiles()
downloadTorrent()
# Remove the tracker if requested; otherwise keep it, wait briefly, then archive the .torrent file ourselves
removeTracker() if options.rmTracker else (time.sleep(2), os.replace(f'data/torrents/{idTorrent.strip()}.torrent.added', f'data/meta/{idTorrent.strip()}/{idTorrent.strip()}.torrent'))
else:
print(colored("Login failed", 'red'))
sys.exit(1)
# print(tkresult)
rollingFiles()
downloadTorrent()
removeTracker()
# End
print(colored("Done", 'green'))

imdb-scrap.py Executable file

@ -0,0 +1,39 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
import requests
import re
# Download IMDB's Top 250 data
#url = 'http://www.imdb.com/chart/top'
url = 'https://www.imdb.com/find?q=didier&ref_=nv_sr_sm'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'lxml')
movies = soup.select('td.titleColumn')
links = [a.attrs.get('href') for a in soup.select('td.titleColumn a')]
crew = [a.attrs.get('title') for a in soup.select('td.titleColumn a')]
ratings = [b.attrs.get('data-value') for b in soup.select('td.posterColumn span[name=ir]')]
votes = [b.attrs.get('data-value') for b in soup.select('td.ratingColumn strong')]
imdb = []
# Store each item in a dictionary (data), then append it to a list (imdb)
for index in range(0, len(movies)):
# Separate movie into: 'place', 'title', 'year'
movie_string = movies[index].get_text()
movie = (' '.join(movie_string.split()).replace('.', ''))
movie_title = movie[len(str(index))+1:-7]
year = re.search('\((.*?)\)', movie_string).group(1)
place = movie[:len(str(index))-(len(movie))]
data = {"movie_title": movie_title,
"year": year,
"place": place,
"star_cast": crew[index],
"rating": ratings[index],
"vote": votes[index],
"link": links[index]}
imdb.append(data)
for item in imdb:
print(item['place'], '-', item['movie_title'], '('+item['year']+') -', 'Starring:', item['star_cast'])
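For reference, a minimal worked example (editor's sketch, not part of this commit) of how the slicing above splits one chart cell, assuming a typical "rank. Title (Year)" text block:

```python
import re

# Hypothetical sample of one td.titleColumn cell's text (illustration only)
movie_string = "1.\n      The Shawshank Redemption\n(1994)"
index = 0

movie = ' '.join(movie_string.split()).replace('.', '')  # "1 The Shawshank Redemption (1994)"
movie_title = movie[len(str(index))+1:-7]                # "The Shawshank Redemption"
year = re.search(r'\((.*?)\)', movie_string).group(1)    # "1994"
place = movie[:len(str(index))-(len(movie))]             # "1"
print(place, movie_title, year)
```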


@ -31,6 +31,12 @@ sbotc() {
transmission() {
echo -e "${c_yellow}Installing Transmision...$c_"
sudo apt install transmission-daemon --install-suggests
sudo apt install transmission-cli
# stop
# Copy login.py info to /etc/transmission/settings.json
# start
}
# Install pip tools
@ -68,8 +74,8 @@ pip3() {
iptubes() {
[[ -z $(which pip3) ]] && pip3
/usr/bin/pip3 install $(curl -s https://raw.githubusercontent.com/Harkame/YggTorrentScraper/master/requirements.txt)
chgrp -R debian-transmission data/
chmod -R g+w data/
sudo chgrp -R debian-transmission data/
sudo chmod -R g+w data/
sudo service transmission-daemon restart
cp login.py.template login.py
cd lib/py/
@ -80,16 +86,6 @@ iptubes() {
nano login.py
}
nordvpn() {
wget -qnc https://repo.nordvpn.com/deb/nordvpn/debian/pool/main/nordvpn-release_1.0.0_all.deb -O /tmp/nordvpn.deb
sudo dpkg -i /tmp/nordvpn.deb
rm /tmp/nordvpn.deb
sudo apt update
sudo apt install nordvpn
}
# Check installs
[[ -z $(which ipfs) ]] && ipfs
[[ -z $(which sbotc) ]] && sbotc


@ -16,7 +16,7 @@ try:
except NameError:
from yggcrawl import YggTorrentScraper
scraper = YggTorrentScraper(requests.session())
from yggtorrentscraper import set_yggtorrent_tld
from yggcrawl import set_yggtorrent_tld
set_yggtorrent_tld("se")
cmd = sys.argv[1]


@ -1,22 +0,0 @@
# Ratio.py
Ratio.py is a small command-line RatioMaster.Net-like tool written in Python 3. It fakes the upload stats of a torrent.
Current emulators available are:
* Transmission 2.92
## Requirements:
1. Python 3.x
2. pip install -r requirements.txt
## Usage:
```console
foo@bar:~/ratio.py$ python ratio.py -c configuration.json
```
## Configuration example
```js
{
"torrent": "<Torrent file path>",
"upload": "<Upload speed (kB/s)>"
}
```


@ -1,90 +0,0 @@
import re
import logging
import binascii
class bencoding():
def __init__(self):
self.decimal_match = re.compile('\d')
self.data = b''
self.dict = {}
def get_dict(self, key):
if key not in self.dict:
return ''
start = self.dict[key][0]
end = self.dict[key][1]
return self.data[start:end]
def get_item(self, chunks):
item = chunks[self.i]
self.i += 1
if not type(item) == str:
item = bytes([item])
try:
item = item.decode('utf-8')
except:
item = '\\x{}'.format(binascii.hexlify(item))
return item
def decoding_byte_string(self, chunks, item):
# logging.debug('decoding string')
num = ''
while self.decimal_match.search(item):
num += item
item = self.get_item(chunks)
line = ''
for i in range(int(num)):
line += self.get_item(chunks)
return line
def decoding_integer(self, chunks):
# logging.debug('decoding integer')
item = self.get_item(chunks)
num = ''
while item != 'e':
num += item
item = self.get_item(chunks)
return int(num)
def decoding_list(self, chunks):
# logging.debug('decoding list')
item = self.get_item(chunks)
list = []
while item != 'e':
self.i -= 1
list.append(self._dechunk(chunks))
item = self.get_item(chunks)
return list
def decoding_dictionnary(self, chunks):
# logging.debug('decoding dictionnary')
item = self.get_item(chunks)
hash = {}
while item != 'e':
self.i -= 1
key = self._dechunk(chunks)
start = self.i
hash[key] = self._dechunk(chunks)
end = self.i
self.dict[key] = (start, end)
item = self.get_item(chunks)
return hash
def _dechunk(self, chunks):
item = self.get_item(chunks)
if item == 'd':
return self.decoding_dictionnary(chunks)
elif item == 'l':
return self.decoding_list(chunks)
elif item == 'i':
return self.decoding_integer(chunks)
elif self.decimal_match.search(item):
return self.decoding_byte_string(chunks, item)
raise "Invalid input!"
def bdecode(self, data):
self.data = data
chunks = list(self.data)
self.i = 0
root = self._dechunk(chunks)
return root
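Since this decoder is being removed in this commit, here is a minimal usage sketch (editor's illustration, assuming the bencoding class above and a hand-written bencoded dictionary) showing what bdecode() and get_dict() return:

```python
decoder = bencoding()
decoded = decoder.bdecode(b"d4:name5:hello6:lengthi42ee")
print(decoded)                   # {'name': 'hello', 'length': 42}
print(decoder.get_dict("name"))  # b'5:hello' -- the raw bencoded slice recorded for that key
```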


@ -1,28 +0,0 @@
import requests
from pprint import pformat
def get_headers(headers):
res = ''
for k, v in headers.items():
res += '{}: {}\n'.format(k, v)
return res
def pretty_GET(url, headers, params):
req = requests.Request('GET', url, headers=headers, params=params)
s = requests.Session()
prepared = s.prepare_request(req)
p = '-----START-----\n'
p +=('{} {}\n{}'.format(prepared.method, prepared.url,
get_headers(prepared.headers),
)
)
if prepared.body:
p += prepared.body
p += '------END------'
return p
def pretty_data(data):
return pformat(data)


@ -1,118 +0,0 @@
from code.decoding_bencoded import bencoding
from code.torrentclientfactory import Transmission292
from code.pretty import pretty_data, pretty_GET
from hashlib import sha1
from urllib.parse import quote_plus
import requests
import logging
import random
from tqdm import tqdm
from time import sleep
from struct import unpack
logging.basicConfig(level=logging.DEBUG)
class process_torrent():
def __init__(self, configuration):
self.configuration = configuration
self.open_torrent()
self.torrentclient = Transmission292(self.tracker_info_hash())
def open_torrent(self):
torrent_file = self.configuration['torrent']
with open(torrent_file, 'rb') as tf:
data = tf.read()
self.b_enc = bencoding()
self.metainfo = self.b_enc.bdecode(data)
self.info = self.metainfo['info']
if 'length' not in self.info:
self.info['length'] = 0
for file in self.info['files']:
self.info['length'] += file['length']
print(pretty_data(self.info['files']))
def tracker_info_hash(self):
raw_info = self.b_enc.get_dict('info')
hash_factory = sha1()
hash_factory.update(raw_info)
hashed = hash_factory.hexdigest()
sha = bytearray.fromhex(hashed)
return str(quote_plus(sha))
def send_request(self, params, headers):
url = self.metainfo['announce']
print(pretty_GET(url, headers, params))
while True:
try:
r = requests.get(url, params=params, headers=headers)
except requests.exceptions.ConnectionError as e:
sleep(1)
continue
break
return r.content
def tracker_start_request(self):
tc = self.torrentclient
headers = tc.get_headers()
params = tc.get_query(uploaded=0,
downloaded=0,
event='started')
print('----------- First Command to Tracker --------')
content = self.send_request(params, headers)
self.tracker_response_parser(content)
def tracker_response_parser(self, tr_response):
b_enc = bencoding()
response = b_enc.bdecode(tr_response)
print('----------- Received Tracker Response --------')
print(pretty_data(response))
raw_peers = b_enc.get_dict('peers')
i = 0
peers = []
while i<len(raw_peers)-6:
peer = raw_peers[i:i+6]
i+=6
unpacked_ip = unpack('BBBB', peer[0:4])
ip = ".".join(str(i) for i in unpacked_ip)
unpacked_port = unpack('!H', peer[4:6])
port = unpacked_port[0]
peers.append((ip, port))
self.interval = response['interval']
def wait(self):
pbar = tqdm(total=self.interval)
print('sleep: {}'.format(self.interval))
t = 0
while t < self.interval:
t += 1
pbar.update(1)
sleep(1)
pbar.close()
def tracker_process(self):
while True:
self.tracker_start_request()
print('----------- Sending Command to Tracker --------')
# get upload
min_up = int(self.interval-(self.interval*0.1))  # randint needs integer bounds
max_up = self.interval
randomize_upload = random.randint(min_up, max_up)
uploaded = int(self.configuration['upload'])*1000*randomize_upload
# get download
downloaded = 0
tc = self.torrentclient
headers = tc.get_headers()
params = tc.get_query(uploaded=uploaded,
downloaded=downloaded,
event='stopped')
content = self.send_request(params, headers)
self.tracker_response_parser(content)
self.wait()
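The compact peer parsing in tracker_response_parser() is easy to miss, so here is an editor's illustration (not from the commit) of how a single 6-byte peer entry decodes, assuming the tracker returned the peer 10.0.0.1:6881:

```python
from struct import unpack

peer = bytes([10, 0, 0, 1]) + (6881).to_bytes(2, "big")    # 4-byte IP + 2-byte big-endian port
ip = ".".join(str(b) for b in unpack("BBBB", peer[0:4]))   # "10.0.0.1"
port = unpack("!H", peer[4:6])[0]                          # 6881
print(ip, port)
```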


@ -1,60 +0,0 @@
import random
import string
class Transmission292():
def __init__(self, info_hash):
self.name = "Transmission 2.92 (14714)"
parameters = {}
# urlencoded 20-byte SHA1 hash of the value of the info key from the Metainfo file
parameters['info_hash'] = info_hash
# urlencoded 20-byte string used as a unique ID for the client
parameters["peer_id"] = self.generate_peer_id()
# The port number that the client is listening on
parameters["port"] = random.randint(1025, 65535)
# Number of peers that the client would like to receive from the tracker
parameters["numwant"] = 80
# An additional identification that is not shared with any other peers
parameters["key"] = self.generate_key()
# Setting this to 1 indicates that the client accepts a compact response
parameters["compact"] = 0
# Setting this to 1 indicates that the client accepts crypto
parameters["supportcrypto"] = 1
self.parameters = parameters
def get_headers(self):
headers = {}
headers['User-Agent'] = 'Transmission/2.92'
headers['Accept'] = '*/*'
headers['Accept-Encoding'] = 'gzip;q=1.0, deflate, identity'
return headers
def get_query(self, uploaded, downloaded, left=0, event=None):
# The total amount uploaded (since the client sent the 'started' event)
self.parameters["uploaded"] = uploaded
# The total amount downloaded (since the client sent the 'started' event)
self.parameters["downloaded"] = downloaded
# The number of bytes this client still has to download
self.parameters["left"] = left
# If specified, must be one of started, completed, stopped
if event:
self.parameters["event"] = event
params = '&'.join('{}={}'.format(k, v)
for k, v in self.parameters.items())
return params
def id_generator(self, chars, size):
id = ''
for _ in range(size):
id += random.choice(chars)
return id
def generate_peer_id(self):
chars = string.ascii_lowercase + string.digits
rand_id = self.id_generator(chars, 12)
peer_id = "-TR2920-" + rand_id
return peer_id
def generate_key(self):
chars = 'ABCDEF' + string.digits
key = self.id_generator(chars, 8)
return key
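A minimal usage sketch of this emulator (editor's illustration, not part of the commit; the info_hash value is a made-up, already URL-encoded placeholder):

```python
client = Transmission292(info_hash="%124Vx%9A%BC%DE%F1%23Eg%89%AB%CD%EF%124Vx%9A")
headers = client.get_headers()  # spoofed Transmission/2.92 request headers
query = client.get_query(uploaded=1_000_000, downloaded=0, event="started")
print(query)  # "info_hash=...&peer_id=-TR2920-...&port=...&...&uploaded=1000000&downloaded=0&left=0&event=started"
```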


@ -1,34 +0,0 @@
from code.process_torrent import process_torrent
import argparse
import json
import sys
def parse_args():
"""Create the arguments"""
parser = argparse.ArgumentParser('\nratio.py -c <configuration-file.json>')
parser.add_argument("-c", "--configuration", help="Configuration file")
return parser.parse_args()
def load_configuration(configuration_file):
with open(configuration_file) as f:
configuration = json.load(f)
if 'torrent' not in configuration:
return None
return configuration
if __name__ == "__main__":
args = parse_args()
if args.configuration:
configuration = load_configuration(args.configuration)
else:
sys.exit()
if not configuration:
sys.exit()
to = process_torrent(configuration)
to.tracker_process()


@ -1,2 +0,0 @@
requests
tqdm


@ -51,19 +51,20 @@ get_details() {
# Get image
[[ ! -d img ]] && mkdir img && cd img
url=$(wget -qO- -np -nd $name | awk -v RS=' ' '/.jpg/' | awk -F '"' '{ print $2 }' | head -n1)
[[ $url ]] && curl -s -O $url
curl -s -O $url
fi
}
vpn() {
[[ ! $(which nordvpn) ]] && echo "Installing NordVPN client..." && ./install.sh nordvpn
vpn_citie=$(shuf -n1 .vpn/countries)
echo "Warning: trying to connect to a random city in the world via NordVPN. If you are connected to this machine via SSH, you will lose the connection..."
echo "VPN connection in 5 seconds, press CTRL+C to cancel..."
sleep 5
nordvpn c $vpn_citie
[[ ! $(which nordvpn) ]] && echo "Installing NordVPN client..." && ./install.sh nordvpn
vpn_citie=$(shuf -n1 .vpn/countries)
echo "Warning: trying to connect to a random city in the world via NordVPN. If you are connected to this machine via SSH, you will lose the connection..."
echo "VPN connection in 5 seconds, press CTRL+C to cancel..."
sleep 5
nordvpn c $vpn_citie
}
$cmd
$1
[[ $err == 1 ]] && exit 1 || exit 0

old-yggcrawl/__init__.py Normal file

@ -0,0 +1,11 @@
"""
__init__.py main
"""
from .yggtorrentscraper import (
YggTorrentScraper,
set_yggtorrent_tld,
get_yggtorrent_tld,
)
from .torrent import Torrent, TorrentComment, TorrentFile
from .categories import categories

old-yggcrawl/categories.py Normal file

File diff suppressed because it is too large

File diff suppressed because it is too large

old-yggcrawl/gecko/test.py Executable file

@ -0,0 +1,10 @@
#!/usr/bin/python3
import sys
# Exit if no arguments
if len(sys.argv)==1: sys.exit("Please choose a film ou serie name")
else: args = sys.argv[1:]
args = '+'.join(args)
print(args)


@ -0,0 +1,46 @@
#!/usr/bin/python3
# Early example of how to use Selenium with Gecko to bypass Cloudflare's bot detection
# The only way to block this would be to put a captcha in front of every yggtorrent page, per session...
import sys
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Exit if no arguments
if len(sys.argv)==1: sys.exit("Please choose a film ou serie name")
else: args = sys.argv[1:]
args = '+'.join(args)
search_url = f"https://www2.yggtorrent.se/engine/search?name={args}&description=&file=&uploader=&category=all&sub_category=&do=search&order=desc&sort=seed"
# Load webdriver with Gecko
options = webdriver.FirefoxOptions()
options.add_argument('-headless')
driver = webdriver.Firefox(options=options, executable_path=r'/usr/local/bin/geckodriver')
driver.get(search_url)
# Wait to bypass cloudflare
print("Page atteinte, attente de redirection anti-crawling...")
wait = WebDriverWait(driver, 10)
page_search = wait.until(lambda driver: driver.current_url != search_url)
# Wait 2 seconds to load page
print("Anti-crawling passé, affichage dans 2 secondes ...")
time.sleep(2)
# Filter torrent urls
elems = driver.find_elements_by_css_selector(".results [href]")
links = [elem.get_attribute('href') for elem in elems]
links = [k for k in links if '/torrent/' in k]
# Print torrents urls
#print("\n".join(links))
print(links[0])
driver.quit()


@ -0,0 +1,26 @@
import unittest
from ..yggtorrentscraper import (
YggTorrentScraper,
set_yggtorrent_tld,
get_yggtorrent_tld,
)
class TestChangeYggtorrentTLD(unittest.TestCase):
current_yggtorrent_tld = get_yggtorrent_tld()
def test_read_tld(self):
self.current_yggtorrent_tld = get_yggtorrent_tld()
self.assertTrue(self.current_yggtorrent_tld == "se")
def test_set_yggtorrent_tld(self):
set_yggtorrent_tld("newtld")
self.assertTrue(get_yggtorrent_tld() == "newtld")
pass
def tearDown(self):
set_yggtorrent_tld(self.current_yggtorrent_tld)


@ -0,0 +1,65 @@
import os
import shutil
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestDownload(unittest.TestCase):
scraper = None
destination_path = None
def __init__(self, *args, **kwargs):
super(TestDownload, self).__init__(*args, **kwargs)
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")
self.destination_path = os.path.join(
".", "yggtorrentscraper", "tests", "test_download"
)
self.scraper = YggTorrentScraper(requests.session())
self.scraper.login(yggtorrent_identifiant, yggtorrent_password)
def test_download_from_torrent(self):
most_completed = self.scraper.most_completed()
torrent = self.scraper.extract_details(most_completed[0])
self.assertTrue(torrent.url is not None)
file_full_path = self.scraper.download_from_torrent(
torrent=torrent, destination_path=self.destination_path
)
self.assertTrue(os.path.getsize(file_full_path) > 1000)
def test_download_from_torrent_url(self):
file_full_path = self.scraper.download_from_torrent_url(
torrent_url="https://www2.yggtorrent.pe/torrent/filmvideo/serie-tv/440445-game-of-thrones-s08e02-multi-1080p-amzn-web-dl-dd5-1-x264-ark01",
destination_path=self.destination_path,
)
self.assertTrue(os.path.getsize(file_full_path) > 1000)
def test_download_from_torrent_download_url(self):
most_completed = self.scraper.most_completed()
torrent = self.scraper.extract_details(most_completed[0])
self.assertTrue(torrent.url is not None)
file_full_path = self.scraper.download_from_torrent_download_url(
torrent_url=torrent.url, destination_path=self.destination_path
)
self.assertTrue(os.path.getsize(file_full_path) > 1000)
def tearDown(self):
if os.path.exists(self.destination_path):
shutil.rmtree(self.destination_path, ignore_errors=True)
self.scraper.logout()


@ -0,0 +1,60 @@
import os
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestExtractDetails(unittest.TestCase):
scraper = YggTorrentScraper(requests.session())
def test_extract_details(self):
torrent = self.scraper.extract_details(
"https://www2.yggtorrent.pe/torrent/filmvideo/serie-tv/440445-game-of-thrones-s08e02-multi-1080p-amzn-web-dl-dd5-1-x264-ark01"
)
self.assertTrue(torrent.name is not None)
self.assertTrue(torrent.uploaded_datetime is not None)
self.assertTrue(torrent.size is not None)
self.assertTrue(torrent.uploader is not None)
self.assertTrue(len(torrent.keywords) > 0)
self.assertTrue(torrent.completed > -1)
self.assertTrue(torrent.seeders > -1)
self.assertTrue(torrent.leechers > -1)
self.assertTrue(torrent.url is None)
self.assertTrue(len(torrent.files) > 0)
self.assertTrue(len(torrent.comments) > 0)
def test_extract_details_logged(self):
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")
self.scraper.login(yggtorrent_identifiant, yggtorrent_password)
torrent = self.scraper.extract_details(
"https://www2.yggtorrent.pe/torrent/filmvideo/serie-tv/440445-game-of-thrones-s08e02-multi-1080p-amzn-web-dl-dd5-1-x264-ark01"
)
self.assertTrue(torrent.name is not None)
self.assertTrue(torrent.uploaded_datetime is not None)
self.assertTrue(torrent.size is not None)
self.assertTrue(torrent.uploader is not None)
self.assertTrue(len(torrent.keywords) > 0)
self.assertTrue(torrent.completed > -1)
self.assertTrue(torrent.seeders > -1)
self.assertTrue(torrent.leechers > -1)
self.assertTrue(torrent.url is not None)
self.assertTrue(len(torrent.files) > 0)
self.assertTrue(len(torrent.comments) > 0)
def tearDown(self):
self.scraper.logout()


@ -0,0 +1,30 @@
import os
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestAuthentification(unittest.TestCase):
def setUp(self):
self.scraper = YggTorrentScraper(requests.session())
def test_login_success(self):
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")
self.assertTrue(yggtorrent_identifiant is not None)
self.assertTrue(yggtorrent_password is not None)
self.assertTrue(self.scraper.login(yggtorrent_identifiant, yggtorrent_password))
self.scraper.logout()
def test_login_failed(self):
self.assertFalse(self.scraper.login("myidentifiant", "mypassword"))
self.scraper.logout()
def tearDown(self):
self.scraper.logout()


@ -0,0 +1,27 @@
import os
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestLogout(unittest.TestCase):
def setUp(self):
self.scraper = YggTorrentScraper(requests.session())
def test_logout_success(self):
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")
self.assertTrue(self.scraper.login(yggtorrent_identifiant, yggtorrent_password))
self.assertTrue(self.scraper.logout())
def test_logout_failed(self):
self.scraper.login("myidentifiant", "mypassword")
self.assertFalse(self.scraper.logout())
def tearDown(self):
self.scraper.logout()


@ -0,0 +1,16 @@
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestMostCompleted(unittest.TestCase):
scraper = YggTorrentScraper(session=requests.session())
def test_most_completed(self):
most_completed = self.scraper.most_completed()
self.assertEqual(len(most_completed), 100)
def tearDown(self):
self.scraper.logout()


@ -0,0 +1,70 @@
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestResearch(unittest.TestCase):
scraper = YggTorrentScraper(requests.session())
torrent_name = "walking dead s09"
torrent_uploader = "brandit"
torrent_name_2 = "blue oyster cult"
def test_search_by_name(self):
torrents_url = self.scraper.search({"name": self.torrent_name})
torrent = self.scraper.extract_details(torrents_url[0])
splited_searched_name = self.torrent_name.split(" ")
for word in splited_searched_name:
self.assertTrue(word.lower() in torrent.name.lower())
def test_search_by_uploader(self):
torrents_url = self.scraper.search(
{"name": self.torrent_name, "uploader": self.torrent_uploader}
)
for torrent_url in torrents_url:
torrent = self.scraper.extract_details(torrent_url)
self.assertTrue(torrent.uploader.lower() == self.torrent_uploader.lower())
def test_search_sort_completed_asc(self):
torrents_url = self.scraper.search(
{"name": "blue oyster cult", "sort": "completed", "order": "asc"}
)
torrent_old = None
for torrent_url in torrents_url:
torrent = self.scraper.extract_details(torrent_url)
if torrent_old is not None:
self.assertTrue(torrent_old.completed <= torrent.completed)
torrent_old = torrent
def test_search_sort_completed_desc(self):
torrents_url = self.scraper.search(
{"name": "blue oyster cult", "sort": "completed", "order": "desc"}
)
torrent_old = None
for torrent_url in torrents_url:
torrent = self.scraper.extract_details(torrent_url)
if torrent_old is not None:
self.assertTrue(torrent_old.completed >= torrent.completed)
torrent_old = torrent
def test_search_multiple_page(self):
torrents_url = self.scraper.search({"name": "walking dead"})
self.assertTrue(len(torrents_url) > 200)
def tearDown(self):
self.scraper.logout()


@ -0,0 +1,32 @@
import os
import unittest
import requests
from ..yggtorrentscraper import YggTorrentScraper
class TestTorrent(unittest.TestCase):
scraper = YggTorrentScraper(requests.session())
def test_str(self):
torrent_url = self.scraper.most_completed()[0]
torrent = self.scraper.extract_details(torrent_url)
torrent.__str__(files=True, comments=True)
def test_str_logged(self):
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")
self.scraper.login(yggtorrent_identifiant, yggtorrent_password)
torrent_url = self.scraper.most_completed()[0]
torrent = self.scraper.extract_details(torrent_url)
torrent.__str__(files=True, comments=True)
def tearDown(self):
self.scraper.logout()

old-yggcrawl/torrent.py Normal file

@ -0,0 +1,146 @@
import os
class Torrent:
"""
Torrent entity
"""
name = None
uploaded_datetime = None
size = None
uploader = None
keywords = []
completed = -1
seeders = -1
leechers = -1
url = None
files = []
comments = []
def __str__(self, comments=False, files=False):
to_string = ""
to_string += "Name : "
to_string += self.name
to_string += os.linesep
to_string += "Url : "
if self.url is not None:
to_string += self.url
else:
to_string += "N/A"
to_string += os.linesep
to_string += os.linesep
to_string += f"Keywords ({len(self.keywords)}) : "
to_string += os.linesep
for keyword in self.keywords:
to_string += f"- {keyword}"
to_string += os.linesep
to_string += os.linesep
to_string += "Uploaded : "
to_string += str(self.uploaded_datetime)
to_string += os.linesep
to_string += "Size : "
to_string += str(self.size)
to_string += os.linesep
to_string += "Uploader : "
to_string += self.uploader
to_string += os.linesep
to_string += "Completed : "
to_string += str(self.completed)
to_string += os.linesep
to_string += "Seeders : "
to_string += str(self.seeders)
to_string += os.linesep
to_string += "Leechers : "
to_string += str(self.leechers)
to_string += os.linesep
to_string += os.linesep
to_string += f"Files ({len(self.files)})"
to_string += os.linesep
if files:
for file in self.files:
to_string += str(file)
to_string += os.linesep
to_string += os.linesep
to_string += f"Comments ({len(self.comments)})"
to_string += os.linesep
if comments:
for comment in self.comments:
to_string += str(comment)
to_string += os.linesep
return to_string
class TorrentFile:
"""
Torrent's file entity
"""
size = ""
file_name = ""
def __str__(self):
to_string = ""
to_string += "size : "
to_string += self.size
to_string += os.linesep
to_string += "file_name : "
to_string += self.file_name
to_string += os.linesep
return to_string
class TorrentComment:
"""
Torrent's comment entity
"""
author = ""
posted = ""
text = ""
def __str__(self):
to_string = ""
to_string += "Author : "
to_string += self.author
to_string += os.linesep
to_string += "Posted : "
to_string += str(self.posted)
to_string += os.linesep
to_string += "Text : "
to_string += str(self.text)
to_string += os.linesep
return to_string


@ -0,0 +1,457 @@
import datetime
import logging
import os
import re
import requests
from bs4 import BeautifulSoup
from .torrent import Torrent, TorrentComment, TorrentFile
from .categories import categories
YGGTORRENT_TLD = "se"
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
YGGTORRENT_LOGOUT_URL = f"{YGGTORRENT_BASE_URL}/user/logout?attempt=1"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="
logger = logging.getLogger("yggtorrentscraper")
YGGTORRENT_DOMAIN = f".yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_TOKEN_COOKIE = "ygg_"
YGGTORRENT_SEARCH_URL_DESCRIPTION = "&description="
YGGTORRENT_SEARCH_URL_FILE = "&file="
YGGTORRENT_SEARCH_URL_UPLOADER = "&uploader="
YGGTORRENT_SEARCH_URL_CATEGORY = "&category="
YGGTORRENT_SEARCH_URL_SUB_CATEGORY = "&sub_category="
YGGTORRENT_SEARCH_URL_ORDER = "&order="
YGGTORRENT_SEARCH_URL_SORT = "&sort="
YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="
YGGTORRENT_SEARCH_URL_DESCRIPTION = "&description="
YGGTORRENT_SEARCH_URL_FILE = "&file="
YGGTORRENT_SEARCH_URL_UPLOADER = "&uploader="
YGGTORRENT_SEARCH_URL_CATEGORY = "&category="
YGGTORRENT_SEARCH_URL_SUB_CATEGORY = "&sub_category="
YGGTORRENT_SEARCH_URL_ORDER = "&order="
YGGTORRENT_SEARCH_URL_SORT = "&sort="
YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
YGGTORRENT_GET_INFO = f"https://www2.yggtorrent.se/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
TORRENT_PER_PAGE = 50
YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
def set_yggtorrent_tld(yggtorrent_tld=None):
"""
Redefine all string variables according to the new TLD
"""
global YGGTORRENT_TLD
global YGGTORRENT_BASE_URL
global YGGTORRENT_LOGIN_URL
global YGGTORRENT_SEARCH_URL
global YGGTORRENT_DOMAIN
global YGGTORRENT_GET_FILES
global YGGTORRENT_GET_INFO
global YGGTORRENT_MOST_COMPLETED_URL
global YGGTORRENT_FILES_URL
YGGTORRENT_TLD = yggtorrent_tld
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/user/logout"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="
YGGTORRENT_DOMAIN = ".yggtorrent.se"
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
YGGTORRENT_GET_INFO = f"https://www2.yggtorrentchg/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
def get_yggtorrent_tld():
return YGGTORRENT_TLD
class YggTorrentScraper:
session = None
def __init__(self, session):
self.session = session
def login(self, identifiant, password):
"""
Login request with the specified identifiant and password; returns a yggtorrent_token, which is necessary to download
"""
self.session.cookies.clear()
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "PostmanRuntime/7.17.1",
"Accept": "*/*",
"Cache-Control": "no-cache",
"Host": f"www2.yggtorrent.{YGGTORRENT_TLD}",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
}
response = self.session.post(
YGGTORRENT_LOGIN_URL,
data={"id": identifiant, "pass": password},
headers=headers,
)
logger.debug("status_code : %s", response.status_code)
yggtorrent_token = None
if response.status_code == 200:
logger.debug("Login successful")
yggtorrent_token = response.cookies.get_dict()[YGGTORRENT_TOKEN_COOKIE]
cookie = requests.cookies.create_cookie(
domain=YGGTORRENT_DOMAIN,
name=YGGTORRENT_TOKEN_COOKIE,
value=yggtorrent_token,
)
self.session.cookies.set_cookie(cookie)
return True
else:
logger.debug("Login failed")
return False
def logout(self):
"""
Logout request
"""
response = self.session.get(YGGTORRENT_LOGOUT_URL, headers=headers)
self.session.cookies.clear()
logger.debug("status_code : %s", response.status_code)
if response.status_code == 200:
logger.debug("Logout successful")
return True
else:
logger.debug("Logout failed")
return False
#kopa
def search_old(self, parameters):
search_url = create_search_url(parameters)
torrents_url = self.get_torrents_url(search_url, parameters)
return torrents_url
def search(self, parameters):
# torrents_url = os.popen('gecko/torrent_search.py didier')
torrents_url = exec(open('/home/iptubes/astroport-iptubes/yggcrawl/gecko/torrent_search.py').read())
return torrents_url
def extract_details(self, torrent_url):
"""
Extract information from the torrent's url
"""
logger.debug("torrent_url : %s", torrent_url)
torrents = []
response = self.session.get(torrent_url, headers=headers)
torrent_page = BeautifulSoup(response.content, features="lxml")
torrent = Torrent()
term_tags = torrent_page.find_all("a", {"class": "term"})
for term_tag in term_tags:
torrent.keywords.append(term_tag.text)
connection_tags = torrent_page.find("tr", {"id": "adv_search_cat"}).find_all(
"strong"
)
informations_tag = (
torrent_page.find("table", {"class": "informations"})
.find("tbody")
.find_all("tr")
)
download_button = torrent_page.find("a", {"class": "butt"})
if download_button.has_attr("href"):
torrent.url = download_button["href"]
torrent.seeders = int(connection_tags[0].text.replace(" ", ""))
torrent.leechers = int(connection_tags[1].text.replace(" ", ""))
torrent.completed = int(connection_tags[2].text.replace(" ", ""))
torrent.name = informations_tag[0].find_all("td")[1].text
torrent.size = informations_tag[3].find_all("td")[1].text
torrent.uploader = informations_tag[5].find_all("td")[1].text
mydatetime = re.search(
"([0-9]*\/[0-9]*\/[0-9]* [0-9]*:[0-9]*)",
informations_tag[6].find_all("td")[1].text,
0,
).group(0)
torrent.uploaded_datetime = datetime.datetime.strptime(
mydatetime, "%d/%m/%Y %H:%M"
)
message_tags = torrent_page.find_all("div", {"class": "message"})
for message_tag in message_tags:
torrent_comment = TorrentComment()
torrent_comment.author = message_tag.find("a").text
torrent_comment.posted = message_tag.find("strong").text
torrent_comment.text = message_tag.find(
"span", {"id": "comment_text"}
).text.strip()
torrent.comments.append(torrent_comment)
torrents.append(torrent)
torrent_id = torrent_page.find("form", {"id": "report-torrent"}).find(
"input", {"type": "hidden", "name": "target"}
)["value"]
response = self.session.get(YGGTORRENT_GET_FILES + torrent_id, headers=headers)
files_page = BeautifulSoup(response.content, features="lxml")
file_tags = files_page.find_all("tr")
for file_tag in file_tags:
torrent_file = TorrentFile()
td_tags = file_tag.find_all("td")
torrent_file.file_size = (
td_tags[0]
.text.replace("\\r", "")
.replace("\\n", "")
.replace("\\t", "")
.strip()
)
torrent_file.file_name = (
td_tags[1]
.text.replace("\\r", "")
.replace("\\n", "")
.replace("\\t", "")
.replace("\\", "")
.replace(" ", "")
.strip()
)
torrent.files.append(torrent_file)
return torrent
def most_completed(self):
"""
Return the URLs of the most completed torrents (TOP 100)
"""
header = {"Accept": "application/json, text/javascript, */*; q=0.01"}
self.session.post(YGGTORRENT_MOST_COMPLETED_URL, headers=header)
json_response = self.session.post(
YGGTORRENT_MOST_COMPLETED_URL, headers=header
).json()
torrents_url = []
for json_item in json_response:
root = BeautifulSoup(json_item[1], features="lxml")
a_tag = root.find("a")
torrents_url.append(a_tag["href"])
return torrents_url
#kopaa
def get_torrents_url(self, search_url, parameters):
"""
Return the torrent URLs from every result page of the search
"""
response = self.session.get(search_url, headers=headers)
search_page = BeautifulSoup(response.content, features="lxml")
pagination = search_page.find("ul", {"class": "pagination"})
if pagination is None:
limit_page = 1
else:
pagination_item = pagination.find_all("a")
limit_page = int(pagination_item[-1]["data-ci-pagination-page"])
torrents = []
for page in range(0, limit_page):
parameters["page"] = page * TORRENT_PER_PAGE
search_url = create_search_url(parameters)
response = self.session.get(search_url, headers=headers)
search_page = BeautifulSoup(response.content, features="lxml")
torrents_tag = search_page.findAll("a", {"id": "torrent_name"})
for torrent_tag in torrents_tag:
torrents.append(torrent_tag["href"])
return torrents
def download_from_torrent_url(self, torrent_url=None, destination_path="./data/tmp/torrents/"):
if torrent_url is not None:
torrent = self.extract_details(torrent_url)
return self.download_from_torrent_download_url(
torrent_url=torrent.url, destination_path=destination_path
)
def download_from_torrent(self, torrent=None, destination_path="./data/tmp/torrents/"):
if torrent is not None:
return self.download_from_torrent_download_url(
torrent_url=torrent.url, destination_path=destination_path
)
def download_from_torrent_download_url(
self, torrent_url=None, destination_path="./data/tmp/torrents/"
):
if torrent_url is None:
raise Exception("Invalid torrent_url, make sure you are logged")
response = self.session.get(YGGTORRENT_BASE_URL + torrent_url, headers=headers)
temp_file_name = response.headers.get("content-disposition")
file_name = temp_file_name[temp_file_name.index("filename=") + 10 : -1]
if not os.path.exists(destination_path):
os.makedirs(destination_path)
file_full_path = os.path.join(destination_path, file_name)
file = open(file_full_path, "wb")
file.write(response.content)
file.close()
return file_full_path
def create_search_url(parameters):
"""
Return a formatted URL for the torrent search
"""
formated_search_url = YGGTORRENT_SEARCH_URL
if "name" in parameters:
formated_search_url += parameters["name"].replace(" ", "+")
if "page" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_PAGE
formated_search_url += str(parameters["page"])
if "descriptions" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_DESCRIPTION
for description in parameters["descriptions"]:
formated_search_url += description
formated_search_url += "+"
if "files" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_FILE
for file in parameters["files"]:
formated_search_url += file
formated_search_url += "+"
if "uploader" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_UPLOADER
formated_search_url += parameters["uploader"]
if "sort" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_SORT
formated_search_url += parameters["sort"]
if "order" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_ORDER
formated_search_url += parameters["order"]
if "category" in parameters:
for category in categories:
if parameters["category"] == category["name"]:
formated_search_url += YGGTORRENT_SEARCH_URL_CATEGORY
formated_search_url += category["id"]
if "subcategory" in parameters:
for subcategory in category["subcategories"]:
if parameters["subcategory"] == subcategory["name"]:
formated_search_url += YGGTORRENT_SEARCH_URL_SUB_CATEGORY
formated_search_url += subcategory["id"]
if "options" in parameters:
for key, values in parameters["options"].items():
for option in subcategory["options"]:
if key == option["name"]:
for searched_value in values:
for index, value in enumerate(
option["values"]
):
if searched_value == value:
formated_search_url += (
"&option_"
)
formated_search_url += option[
"name"
]
# options_index.append(index)
if "multiple" in option:
formated_search_url += (
"%3Amultiple"
)
formated_search_url += "[]="
formated_search_url += str(
index + 1
)
formated_search_url += YGGTORRENT_SEARCH_URL_DO
formated_search_url += "search"
return formated_search_url
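For reference, an editor's sketch (not part of the commit) of the URL create_search_url() produces for a simple query, assuming the module-level default TLD "se":

```python
params = {"name": "walking dead", "sort": "completed", "order": "desc"}
print(create_search_url(params))
# https://www2.yggtorrent.se/engine/search?name=walking+dead&sort=completed&order=desc&do=search
```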

selenium.py Executable file

@ -0,0 +1,25 @@
#!/usr/bin/python3
import sys
from yggcrawl import YggTorrentScraperSelenium
from selenium import webdriver
if __name__ == "__main__":
options = webdriver.ChromeOptions()
options.add_argument("--log-level=3")
options.add_argument("--disable-blink-features")
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-logging"])
driver = webdriver.Chrome("D:\chromedriver.exe", options=options)
scraper = YggTorrentScraperSelenium(driver=driver)
# or
# scraper = YggTorrentScraperSelenium(driver_path="D:\chromedriver.exe")
if scraper.login("myidentifiant", "mypassword"):
print("Login success")
torrents_url = scraper.search({"name": "walking dead"})
print(torrents_url)
else:
print("Login failed")

tata.sh Executable file

@ -0,0 +1,17 @@
#!/usr/bin/env bash
readWords() {
declare -i int="$1"
(( int == 0 )) && {
printf "%s\n" "$int is 0, cant find 0 words"
return 1
}
while read getWords;do
if [[ ${#getWords} -eq $int ]];then
printf "%s\n" "$getWords"
fi
done < /usr/share/dict/words
}
readWords 20


@ -26,12 +26,9 @@ getid() {
# Get ID
else
j=0
for i in $name; do
if [[ $j == 0 ]];then
result=$($transcmd --list | grep -vE 'Sum:|ID Done' | grep -iw "$i")
else
result=$(echo "$result" | grep -iw "$i")
fi
for i in "$name"; do
[[ $j == 0 ]] && result=$($transcmd --list | grep -vE 'Sum:|ID Done' | grep -i "$i")
result=$(echo "$result" | grep -vE 'Sum:|ID Done' | grep -iw "$i")
((j++))
done
fi
@ -39,7 +36,7 @@ getid() {
echo "$result" | awk '{ print $1 }'
else
echo "No torrent found"
fi
fi
}
getlowerid() {
@ -89,16 +86,13 @@ case "$1" in
remove)
idt=$(getid | tr -d '*')
if [[ $idt =~ ^[+-]?[0-9]+([.][0-9]+)?$ ]]; then
torrentList=$($transcmd --list | grep -vE 'Sum:|ID Done' )
IFS=$'\n'
for i in $torrentList; do
for i in "$($transcmd --list | grep -vE 'Sum:|ID Done' )"; do
if [[ $(echo "$i" | awk '{ print $1 }') == $idt ]]; then
fileName=$(echo "$i" | awk '{ print $NF }')
break
fi
done
IFS=$' '
[[ ! $fileName ]] && echo "Can't find torrent to remove." && exit 1
cd data/meta
torrentId=$(grep -r $fileName | head -n1 | awk -F '/' '{ print $1 }')
rm -rf $torrentId


@ -7,5 +7,11 @@ from .yggtorrentscraper import (
set_yggtorrent_tld,
get_yggtorrent_tld,
)
from .yggtorrentscraper_selenium import (
YggTorrentScraperSelenium,
set_yggtorrent_tld,
get_yggtorrent_tld,
)
from .torrent import Torrent, TorrentComment, TorrentFile
from .categories import categories

yggcrawl/debug.log Normal file

@ -0,0 +1,35 @@
[0512/133633.143:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/133633.143:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/133633.144:ERROR:exception_snapshot_win.cc(98)] thread ID 14952 not found in process
[0512/133633.144:ERROR:exception_snapshot_win.cc(98)] thread ID 49204 not found in process
[0512/133633.158:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/133633.158:ERROR:exception_snapshot_win.cc(98)] thread ID 46960 not found in process
[0512/133700.448:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/133700.448:ERROR:exception_snapshot_win.cc(98)] thread ID 45656 not found in process
[0512/133700.470:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/133700.470:ERROR:exception_snapshot_win.cc(98)] thread ID 47508 not found in process
[0512/134349.949:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/134349.962:ERROR:exception_snapshot_win.cc(98)] thread ID 39532 not found in process
[0512/134349.956:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/134349.962:ERROR:exception_snapshot_win.cc(98)] thread ID 48284 not found in process
[0512/134349.963:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/134349.963:ERROR:exception_snapshot_win.cc(98)] thread ID 51964 not found in process
[0512/134349.982:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/134349.982:ERROR:exception_snapshot_win.cc(98)] thread ID 19944 not found in process
[0512/134349.986:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/134349.986:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/134349.986:ERROR:exception_snapshot_win.cc(98)] thread ID 32616 not found in process
[0512/134349.986:ERROR:exception_snapshot_win.cc(98)] thread ID 42324 not found in process
[0512/135659.781:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/135659.784:ERROR:exception_snapshot_win.cc(98)] thread ID 50808 not found in process
[0512/135659.830:ERROR:process_reader_win.cc(151)] SuspendThread: Accès refusé. (0x5)
[0512/135659.830:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/135659.831:ERROR:exception_snapshot_win.cc(98)] thread ID 45060 not found in process
[0512/135659.847:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/135659.847:ERROR:exception_snapshot_win.cc(98)] thread ID 52004 not found in process
[0512/135659.882:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/135659.882:ERROR:exception_snapshot_win.cc(98)] thread ID 56756 not found in process
[0512/135659.994:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/135659.994:ERROR:exception_snapshot_win.cc(98)] thread ID 22536 not found in process
[0512/135700.025:ERROR:process_reader_win.cc(123)] NtOpenThread: {Accès refusé} Un processus a demandé laccès a un objet, mais il ne bénéficie pas des autorisations nécessaires. (0xc0000022)
[0512/135700.025:ERROR:exception_snapshot_win.cc(98)] thread ID 56652 not found in process


@ -3,6 +3,7 @@ import shutil
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
@ -20,7 +21,7 @@ class TestDownload(unittest.TestCase):
".", "yggtorrentscraper", "tests", "test_download"
)
self.scraper = YggTorrentScraper(requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
self.scraper.login(yggtorrent_identifiant, yggtorrent_password)


@ -2,12 +2,12 @@ import os
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
class TestExtractDetails(unittest.TestCase):
scraper = YggTorrentScraper(requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
def test_extract_details(self):
torrent = self.scraper.extract_details(


@ -2,13 +2,13 @@ import os
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
class TestAuthentification(unittest.TestCase):
def setUp(self):
self.scraper = YggTorrentScraper(requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
def test_login_success(self):
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")


@ -2,13 +2,13 @@ import os
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
class TestLogout(unittest.TestCase):
def setUp(self):
self.scraper = YggTorrentScraper(requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
def test_logout_success(self):
yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")


@ -1,11 +1,12 @@
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
class TestMostCompleted(unittest.TestCase):
scraper = YggTorrentScraper(session=requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
def test_most_completed(self):
most_completed = self.scraper.most_completed()


@ -1,12 +1,13 @@
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
class TestResearch(unittest.TestCase):
scraper = YggTorrentScraper(requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
torrent_name = "walking dead s09"
torrent_uploader = "brandit"


@ -2,12 +2,12 @@ import os
import unittest
import requests
import cloudscraper
from ..yggtorrentscraper import YggTorrentScraper
class TestTorrent(unittest.TestCase):
scraper = YggTorrentScraper(requests.session())
self.scraper = YggTorrentScraper(cloudscraper.create_scraper())
def test_str(self):
torrent_url = self.scraper.most_completed()[0]


@ -18,6 +18,7 @@ class Torrent:
leechers = -1
url = None
download_url = None
files = []
comments = []
@ -36,6 +37,11 @@ class Torrent:
else:
to_string += "N/A"
if self.download_url is not None:
to_string += self.download_url
else:
to_string += "N/A"
to_string += os.linesep
to_string += os.linesep


@ -9,7 +9,7 @@ from bs4 import BeautifulSoup
from .torrent import Torrent, TorrentComment, TorrentFile
from .categories import categories
YGGTORRENT_TLD = "si"
YGGTORRENT_TLD = "se"
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
@ -44,7 +44,7 @@ YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
YGGTORRENT_GET_INFO = f"{YGGTORRENT_BASE_URL}/engine/get_nfo?torrent="
YGGTORRENT_GET_INFO = f"https://www2.yggtorrentchg/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
@ -162,6 +162,7 @@ class YggTorrentScraper:
def search(self, parameters):
search_url = create_search_url(parameters)
torrents_url = self.get_torrents_url(search_url, parameters)
return torrents_url
@ -328,8 +329,7 @@ class YggTorrentScraper:
return torrents
#kopa
def download_from_torrent_url(self, torrent_url=None, destination_path="./data/tmp/torrents/"):
def download_from_torrent_url(self, torrent_url=None, destination_path="./"):
if torrent_url is not None:
torrent = self.extract_details(torrent_url)
@ -337,14 +337,14 @@ class YggTorrentScraper:
torrent_url=torrent.url, destination_path=destination_path
)
def download_from_torrent(self, torrent=None, destination_path="./data/tmp/torrents/"):
def download_from_torrent(self, torrent=None, destination_path="./"):
if torrent is not None:
return self.download_from_torrent_download_url(
torrent_url=torrent.url, destination_path=destination_path
)
def download_from_torrent_download_url(
self, torrent_url=None, destination_path="./data/tmp/torrents/"
self, torrent_url=None, destination_path="./"
):
if torrent_url is None:
raise Exception("Invalid torrent_url, make sure you are logged")


@ -0,0 +1,452 @@
import datetime
import logging
import os
import re
import requests
from bs4 import BeautifulSoup
from .torrent import Torrent, TorrentComment, TorrentFile
from .categories import categories
YGGTORRENT_TLD = "se"
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
YGGTORRENT_LOGOUT_URL = f"{YGGTORRENT_BASE_URL}/user/logout?attempt=1"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="
logger = logging.getLogger("yggtorrentscraper")
YGGTORRENT_DOMAIN = f".yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_TOKEN_COOKIE = "ygg_"
YGGTORRENT_SEARCH_URL_DESCRIPTION = "&description="
YGGTORRENT_SEARCH_URL_FILE = "&file="
YGGTORRENT_SEARCH_URL_UPLOADER = "&uploader="
YGGTORRENT_SEARCH_URL_CATEGORY = "&category="
YGGTORRENT_SEARCH_URL_SUB_CATEGORY = "&sub_category="
YGGTORRENT_SEARCH_URL_ORDER = "&order="
YGGTORRENT_SEARCH_URL_SORT = "&sort="
YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="
YGGTORRENT_SEARCH_URL_DESCRIPTION = "&description="
YGGTORRENT_SEARCH_URL_FILE = "&file="
YGGTORRENT_SEARCH_URL_UPLOADER = "&uploader="
YGGTORRENT_SEARCH_URL_CATEGORY = "&category="
YGGTORRENT_SEARCH_URL_SUB_CATEGORY = "&sub_category="
YGGTORRENT_SEARCH_URL_ORDER = "&order="
YGGTORRENT_SEARCH_URL_SORT = "&sort="
YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
YGGTORRENT_GET_INFO = f"https://www2.yggtorrentchg/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
TORRENT_PER_PAGE = 50
YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
def set_yggtorrent_tld(yggtorrent_tld=None):
"""
Redefine all string variables according to the new TLD
"""
global YGGTORRENT_TLD
global YGGTORRENT_BASE_URL
global YGGTORRENT_LOGIN_URL
global YGGTORRENT_SEARCH_URL
global YGGTORRENT_DOMAIN
global YGGTORRENT_GET_FILES
global YGGTORRENT_GET_INFO
global YGGTORRENT_MOST_COMPLETED_URL
global YGGTORRENT_FILES_URL
YGGTORRENT_TLD = yggtorrent_tld
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/user/logout"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="
YGGTORRENT_DOMAIN = ".yggtorrent.gg"
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
YGGTORRENT_GET_INFO = f"https://www2.yggtorrentchg/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
def get_yggtorrent_tld():
return YGGTORRENT_TLD
class YggTorrentScraper:
session = None
def __init__(self, session):
self.session = session
def login(self, identifiant, password):
"""
Login request with the specified identifiant and password; returns a yggtorrent_token, which is necessary to download
"""
self.session.cookies.clear()
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "PostmanRuntime/7.17.1",
"Accept": "*/*",
"Cache-Control": "no-cache",
"Host": f"www.yggtorrent.{YGGTORRENT_TLD}",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
}
response = self.session.post(
YGGTORRENT_LOGIN_URL,
data={"id": identifiant, "pass": password},
headers=headers,
)
logger.debug("status_code : %s", response.status_code)
yggtorrent_token = None
if response.status_code == 200:
logger.debug("Login successful")
yggtorrent_token = response.cookies.get_dict()[YGGTORRENT_TOKEN_COOKIE]
cookie = requests.cookies.create_cookie(
domain=YGGTORRENT_DOMAIN,
name=YGGTORRENT_TOKEN_COOKIE,
value=yggtorrent_token,
)
self.session.cookies.set_cookie(cookie)
return True
else:
logger.debug("Login failed")
return False
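    # A minimal usage sketch (credentials are placeholders, not part of the library):
    #
    #     scraper = YggTorrentScraper(requests.session())
    #     if scraper.login("my_user", "my_password"):
    #         ...  # the session now carries the "ygg_" cookie used for authenticated requests
    #     else:
    #         print("login failed")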
def logout(self):
"""
Logout request
"""
response = self.session.get(YGGTORRENT_LOGOUT_URL)
self.session.cookies.clear()
logger.debug("status_code : %s", response.status_code)
if response.status_code == 200:
logger.debug("Logout successful")
return True
else:
logger.debug("Logout failed")
return False
def search(self, parameters):
search_url = create_search_url(parameters)
torrents_url = self.get_torrents_url(search_url, parameters)
return torrents_url
def extract_details(self, torrent_url):
"""
        Extract the torrent's details (name, size, seeders, files, comments, ...) from its page URL
"""
logger.debug("torrent_url : %s", torrent_url)
torrents = []
response = self.session.get(torrent_url)
torrent_page = BeautifulSoup(response.content, features="lxml")
torrent = Torrent()
term_tags = torrent_page.find_all("a", {"class": "term"})
for term_tag in term_tags:
torrent.keywords.append(term_tag.text)
connection_tags = torrent_page.find("tr", {"id": "adv_search_cat"}).find_all(
"strong"
)
informations_tag = (
torrent_page.find("table", {"class": "informations"})
.find("tbody")
.find_all("tr")
)
download_button = torrent_page.find("a", {"class": "butt"})
if download_button.has_attr("href"):
torrent.url = download_button["href"]
torrent.seeders = int(connection_tags[0].text.replace(" ", ""))
torrent.leechers = int(connection_tags[1].text.replace(" ", ""))
torrent.completed = int(connection_tags[2].text.replace(" ", ""))
torrent.name = informations_tag[0].find_all("td")[1].text
torrent.size = informations_tag[3].find_all("td")[1].text
torrent.uploader = informations_tag[5].find_all("td")[1].text
        mydatetime = re.search(
            r"([0-9]*/[0-9]*/[0-9]* [0-9]*:[0-9]*)",
            informations_tag[6].find_all("td")[1].text,
            0,
        ).group(0)
torrent.uploaded_datetime = datetime.datetime.strptime(
mydatetime, "%d/%m/%Y %H:%M"
)
message_tags = torrent_page.find_all("div", {"class": "message"})
for message_tag in message_tags:
torrent_comment = TorrentComment()
torrent_comment.author = message_tag.find("a").text
torrent_comment.posted = message_tag.find("strong").text
torrent_comment.text = message_tag.find(
"span", {"id": "comment_text"}
).text.strip()
torrent.comments.append(torrent_comment)
torrents.append(torrent)
torrent_id = torrent_page.find("form", {"id": "report-torrent"}).find(
"input", {"type": "hidden", "name": "target"}
)["value"]
response = self.session.get(YGGTORRENT_GET_FILES + torrent_id)
files_page = BeautifulSoup(response.content, features="lxml")
file_tags = files_page.find_all("tr")
for file_tag in file_tags:
torrent_file = TorrentFile()
td_tags = file_tag.find_all("td")
torrent_file.file_size = (
td_tags[0]
.text.replace("\\r", "")
.replace("\\n", "")
.replace("\\t", "")
.strip()
)
torrent_file.file_name = (
td_tags[1]
.text.replace("\\r", "")
.replace("\\n", "")
.replace("\\t", "")
.replace("\\", "")
.replace(" ", "")
.strip()
)
torrent.files.append(torrent_file)
return torrent
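    # A minimal usage sketch: extract_details() returns a single Torrent object whose
    # attributes mirror the page (some_torrent_url is a placeholder):
    #
    #     torrent = scraper.extract_details(some_torrent_url)
    #     print(torrent.name, torrent.size, torrent.seeders, torrent.leechers)
    #     for torrent_file in torrent.files:
    #         print(torrent_file.file_name, torrent_file.file_size)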
def most_completed(self):
"""
        Return the URLs of the most completed torrents (TOP 100)
"""
header = {"Accept": "application/json, text/javascript, */*; q=0.01"}
self.session.post(YGGTORRENT_MOST_COMPLETED_URL, headers=header)
json_response = self.session.post(
YGGTORRENT_MOST_COMPLETED_URL, headers=header
).json()
torrents_url = []
for json_item in json_response:
root = BeautifulSoup(json_item[1], features="lxml")
a_tag = root.find("a")
torrents_url.append(a_tag["href"])
return torrents_url
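    # A minimal usage sketch: most_completed() returns page URLs only, so combine it
    # with extract_details() when full metadata is needed:
    #
    #     for url in scraper.most_completed():
    #         print(url)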
def get_torrents_url(self, search_url, parameters):
"""
        Return the torrent page URLs matching the search, across all result pages
"""
response = self.session.get(search_url)
search_page = BeautifulSoup(response.content, features="lxml")
pagination = search_page.find("ul", {"class": "pagination"})
if pagination is None:
limit_page = 1
else:
pagination_item = pagination.find_all("a")
limit_page = int(pagination_item[-1]["data-ci-pagination-page"])
torrents = []
for page in range(0, limit_page):
parameters["page"] = page * TORRENT_PER_PAGE
search_url = create_search_url(parameters)
response = self.session.get(search_url)
search_page = BeautifulSoup(response.content, features="lxml")
torrents_tag = search_page.findAll("a", {"id": "torrent_name"})
for torrent_tag in torrents_tag:
torrents.append(torrent_tag["href"])
return torrents
def download_from_torrent_url(self, torrent_url=None, destination_path="./"):
if torrent_url is not None:
torrent = self.extract_details(torrent_url)
return self.download_from_torrent_download_url(
torrent_url=torrent.url, destination_path=destination_path
)
def download_from_torrent(self, torrent=None, destination_path="./"):
if torrent is not None:
return self.download_from_torrent_download_url(
torrent_url=torrent.url, destination_path=destination_path
)
def download_from_torrent_download_url(
self, torrent_url=None, destination_path="./"
):
if torrent_url is None:
            raise Exception("Invalid torrent_url, make sure you are logged in")
response = self.session.get(YGGTORRENT_BASE_URL + torrent_url)
temp_file_name = response.headers.get("content-disposition")
file_name = temp_file_name[temp_file_name.index("filename=") + 10 : -1]
if not os.path.exists(destination_path):
os.makedirs(destination_path)
file_full_path = os.path.join(destination_path, file_name)
        with open(file_full_path, "wb") as file:
            file.write(response.content)
return file_full_path
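    # A minimal sketch of the search + download flow (the search term and destination
    # folder below are example values, not defaults of the library):
    #
    #     urls = scraper.search({"name": "ubuntu"})
    #     if urls:
    #         saved_path = scraper.download_from_torrent_url(urls[0], destination_path="data/tmp/torrents/")
    #         print("torrent file saved to", saved_path)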
def create_search_url(parameters):
"""
    Return a formatted search URL built from the given parameters
"""
formated_search_url = YGGTORRENT_SEARCH_URL
if "name" in parameters:
formated_search_url += parameters["name"].replace(" ", "+")
if "page" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_PAGE
formated_search_url += str(parameters["page"])
if "descriptions" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_DESCRIPTION
for description in parameters["descriptions"]:
formated_search_url += description
formated_search_url += "+"
if "files" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_FILE
for file in parameters["files"]:
formated_search_url += file
formated_search_url += "+"
if "uploader" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_UPLOADER
formated_search_url += parameters["uploader"]
if "sort" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_SORT
formated_search_url += parameters["sort"]
if "order" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_ORDER
formated_search_url += parameters["order"]
if "category" in parameters:
for category in categories:
if parameters["category"] == category["name"]:
formated_search_url += YGGTORRENT_SEARCH_URL_CATEGORY
formated_search_url += category["id"]
if "subcategory" in parameters:
for subcategory in category["subcategories"]:
if parameters["subcategory"] == subcategory["name"]:
formated_search_url += YGGTORRENT_SEARCH_URL_SUB_CATEGORY
formated_search_url += subcategory["id"]
if "options" in parameters:
for key, values in parameters["options"].items():
for option in subcategory["options"]:
if key == option["name"]:
for searched_value in values:
for index, value in enumerate(
option["values"]
):
if searched_value == value:
formated_search_url += (
"&option_"
)
formated_search_url += option[
"name"
]
# options_index.append(index)
if "multiple" in option:
formated_search_url += (
"%3Amultiple"
)
formated_search_url += "[]="
formated_search_url += str(
index + 1
)
formated_search_url += YGGTORRENT_SEARCH_URL_DO
formated_search_url += "search"
return formated_search_url
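# A hedged sketch of the parameters dict understood by create_search_url(); category
# and sub-category names must match entries from categories.py, and every value shown
# below is purely illustrative:
#
#     parameters = {
#         "name": "ubuntu",
#         "uploader": "someone",
#         "sort": "completed",
#         "order": "desc",
#     }
#     print(create_search_url(parameters))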

View File

@ -0,0 +1,476 @@
import datetime
import logging
import os
import re
import requests
from bs4 import BeautifulSoup
from .torrent import Torrent, TorrentComment, TorrentFile
from .categories import categories
import sys
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from random import randint
YGGTORRENT_TLD = "se"
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
YGGTORRENT_LOGOUT_URL = f"{YGGTORRENT_BASE_URL}/user/logout"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="
logger = logging.getLogger("yggtorrentscraper")
YGGTORRENT_DOMAIN = f".yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_TOKEN_COOKIE = "ygg_"
YGGTORRENT_SEARCH_URL_DESCRIPTION = "&description="
YGGTORRENT_SEARCH_URL_FILE = "&file="
YGGTORRENT_SEARCH_URL_UPLOADER = "&uploader="
YGGTORRENT_SEARCH_URL_CATEGORY = "&category="
YGGTORRENT_SEARCH_URL_SUB_CATEGORY = "&sub_category="
YGGTORRENT_SEARCH_URL_ORDER = "&order="
YGGTORRENT_SEARCH_URL_SORT = "&sort="
YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
YGGTORRENT_GET_INFO = f"{YGGTORRENT_BASE_URL}/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
TORRENT_PER_PAGE = 50
YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
def set_yggtorrent_tld(yggtorrent_tld=None):
"""
    Redefine all URL constants according to the new TLD
"""
global YGGTORRENT_TLD
global YGGTORRENT_BASE_URL
    global YGGTORRENT_LOGIN_URL
    global YGGTORRENT_LOGOUT_URL
global YGGTORRENT_SEARCH_URL
global YGGTORRENT_DOMAIN
global YGGTORRENT_GET_FILES
global YGGTORRENT_GET_INFO
global YGGTORRENT_MOST_COMPLETED_URL
global YGGTORRENT_FILES_URL
YGGTORRENT_TLD = yggtorrent_tld
YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
    YGGTORRENT_LOGOUT_URL = f"{YGGTORRENT_BASE_URL}/user/logout"
YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="
    YGGTORRENT_DOMAIN = f".yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
    YGGTORRENT_GET_INFO = f"{YGGTORRENT_BASE_URL}/engine/get_nfo?torrent="
YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"
YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
def get_yggtorrent_tld():
return YGGTORRENT_TLD
class YggTorrentScraperSelenium:
def __init__(self, driver=None, driver_path=None):
if driver_path is not None:
options = webdriver.ChromeOptions()
options.add_argument("--log-level=3")
options.add_argument("--disable-blink-features")
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-logging"])
self.driver = webdriver.Chrome(driver_path, options=options)
else:
self.driver = driver
def login(self, identifiant, password):
self.driver.get(YGGTORRENT_BASE_URL)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#title"))
)
register_button = self.driver.find_element_by_css_selector("#register")
self.driver.execute_script("arguments[0].click();", register_button)
input_identifiant = self.driver.find_element_by_css_selector("input[name='id']")
input_identifiant.clear()
input_identifiant.send_keys(identifiant)
input_password = self.driver.find_element_by_css_selector("input[name='pass']")
input_password.clear()
input_password.send_keys(password)
login_button = self.driver.find_element_by_css_selector("#user-login button")
self.driver.execute_script("arguments[0].click();", login_button)
time.sleep(1)
account_banned = self.driver.find_element_by_css_selector("#ban_msg_login")
invalid_password = self.driver.find_element_by_css_selector("#login_msg_pass")
not_activated_account = self.driver.find_element_by_css_selector(
"#login_msg_mail"
)
if (
len(account_banned.get_attribute("style")) == 0
or len(invalid_password.get_attribute("style")) == 0
or len(not_activated_account.get_attribute("style")) == 0
):
return False
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#panel-btn"))
)
except TimeoutException:
return False
return True
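    # A minimal usage sketch (driver path and credentials are placeholders):
    #
    #     scraper = YggTorrentScraperSelenium(driver_path="/usr/local/bin/chromedriver")
    #     if not scraper.login("my_user", "my_password"):
    #         print("login failed (banned, wrong password or unactivated account)")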
def logout(self):
"""
Logout request
"""
# <a href="https://www2.yggtorrent.se/user/logout"> Déconnexion</a>
self.driver.get(YGGTORRENT_LOGOUT_URL)
time.sleep(1)
try:
panel_button = self.driver.find_element_by_css_selector("#panel-btn")
except NoSuchElementException:
return True
return False
def search(self, parameters):
search_url = create_search_url(parameters)
torrents_url = self.get_torrents_url(search_url, parameters)
return torrents_url
def extract_details(self, torrent_url):
"""
        Extract the torrent's details (name, size, seeders, files, comments, ...) from its page URL
"""
logger.debug("torrent_url : %s", torrent_url)
self.driver.get(torrent_url)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#title"))
)
torrents = []
torrent_page = BeautifulSoup(self.driver.page_source, features="lxml")
torrent = Torrent()
torrent.url = torrent_url
term_tags = torrent_page.find_all("a", {"class": "term"})
for term_tag in term_tags:
torrent.keywords.append(term_tag.text)
connection_tags = torrent_page.find("tr", {"id": "adv_search_cat"}).find_all(
"strong"
)
informations_tag = (
torrent_page.find("table", {"class": "informations"})
.find("tbody")
.find_all("tr")
)
download_button = torrent_page.find("a", {"class": "butt"})
if download_button.has_attr("href"):
torrent.download_url = download_button["href"]
torrent.seeders = int(connection_tags[0].text.replace(" ", ""))
torrent.leechers = int(connection_tags[1].text.replace(" ", ""))
torrent.completed = int(connection_tags[2].text.replace(" ", ""))
torrent.name = informations_tag[0].find_all("td")[1].text
torrent.size = informations_tag[3].find_all("td")[1].text
torrent.uploader = informations_tag[5].find_all("td")[1].text
        mydatetime = re.search(
            r"([0-9]*/[0-9]*/[0-9]* [0-9]*:[0-9]*)",
            informations_tag[6].find_all("td")[1].text,
            0,
        ).group(0)
torrent.uploaded_datetime = datetime.datetime.strptime(
mydatetime, "%d/%m/%Y %H:%M"
)
message_tags = torrent_page.find_all("div", {"class": "message"})
for message_tag in message_tags:
torrent_comment = TorrentComment()
torrent_comment.author = message_tag.find("a").text
torrent_comment.posted = message_tag.find("strong").text
torrent_comment.text = message_tag.find(
"span", {"id": "comment_text"}
).text.strip()
torrent.comments.append(torrent_comment)
torrents.append(torrent)
torrent_id = torrent_page.find("form", {"id": "report-torrent"}).find(
"input", {"type": "hidden", "name": "target"}
)["value"]
self.driver.get(torrent_url)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#informationsContainer"))
)
files_page = BeautifulSoup(self.driver.page_source, features="lxml")
file_tags = files_page.find_all("tr")
for file_tag in file_tags:
torrent_file = TorrentFile()
td_tags = file_tag.find_all("td")
torrent_file.file_size = (
td_tags[0]
.text.replace("\\r", "")
.replace("\\n", "")
.replace("\\t", "")
.strip()
)
torrent_file.file_name = (
td_tags[1]
.text.replace("\\r", "")
.replace("\\n", "")
.replace("\\t", "")
.replace("\\", "")
.replace(" ", "")
.strip()
)
torrent.files.append(torrent_file)
return torrent
def most_completed(self):
"""
        Return the URLs of the most completed torrents (TOP 100)
"""
self.driver.get(YGGTORRENT_MOST_COMPLETED_URL)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, "#DataTables_Table_0_wrapper")
)
)
torrents_url = []
root = BeautifulSoup(self.driver.page_source, features="lxml")
tbody_element = root.find("tbody")
tr_elements = tbody_element.find_all("tr")
for tr_element in tr_elements:
a_elements = tr_element.find_all("a")
a_element = a_elements[1]
torrents_url.append(a_element["href"])
return torrents_url
def get_torrents_url(self, search_url, parameters):
"""
        Return the torrent page URLs matching the search, across all result pages
"""
self.driver.get(search_url)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#criteriarecherche"))
)
search_page = BeautifulSoup(self.driver.page_source, features="lxml")
pagination = search_page.find("ul", {"class": "pagination"})
if pagination is None:
limit_page = 1
else:
pagination_item = pagination.find_all("a")
limit_page = int(pagination_item[-1]["data-ci-pagination-page"])
torrents = []
for page in range(0, limit_page):
parameters["page"] = page * TORRENT_PER_PAGE
search_url = create_search_url(parameters)
self.driver.get(search_url)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, "#over-18-notification")
)
)
search_page = BeautifulSoup(self.driver.page_source, features="lxml")
torrents_tag = search_page.findAll("a", {"id": "torrent_name"})
for torrent_tag in torrents_tag:
torrents.append(torrent_tag["href"])
return torrents
def download_from_torrent_url(self, torrent_url=None, destination_path="./"):
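        # Note: unlike the requests-based scraper, destination_path is never used here;
        # the .torrent file lands wherever the browser profile saves downloads (an
        # assumption about the Selenium setup, nothing below enforces it).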
if torrent_url is not None:
self.driver.get(torrent_url)
WebDriverWait(self.driver, 30000).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#title"))
)
download_button = self.driver.find_element_by_css_selector("a.butt")
self.driver.execute_script("arguments[0].click();", download_button)
def download_from_torrent(self, torrent=None, destination_path="./"):
if torrent is not None:
self.download_from_torrent_url(torrent.url)
def create_search_url(parameters):
"""
    Return a formatted search URL built from the given parameters
"""
formated_search_url = YGGTORRENT_SEARCH_URL
if "name" in parameters:
formated_search_url += parameters["name"].replace(" ", "+")
if "page" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_PAGE
formated_search_url += str(parameters["page"])
if "descriptions" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_DESCRIPTION
for description in parameters["descriptions"]:
formated_search_url += description
formated_search_url += "+"
if "files" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_FILE
for file in parameters["files"]:
formated_search_url += file
formated_search_url += "+"
if "uploader" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_UPLOADER
formated_search_url += parameters["uploader"]
if "sort" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_SORT
formated_search_url += parameters["sort"]
if "order" in parameters:
formated_search_url += YGGTORRENT_SEARCH_URL_ORDER
formated_search_url += parameters["order"]
if "category" in parameters:
for category in categories:
if parameters["category"] == category["name"]:
formated_search_url += YGGTORRENT_SEARCH_URL_CATEGORY
formated_search_url += category["id"]
if "subcategory" in parameters:
for subcategory in category["subcategories"]:
if parameters["subcategory"] == subcategory["name"]:
formated_search_url += YGGTORRENT_SEARCH_URL_SUB_CATEGORY
formated_search_url += subcategory["id"]
if "options" in parameters:
for key, values in parameters["options"].items():
for option in subcategory["options"]:
if key == option["name"]:
for searched_value in values:
for index, value in enumerate(
option["values"]
):
if searched_value == value:
formated_search_url += (
"&option_"
)
formated_search_url += option[
"name"
]
# options_index.append(index)
if "multiple" in option:
formated_search_url += (
"%3Amultiple"
)
formated_search_url += "[]="
formated_search_url += str(
index + 1
)
formated_search_url += YGGTORRENT_SEARCH_URL_DO
formated_search_url += "search"
return formated_search_url
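# A hedged end-to-end sketch of the Selenium variant (driver path, credentials and
# search term are placeholders):
#
#     scraper = YggTorrentScraperSelenium(driver_path="/usr/local/bin/chromedriver")
#     if scraper.login("my_user", "my_password"):
#         for url in scraper.search({"name": "ubuntu"}):
#             torrent = scraper.extract_details(url)
#             print(torrent.name, torrent.seeders)
#         scraper.logout()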