Build homemade yggcrawler
This commit is contained in:
parent
b80b27f7e5
commit
60fee6227f
6
crawl.py
6
crawl.py
|
@ -3,7 +3,7 @@ import json
|
|||
import sys
|
||||
import login
|
||||
|
||||
from yggtorrentscraper import YggTorrentScraper
|
||||
from yggcrawl import YggTorrentScraper
|
||||
scraper = YggTorrentScraper(requests.session())
|
||||
from yggtorrentscraper import set_yggtorrent_tld
|
||||
set_yggtorrent_tld("se")
|
||||
|
@ -33,7 +33,7 @@ most_completed = scraper.most_completed()
|
|||
|
||||
if(scraper.login(login.user, login.passwd)):
|
||||
print("Login success")
|
||||
first_torrent = research[0]
|
||||
scraper.download_from_torrent_url(first_torrent)
|
||||
else:
|
||||
print("Login failed")
|
||||
|
||||
#scraper.download_from_torrent_url('')
|
||||
|
|
|
@ -15,8 +15,8 @@ sbotc() {
|
|||
# Install YGGTorrentScraper
|
||||
yggts() {
|
||||
echo -e "${c_yellow}Installing YGGTorrentScraper...$c_"
|
||||
[[ -z $(which pip3) ]] && sudo apt install python3-pip
|
||||
pip3 install yggtorrentscraper
|
||||
# [[ -z $(which pip3) ]] && sudo apt install python3-pip
|
||||
#pip3 install yggtorrentscraper
|
||||
}
|
||||
|
||||
torrengo() {
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
"""
|
||||
__init__.py main
|
||||
"""
|
||||
|
||||
from .yggtorrentscraper import (
|
||||
YggTorrentScraper,
|
||||
set_yggtorrent_tld,
|
||||
get_yggtorrent_tld,
|
||||
)
|
||||
from .torrent import Torrent, TorrentComment, TorrentFile
|
||||
from .categories import categories
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,26 @@
|
|||
import unittest
|
||||
|
||||
from ..yggtorrentscraper import (
|
||||
YggTorrentScraper,
|
||||
set_yggtorrent_tld,
|
||||
get_yggtorrent_tld,
|
||||
)
|
||||
|
||||
|
||||
class TestChangeYggtorrentTLD(unittest.TestCase):
    """Check that the module-wide YggTorrent TLD can be read and replaced."""

    # TLD in effect before the running test; filled in by setUp.
    current_yggtorrent_tld = None

    def setUp(self):
        # Capture the value per test (not once at class definition) so that
        # tearDown always restores what the test actually started with.
        self.current_yggtorrent_tld = get_yggtorrent_tld()

    def test_read_tld(self):
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(get_yggtorrent_tld(), "se")

    def test_set_yggtorrent_tld(self):
        set_yggtorrent_tld("newtld")

        self.assertEqual(get_yggtorrent_tld(), "newtld")

    def tearDown(self):
        # Undo any TLD change so later tests see the original configuration.
        set_yggtorrent_tld(self.current_yggtorrent_tld)
|
|
@ -0,0 +1,65 @@
|
|||
import os
|
||||
import shutil
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestDownload(unittest.TestCase):
    """
    Integration tests for the three download entry points of YggTorrentScraper.

    They perform real HTTP requests and read the account credentials from the
    YGGTORRENT_IDENTIFIANT and YGGTORRENT_PASSWORD environment variables.
    """

    scraper = None
    destination_path = None

    def setUp(self):
        # Log in here rather than in __init__: unittest builds one TestCase
        # instance per test method while collecting, so doing network work in
        # the constructor runs it before any test has started.
        yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
        yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")

        self.destination_path = os.path.join(
            ".", "yggtorrentscraper", "tests", "test_download"
        )

        self.scraper = YggTorrentScraper(requests.session())

        self.scraper.login(yggtorrent_identifiant, yggtorrent_password)

    def test_download_from_torrent(self):
        most_completed = self.scraper.most_completed()

        torrent = self.scraper.extract_details(most_completed[0])

        self.assertIsNotNone(torrent.url)

        file_full_path = self.scraper.download_from_torrent(
            torrent=torrent, destination_path=self.destination_path
        )

        # A real .torrent file is expected to be larger than 1000 bytes.
        self.assertGreater(os.path.getsize(file_full_path), 1000)

    def test_download_from_torrent_url(self):
        file_full_path = self.scraper.download_from_torrent_url(
            torrent_url="https://www2.yggtorrent.pe/torrent/filmvideo/serie-tv/440445-game-of-thrones-s08e02-multi-1080p-amzn-web-dl-dd5-1-x264-ark01",
            destination_path=self.destination_path,
        )

        self.assertGreater(os.path.getsize(file_full_path), 1000)

    def test_download_from_torrent_download_url(self):
        most_completed = self.scraper.most_completed()

        torrent = self.scraper.extract_details(most_completed[0])

        self.assertIsNotNone(torrent.url)

        file_full_path = self.scraper.download_from_torrent_download_url(
            torrent_url=torrent.url, destination_path=self.destination_path
        )

        self.assertGreater(os.path.getsize(file_full_path), 1000)

    def tearDown(self):
        # Remove whatever the test downloaded, then close the site session.
        if os.path.exists(self.destination_path):
            shutil.rmtree(self.destination_path, ignore_errors=True)

        self.scraper.logout()
|
|
@ -0,0 +1,60 @@
|
|||
import os
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestExtractDetails(unittest.TestCase):
    """
    Integration tests for YggTorrentScraper.extract_details.

    The same detail page is parsed once anonymously and once after logging in
    with the credentials found in the YGGTORRENT_IDENTIFIANT and
    YGGTORRENT_PASSWORD environment variables.
    """

    # Detail page parsed by both tests.
    torrent_url = "https://www2.yggtorrent.pe/torrent/filmvideo/serie-tv/440445-game-of-thrones-s08e02-multi-1080p-amzn-web-dl-dd5-1-x264-ark01"

    scraper = None

    def setUp(self):
        # A fresh session per test keeps the anonymous test from inheriting
        # cookies left behind by the logged-in one.
        self.scraper = YggTorrentScraper(requests.session())

    def _assert_common_details(self, torrent):
        """Assert every field that must be filled whether or not we are logged in."""
        self.assertIsNotNone(torrent.name)
        self.assertIsNotNone(torrent.uploaded_datetime)
        self.assertIsNotNone(torrent.size)
        self.assertIsNotNone(torrent.uploader)

        self.assertGreater(len(torrent.keywords), 0)

        # -1 is the "not parsed" default of the counters.
        self.assertGreater(torrent.completed, -1)
        self.assertGreater(torrent.seeders, -1)
        self.assertGreater(torrent.leechers, -1)

        self.assertGreater(len(torrent.files), 0)
        self.assertGreater(len(torrent.comments), 0)

    def test_extract_details(self):
        torrent = self.scraper.extract_details(self.torrent_url)

        self._assert_common_details(torrent)

        # The test expects no download link when the session is anonymous.
        self.assertIsNone(torrent.url)

    def test_extract_details_logged(self):
        yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
        yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")

        self.scraper.login(yggtorrent_identifiant, yggtorrent_password)

        torrent = self.scraper.extract_details(self.torrent_url)

        self._assert_common_details(torrent)

        # Once logged in, the download link must be present.
        self.assertIsNotNone(torrent.url)

    def tearDown(self):
        self.scraper.logout()
|
|
@ -0,0 +1,30 @@
|
|||
import os
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestAuthentification(unittest.TestCase):
    """
    Integration tests for YggTorrentScraper.login.

    The success case needs valid credentials in the YGGTORRENT_IDENTIFIANT and
    YGGTORRENT_PASSWORD environment variables.
    """

    def setUp(self):
        self.scraper = YggTorrentScraper(requests.session())

    def test_login_success(self):
        yggtorrent_identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
        yggtorrent_password = os.environ.get("YGGTORRENT_PASSWORD")

        # Fail with a clear message when the environment is not configured.
        self.assertIsNotNone(yggtorrent_identifiant)
        self.assertIsNotNone(yggtorrent_password)

        self.assertTrue(self.scraper.login(yggtorrent_identifiant, yggtorrent_password))

    def test_login_failed(self):
        self.assertFalse(self.scraper.login("myidentifiant", "mypassword"))

    def tearDown(self):
        # Single place where the session is closed; the tests themselves no
        # longer issue a second, redundant logout request.
        self.scraper.logout()
|
|
@ -0,0 +1,27 @@
|
|||
import os
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestLogout(unittest.TestCase):
    """
    Integration tests for YggTorrentScraper.logout.

    Valid credentials are read from the YGGTORRENT_IDENTIFIANT and
    YGGTORRENT_PASSWORD environment variables.
    """

    def setUp(self):
        http_session = requests.session()
        self.scraper = YggTorrentScraper(http_session)

    def test_logout_success(self):
        identifiant, password = (
            os.environ.get("YGGTORRENT_IDENTIFIANT"),
            os.environ.get("YGGTORRENT_PASSWORD"),
        )

        logged_in = self.scraper.login(identifiant, password)
        self.assertTrue(logged_in)

        logged_out = self.scraper.logout()
        self.assertTrue(logged_out)

    def test_logout_failed(self):
        # Deliberately wrong credentials: the login result is ignored, only
        # the outcome of the following logout is checked.
        self.scraper.login("myidentifiant", "mypassword")

        logged_out = self.scraper.logout()
        self.assertFalse(logged_out)

    def tearDown(self):
        self.scraper.logout()
|
|
@ -0,0 +1,16 @@
|
|||
import unittest
|
||||
|
||||
import requests
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestMostCompleted(unittest.TestCase):
    """Integration test for YggTorrentScraper.most_completed (performs real requests)."""

    # One scraper shared by the class, created when the class body is executed.
    scraper = YggTorrentScraper(session=requests.session())

    def test_most_completed(self):
        url_count = len(self.scraper.most_completed())

        # The endpoint is expected to list exactly one hundred torrents.
        self.assertEqual(url_count, 100)

    def tearDown(self):
        self.scraper.logout()
|
|
@ -0,0 +1,70 @@
|
|||
import unittest
|
||||
|
||||
import requests
|
||||
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestResearch(unittest.TestCase):
    """Integration tests for YggTorrentScraper.search (performs real requests)."""

    scraper = YggTorrentScraper(requests.session())

    torrent_name = "walking dead s09"
    torrent_uploader = "brandit"

    # Search term used by the sort-order tests.
    torrent_name_2 = "blue oyster cult"

    def _assert_sorted_by_completed(self, order, assert_pair):
        """
        Search torrent_name_2 sorted by "completed" and check the ordering.

        order is the value sent as the "order" search parameter; assert_pair
        is the unittest assertion applied to each (previous, current) pair of
        completed counters, in result order.
        """
        torrents_url = self.scraper.search(
            {"name": self.torrent_name_2, "sort": "completed", "order": order}
        )

        torrent_old = None

        for torrent_url in torrents_url:
            torrent = self.scraper.extract_details(torrent_url)

            if torrent_old is not None:
                assert_pair(torrent_old.completed, torrent.completed)

            torrent_old = torrent

    def test_search_by_name(self):
        torrents_url = self.scraper.search({"name": self.torrent_name})

        torrent = self.scraper.extract_details(torrents_url[0])

        found_name = torrent.name.lower()

        # Every searched word must appear in the name of the first result.
        for word in self.torrent_name.split(" "):
            self.assertIn(word.lower(), found_name)

    def test_search_by_uploader(self):
        torrents_url = self.scraper.search(
            {"name": self.torrent_name, "uploader": self.torrent_uploader}
        )

        for torrent_url in torrents_url:
            torrent = self.scraper.extract_details(torrent_url)

            self.assertEqual(torrent.uploader.lower(), self.torrent_uploader.lower())

    def test_search_sort_completed_asc(self):
        self._assert_sorted_by_completed("asc", self.assertLessEqual)

    def test_search_sort_completed_desc(self):
        self._assert_sorted_by_completed("desc", self.assertGreaterEqual)

    def test_search_multiple_page(self):
        torrents_url = self.scraper.search({"name": "walking dead"})

        # More than 200 results can only come from following several pages.
        self.assertGreater(len(torrents_url), 200)

    def tearDown(self):
        self.scraper.logout()
|
|
@ -0,0 +1,32 @@
|
|||
import os
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
|
||||
from ..yggtorrentscraper import YggTorrentScraper
|
||||
|
||||
|
||||
class TestTorrent(unittest.TestCase):
    """
    Smoke tests for Torrent.__str__ on a torrent scraped from the site.

    Nothing is asserted about the text: the tests only check that rendering a
    torrent with its files and comments does not raise.
    """

    scraper = YggTorrentScraper(requests.session())

    def test_str(self):
        first_url = self.scraper.most_completed()[0]

        details = self.scraper.extract_details(first_url)

        details.__str__(files=True, comments=True)

    def test_str_logged(self):
        identifiant = os.environ.get("YGGTORRENT_IDENTIFIANT")
        password = os.environ.get("YGGTORRENT_PASSWORD")

        self.scraper.login(identifiant, password)

        first_url = self.scraper.most_completed()[0]

        details = self.scraper.extract_details(first_url)

        details.__str__(files=True, comments=True)

    def tearDown(self):
        self.scraper.logout()
|
|
@ -0,0 +1,146 @@
|
|||
import os
|
||||
|
||||
|
||||
class Torrent:
    """
    Torrent entity

    Plain record holding what is known about one torrent. The scalar fields
    default to None (or -1 for the counters) until they are filled in.
    """

    name = None
    uploaded_datetime = None
    size = None
    uploader = None

    completed = -1
    seeders = -1
    leechers = -1

    url = None

    def __init__(self):
        # The lists are created per instance. As class attributes they were a
        # single object shared by every Torrent, so appending a keyword, file
        # or comment to one torrent silently added it to all the others.
        self.keywords = []
        self.files = []
        self.comments = []

    def __str__(self, comments=False, files=False):
        """
        Return a multi-line, human-readable description of the torrent.

        The number of files and comments is always printed; their content is
        only included when files / comments is true. Every line, including
        the last one, ends with os.linesep.
        """
        url_text = self.url if self.url is not None else "N/A"

        # Formatting through f-strings keeps __str__ from raising TypeError
        # when name or uploader has not been filled in yet.
        lines = [
            f"Name : {self.name}",
            f"Url : {url_text}",
            "",
            f"Keywords ({len(self.keywords)}) : ",
        ]
        lines.extend(f"- {keyword}" for keyword in self.keywords)

        lines += [
            "",
            f"Uploaded : {self.uploaded_datetime}",
            f"Size : {self.size}",
            f"Uploader : {self.uploader}",
            f"Completed : {self.completed}",
            f"Seeders : {self.seeders}",
            f"Leechers : {self.leechers}",
            "",
            f"Files ({len(self.files)})",
        ]
        if files:
            lines.extend(str(file) for file in self.files)

        lines += ["", f"Comments ({len(self.comments)})"]
        if comments:
            lines.extend(str(comment) for comment in self.comments)

        return os.linesep.join(lines) + os.linesep
|
||||
|
||||
|
||||
class TorrentFile:

    """
    Torrent's file entity

    Holds the displayed size and the name of one file contained in a torrent.
    """

    size = ""
    file_name = ""

    @property
    def file_size(self):
        """Alias of size, so code that reads or writes file_size stays in sync with it."""
        return self.size

    @file_size.setter
    def file_size(self, value):
        # Without this alias a value stored as file_size never reached size,
        # and __str__ printed an empty size.
        self.size = value

    def __str__(self):
        """Return the size and file name, one per line, each ended by os.linesep."""
        return (
            f"size : {self.size}{os.linesep}"
            f"file_name : {self.file_name}{os.linesep}"
        )
|
||||
|
||||
|
||||
class TorrentComment:

    """
    Torrent's comment entity

    Holds the author, the posting date text and the body of one comment.
    """

    author = ""
    posted = ""
    text = ""

    def __str__(self):
        """Return author, posting date and text, one per line, each ended by os.linesep."""
        newline = os.linesep

        # author is joined as-is (it must already be a string); posted and
        # text are converted with str() first.
        pieces = (
            "Author : ",
            self.author,
            newline,
            "Posted : ",
            str(self.posted),
            newline,
            "Text : ",
            str(self.text),
            newline,
        )

        return "".join(pieces)
|
|
@ -0,0 +1,453 @@
|
|||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .torrent import Torrent, TorrentComment, TorrentFile
|
||||
from .categories import categories
|
||||
|
||||
# Top-level domain of the site. Every URL below is derived from it;
# set_yggtorrent_tld() rebuilds them when the domain changes.
YGGTORRENT_TLD = "se"

YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"

YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
YGGTORRENT_LOGOUT_URL = f"{YGGTORRENT_BASE_URL}/user/logout?attempt=1"

YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="

logger = logging.getLogger("yggtorrentscraper")

# Domain and name of the cookie that carries the login token.
YGGTORRENT_DOMAIN = f".yggtorrent.{YGGTORRENT_TLD}"
YGGTORRENT_TOKEN_COOKIE = "ygg_"

# Query-string fragments appended to YGGTORRENT_SEARCH_URL.
# (This group used to be defined twice with identical values.)
YGGTORRENT_SEARCH_URL_DESCRIPTION = "&description="
YGGTORRENT_SEARCH_URL_FILE = "&file="
YGGTORRENT_SEARCH_URL_UPLOADER = "&uploader="
YGGTORRENT_SEARCH_URL_CATEGORY = "&category="
YGGTORRENT_SEARCH_URL_SUB_CATEGORY = "&sub_category="
YGGTORRENT_SEARCH_URL_ORDER = "&order="
YGGTORRENT_SEARCH_URL_SORT = "&sort="
YGGTORRENT_SEARCH_URL_DO = "&do="
YGGTORRENT_SEARCH_URL_PAGE = "&page="

YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
# Built from the base URL like its siblings: the previous literal host
# "www2.yggtorrentchg" had lost its dot and TLD and could never resolve.
YGGTORRENT_GET_INFO = f"{YGGTORRENT_BASE_URL}/engine/get_nfo?torrent="

YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"

# Step used to turn a page index into the value of the "page" search parameter.
TORRENT_PER_PAGE = 50

YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
|
||||
|
||||
|
||||
def set_yggtorrent_tld(yggtorrent_tld=None):
    """
    Redefine all string variable according to new TLD

    Rebuilds every module-level URL and the cookie domain from the given
    top-level domain (for example "se").

    Raises ValueError when yggtorrent_tld is None or empty, because the URLs
    built from it would point at a host that does not exist.
    """

    global YGGTORRENT_TLD
    global YGGTORRENT_BASE_URL
    global YGGTORRENT_LOGIN_URL
    global YGGTORRENT_LOGOUT_URL
    global YGGTORRENT_SEARCH_URL
    global YGGTORRENT_DOMAIN
    global YGGTORRENT_GET_FILES
    global YGGTORRENT_GET_INFO
    global YGGTORRENT_MOST_COMPLETED_URL
    global YGGTORRENT_FILES_URL

    if not yggtorrent_tld:
        raise ValueError("yggtorrent_tld must be a non-empty string")

    YGGTORRENT_TLD = yggtorrent_tld

    YGGTORRENT_BASE_URL = f"https://www2.yggtorrent.{YGGTORRENT_TLD}"

    YGGTORRENT_LOGIN_URL = f"{YGGTORRENT_BASE_URL}/user/login"
    # The logout URL used to be written into YGGTORRENT_SEARCH_URL (and then
    # overwritten), so logout kept targeting the previous domain.
    YGGTORRENT_LOGOUT_URL = f"{YGGTORRENT_BASE_URL}/user/logout?attempt=1"

    YGGTORRENT_SEARCH_URL = f"{YGGTORRENT_BASE_URL}/engine/search?name="

    # Follow the new TLD instead of a hard-coded ".gg", otherwise the token
    # cookie is stored for a domain the requests never go to.
    YGGTORRENT_DOMAIN = f".yggtorrent.{YGGTORRENT_TLD}"

    YGGTORRENT_GET_FILES = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
    # Derived from the base URL; the former literal host had no valid TLD.
    YGGTORRENT_GET_INFO = f"{YGGTORRENT_BASE_URL}/engine/get_nfo?torrent="

    YGGTORRENT_MOST_COMPLETED_URL = f"{YGGTORRENT_BASE_URL}/engine/mostcompleted"

    YGGTORRENT_FILES_URL = f"{YGGTORRENT_BASE_URL}/engine/get_files?torrent="
|
||||
|
||||
|
||||
def get_yggtorrent_tld():
    """Return the top-level domain currently stored in YGGTORRENT_TLD."""
    current_tld = YGGTORRENT_TLD
    return current_tld
|
||||
|
||||
|
||||
class YggTorrentScraper:
    """
    Scrape YggTorrent through an HTTP session supplied by the caller.

    The session object is used for every request (get/post) and for cookie
    storage, which is how the login state is carried between calls.
    """

    session = None

    def __init__(self, session):
        self.session = session

    def login(self, identifiant, password):
        """
        Login request with the specified identifiant and password.

        On success the token cookie returned by the site is stored in the
        session for YGGTORRENT_DOMAIN, which is what later downloads rely on.

        Returns True when the site answers 200 and provides the token cookie,
        False otherwise.
        """
        self.session.cookies.clear()

        # NOTE(review): Host says "www." while the request URL uses "www2." -
        # kept as-is, confirm against the live site before changing.
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "User-Agent": "PostmanRuntime/7.17.1",
            "Accept": "*/*",
            "Cache-Control": "no-cache",
            "Host": f"www.yggtorrent.{YGGTORRENT_TLD}",
            "Accept-Encoding": "gzip, deflate",
            "Connection": "keep-alive",
        }

        response = self.session.post(
            YGGTORRENT_LOGIN_URL,
            data={"id": identifiant, "pass": password},
            headers=headers,
        )

        logger.debug("status_code : %s", response.status_code)

        if response.status_code != 200:
            logger.debug("Login failed")

            return False

        # .get() instead of [...]: a 200 answer without the token cookie is a
        # failed login, not a KeyError for the caller.
        yggtorrent_token = response.cookies.get_dict().get(YGGTORRENT_TOKEN_COOKIE)

        if yggtorrent_token is None:
            logger.debug("Login failed")

            return False

        logger.debug("Login successful")

        cookie = requests.cookies.create_cookie(
            domain=YGGTORRENT_DOMAIN,
            name=YGGTORRENT_TOKEN_COOKIE,
            value=yggtorrent_token,
        )

        self.session.cookies.set_cookie(cookie)

        return True

    def logout(self):
        """
        Logout request

        The session cookies are cleared whatever the answer is. Returns True
        when the site answers 200, False otherwise.
        """
        response = self.session.get(YGGTORRENT_LOGOUT_URL)

        self.session.cookies.clear()

        logger.debug("status_code : %s", response.status_code)

        if response.status_code == 200:
            logger.debug("Logout successful")

            return True

        logger.debug("Logout failed")

        return False

    def search(self, parameters):
        """Return the detail-page URLs of every torrent matching parameters."""
        search_url = create_search_url(parameters)

        return self.get_torrents_url(search_url, parameters)

    @staticmethod
    def _strip_escapes(text):
        """Remove the literal two-character sequences backslash-r, -n and -t from text."""
        return text.replace("\\r", "").replace("\\n", "").replace("\\t", "")

    def extract_details(self, torrent_url):
        """
        Extract informations from torrent's url

        Downloads the detail page, then the file listing of the torrent, and
        returns a filled Torrent. torrent.url is only set when the page's
        download button carries a link.
        """
        logger.debug("torrent_url : %s", torrent_url)

        response = self.session.get(torrent_url)

        torrent_page = BeautifulSoup(response.content, features="lxml")

        torrent = Torrent()

        # Fresh lists are assigned (rather than appended to) so the result
        # never accumulates entries from a previously parsed torrent.
        torrent.keywords = [
            term_tag.text for term_tag in torrent_page.find_all("a", {"class": "term"})
        ]

        connection_tags = torrent_page.find("tr", {"id": "adv_search_cat"}).find_all(
            "strong"
        )

        informations_tag = (
            torrent_page.find("table", {"class": "informations"})
            .find("tbody")
            .find_all("tr")
        )

        download_button = torrent_page.find("a", {"class": "butt"})

        # The button may be absent altogether; then the url stays None.
        if download_button is not None and download_button.has_attr("href"):
            torrent.url = download_button["href"]

        torrent.seeders = int(connection_tags[0].text.replace(" ", ""))
        torrent.leechers = int(connection_tags[1].text.replace(" ", ""))
        torrent.completed = int(connection_tags[2].text.replace(" ", ""))

        torrent.name = informations_tag[0].find_all("td")[1].text
        torrent.size = informations_tag[3].find_all("td")[1].text
        torrent.uploader = informations_tag[5].find_all("td")[1].text

        # Raw string: "\/" in a normal string literal is an invalid escape.
        date_match = re.search(
            r"([0-9]*/[0-9]*/[0-9]* [0-9]*:[0-9]*)",
            informations_tag[6].find_all("td")[1].text,
        )

        if date_match is not None:
            torrent.uploaded_datetime = datetime.datetime.strptime(
                date_match.group(0), "%d/%m/%Y %H:%M"
            )

        comments = []

        for message_tag in torrent_page.find_all("div", {"class": "message"}):
            torrent_comment = TorrentComment()

            torrent_comment.author = message_tag.find("a").text
            torrent_comment.posted = message_tag.find("strong").text
            torrent_comment.text = message_tag.find(
                "span", {"id": "comment_text"}
            ).text.strip()

            comments.append(torrent_comment)

        torrent.comments = comments

        torrent_id = torrent_page.find("form", {"id": "report-torrent"}).find(
            "input", {"type": "hidden", "name": "target"}
        )["value"]

        response = self.session.get(YGGTORRENT_GET_FILES + torrent_id)

        files_page = BeautifulSoup(response.content, features="lxml")

        files = []

        for file_tag in files_page.find_all("tr"):
            td_tags = file_tag.find_all("td")

            # Rows without a size cell and a name cell carry no file.
            if len(td_tags) < 2:
                continue

            torrent_file = TorrentFile()

            file_size = self._strip_escapes(td_tags[0].text).strip()

            # size is the attribute TorrentFile prints; file_size is kept for
            # code that already reads the value under that name.
            torrent_file.size = file_size
            torrent_file.file_size = file_size

            torrent_file.file_name = (
                self._strip_escapes(td_tags[1].text)
                .replace("\\", "")
                .replace(" ", "")
                .strip()
            )

            files.append(torrent_file)

        torrent.files = files

        return torrent

    def most_completed(self):
        """
        Return the most completed torrents url (TOP 100)
        """

        header = {"Accept": "application/json, text/javascript, */*; q=0.01"}

        # NOTE(review): the answer of this first request is discarded;
        # presumably a warm-up call - confirm before removing it.
        self.session.post(YGGTORRENT_MOST_COMPLETED_URL, headers=header)

        json_response = self.session.post(
            YGGTORRENT_MOST_COMPLETED_URL, headers=header
        ).json()

        torrents_url = []

        for json_item in json_response:
            # The second field of each item is an HTML fragment whose first
            # link is the torrent's detail page.
            root = BeautifulSoup(json_item[1], features="lxml")

            torrents_url.append(root.find("a")["href"])

        return torrents_url

    def get_torrents_url(self, search_url, parameters):
        """
        Return the detail-page URLs found on every result page of a search.

        search_url is fetched once to read the pagination; each page is then
        requested with a URL rebuilt from parameters plus a "page" value.
        The caller's parameters dict is left untouched.
        """

        response = self.session.get(search_url)

        search_page = BeautifulSoup(response.content, features="lxml")

        pagination = search_page.find("ul", {"class": "pagination"})

        if pagination is None:
            limit_page = 1
        else:
            pagination_item = pagination.find_all("a")

            limit_page = int(pagination_item[-1]["data-ci-pagination-page"])

        # Work on a copy: writing "page" into the caller's dict leaked the
        # last page value into any later search reusing the same dict.
        page_parameters = dict(parameters)

        torrents = []

        for page in range(limit_page):
            page_parameters["page"] = page * TORRENT_PER_PAGE

            response = self.session.get(create_search_url(page_parameters))

            search_page = BeautifulSoup(response.content, features="lxml")

            for torrent_tag in search_page.find_all("a", {"id": "torrent_name"}):
                torrents.append(torrent_tag["href"])

        return torrents

    def download_from_torrent_url(self, torrent_url=None, destination_path="./data/files/"):
        """
        Download the .torrent file of the torrent whose detail page is torrent_url.

        Returns the path of the written file, or None when torrent_url is None.
        """
        if torrent_url is None:
            return None

        torrent = self.extract_details(torrent_url)

        return self.download_from_torrent_download_url(
            torrent_url=torrent.url, destination_path=destination_path
        )

    def download_from_torrent(self, torrent=None, destination_path="./data/files/"):
        """
        Download the .torrent file of an already extracted Torrent.

        Returns the path of the written file, or None when torrent is None.
        """
        if torrent is None:
            return None

        return self.download_from_torrent_download_url(
            torrent_url=torrent.url, destination_path=destination_path
        )

    def download_from_torrent_download_url(
        self, torrent_url=None, destination_path="./data/files/"
    ):
        """
        Download a .torrent file and save it under destination_path.

        torrent_url is the download link relative to YGGTORRENT_BASE_URL. The
        file name is taken from the content-disposition header of the answer.

        Returns the full path of the written file. Raises Exception when
        torrent_url is None or when the answer carries no usable file name.
        """
        if torrent_url is None:
            raise Exception("Invalid torrent_url, make sure you are logged")

        response = self.session.get(YGGTORRENT_BASE_URL + torrent_url)

        content_disposition = response.headers.get("content-disposition")

        if content_disposition is None or "filename=" not in content_disposition:
            raise Exception(
                "No file name in the download response, make sure you are logged"
            )

        # Skips 'filename=' plus one more character and drops the last
        # character, i.e. assumes the name is wrapped in quotes.
        start = content_disposition.index("filename=") + 10

        # The name comes from the server: keep only its last path component so
        # it cannot point outside destination_path.
        file_name = os.path.basename(content_disposition[start:-1])

        if not file_name:
            raise Exception(
                "No file name in the download response, make sure you are logged"
            )

        os.makedirs(destination_path, exist_ok=True)

        file_full_path = os.path.join(destination_path, file_name)

        # The context manager closes the file even when the write fails.
        with open(file_full_path, "wb") as file:
            file.write(response.content)

        return file_full_path
|
||||
|
||||
|
||||
def _option_query_parts(option, searched_values):
    """Return the URL pieces selecting searched_values among option["values"]."""
    pieces = []

    for searched_value in searched_values:
        # The site numbers option values from 1, in declaration order.
        for position, value in enumerate(option["values"], start=1):
            if searched_value != value:
                continue

            pieces.append("&option_")
            pieces.append(option["name"])

            if "multiple" in option:
                pieces.append("%3Amultiple")

            pieces.append("[]=")
            pieces.append(str(position))

    return pieces


def _subcategory_query_parts(category, parameters):
    """Return the URL pieces for the requested subcategory of category and its options."""
    pieces = []

    for subcategory in category["subcategories"]:
        if parameters["subcategory"] != subcategory["name"]:
            continue

        pieces.append(YGGTORRENT_SEARCH_URL_SUB_CATEGORY)
        pieces.append(subcategory["id"])

        if "options" not in parameters:
            continue

        for key, values in parameters["options"].items():
            for option in subcategory["options"]:
                if key == option["name"]:
                    pieces.extend(_option_query_parts(option, values))

    return pieces


def _category_query_parts(parameters):
    """Return the URL pieces for the requested category, looked up by name in categories."""
    pieces = []

    for category in categories:
        if parameters["category"] != category["name"]:
            continue

        pieces.append(YGGTORRENT_SEARCH_URL_CATEGORY)
        pieces.append(category["id"])

        if "subcategory" in parameters:
            pieces.extend(_subcategory_query_parts(category, parameters))

    return pieces


def create_search_url(parameters):
    """
    Return a formated URL for torrent's search

    parameters is a dict; the keys read are "name", "page", "descriptions",
    "files", "uploader", "sort", "order", "category", "subcategory" and
    "options". Keys that are absent add nothing to the URL, and the URL
    always ends with the "do" fragment followed by "search".
    """

    pieces = [YGGTORRENT_SEARCH_URL]

    if "name" in parameters:
        pieces.append(parameters["name"].replace(" ", "+"))

    if "page" in parameters:
        pieces.append(YGGTORRENT_SEARCH_URL_PAGE)
        pieces.append(str(parameters["page"]))

    # List-valued criteria: every term is followed by "+", the last included.
    for key, prefix in (
        ("descriptions", YGGTORRENT_SEARCH_URL_DESCRIPTION),
        ("files", YGGTORRENT_SEARCH_URL_FILE),
    ):
        if key in parameters:
            pieces.append(prefix)

            for term in parameters[key]:
                pieces.append(term)
                pieces.append("+")

    # Single-valued criteria, appended verbatim.
    for key, prefix in (
        ("uploader", YGGTORRENT_SEARCH_URL_UPLOADER),
        ("sort", YGGTORRENT_SEARCH_URL_SORT),
        ("order", YGGTORRENT_SEARCH_URL_ORDER),
    ):
        if key in parameters:
            pieces.append(prefix)
            pieces.append(parameters[key])

    if "category" in parameters:
        pieces.extend(_category_query_parts(parameters))

    pieces.append(YGGTORRENT_SEARCH_URL_DO)
    pieces.append("search")

    return "".join(pieces)
|
Loading…
Reference in New Issue