Compare commits

...

3 Commits

Author SHA1 Message Date
coletdjnz
7abee30b07
isort 2024-01-14 10:47:54 +13:00
coletdjnz
97df30b9b8
Display simplified list of known supported targets if they are not installed 2024-01-14 10:13:00 +13:00
coletdjnz
77292013f6
Add impersonate headers blacklist 2024-01-14 10:02:18 +13:00
4 changed files with 70 additions and 35 deletions

View File

@ -50,13 +50,13 @@ from yt_dlp.networking.exceptions import (
TransportError, TransportError,
UnsupportedRequest, UnsupportedRequest,
) )
from yt_dlp.networking.impersonate import ImpersonateRequestHandler, ImpersonateTarget from yt_dlp.networking.impersonate import (
from yt_dlp.utils._utils import _YDLLogger as FakeLogger ImpersonateRequestHandler,
from yt_dlp.utils import YoutubeDLError ImpersonateTarget,
from yt_dlp.utils.networking import (
HTTPHeaderDict,
std_headers,
) )
from yt_dlp.utils import YoutubeDLError
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@ -774,6 +774,21 @@ class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
assert res.status == 200 assert res.status == 200
assert std_headers['user-agent'].lower() not in res.read().decode().lower() assert std_headers['user-agent'].lower() not in res.read().decode().lower()
@pytest.mark.parametrize('impersonate', [True, False])
def test_headers_blacklist(self, handler, impersonate):
with handler() as rh:
for header in rh._IMPERSONATE_HEADERS_BLACKLIST:
supported_target = rh.supported_targets[0]
res = validate_and_send(rh, Request(
f'http://127.0.0.1:{self.http_port}/headers',
headers={header: 'testvalue'}, extensions={'impersonate': supported_target} if impersonate else {}))
assert res.status == 200
sent_headers = res.read().decode()
if impersonate:
assert f'{header}: testvalue'.lower() not in sent_headers.lower()
else:
assert f'{header}: testvalue'.lower() in sent_headers.lower()
class TestUrllibRequestHandler(TestRequestHandlerBase): class TestUrllibRequestHandler(TestRequestHandlerBase):
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True) @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
@ -936,27 +951,6 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
# Check that user agent is added over ours # Check that user agent is added over ours
assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
def test_headers(self, handler):
with handler(headers=std_headers) as rh:
# Ensure curl-impersonate overrides our standard headers (usually added
res = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
assert std_headers['user-agent'].lower() not in res
assert std_headers['accept-language'].lower() not in res
assert std_headers['sec-fetch-mode'].lower() not in res
# other than UA, custom headers that differ from std_headers should be kept
assert 'sec-fetch-mode: custom' in res
assert 'x-custom: test' in res
# but when not impersonating don't remove std_headers
res = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
# std_headers should be present
for k, v in std_headers.items():
assert f'{k}: {v}'.lower() in res
@pytest.mark.parametrize('raised,expected,match', [ @pytest.mark.parametrize('raised,expected,match', [
(lambda: curl_cffi.requests.errors.RequestsError( (lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None), '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),

View File

@ -101,6 +101,7 @@ from .utils import (
SameFileError, SameFileError,
UnavailableVideoError, UnavailableVideoError,
UserNotLive, UserNotLive,
YoutubeDLError,
age_restricted, age_restricted,
args_to_str, args_to_str,
bug_reports_message, bug_reports_message,
@ -157,7 +158,6 @@ from .utils import (
windows_enable_vt_mode, windows_enable_vt_mode,
write_json_file, write_json_file,
write_string, write_string,
YoutubeDLError,
) )
from .utils._utils import _YDLLogger from .utils._utils import _YDLLogger
from .utils.networking import ( from .utils.networking import (

View File

@ -987,15 +987,39 @@ def _real_main(argv=None):
ydl._download_retcode = 100 ydl._download_retcode = 100
if opts.list_impersonate_targets: if opts.list_impersonate_targets:
known_targets = [
# List of simplified targets we know are supported,
# to help users know what dependencies may be required.
(ImpersonateTarget('chrome'), 'curl_cffi'),
(ImpersonateTarget('edge'), 'curl_cffi'),
(ImpersonateTarget('safari'), 'curl_cffi'),
(ImpersonateTarget('chrome', os='android'), 'curl_cffi'),
]
available_targets = ydl.get_available_impersonate_targets() available_targets = ydl.get_available_impersonate_targets()
rows = [ rows = [
[target.client, target.version, target.os, target.os_vers, handler] [target.client or '-', target.version or '-', target.os or '-', target.os_vers or '-', handler]
for target, handler in available_targets for target, handler in available_targets
] ]
for known_target, known_handler in known_targets:
if not any(
known_target in target and handler == known_handler
for target, handler in available_targets
):
rows.append([
ydl._format_out(known_target.client or '-', ydl.Styles.SUPPRESS),
ydl._format_out(known_target.version or '-', ydl.Styles.SUPPRESS),
ydl._format_out(known_target.os or '-', ydl.Styles.SUPPRESS),
ydl._format_out(known_target.os_vers or '-', ydl.Styles.SUPPRESS),
ydl._format_out(f'{known_handler} (not installed)', ydl.Styles.SUPPRESS),
])
ydl.to_screen('[info] Available impersonate targets') ydl.to_screen('[info] Available impersonate targets')
ydl.to_stdout( ydl.to_stdout(
render_table(['Client', 'Version', 'OS', 'OS Version', 'Source'], rows) render_table(['Client', 'Version', 'OS', 'OS Version', 'Source'], rows, extra_gap=1)
) )
return return

View File

@ -8,7 +8,6 @@ from .common import RequestHandler, register_preference
from .exceptions import UnsupportedRequest from .exceptions import UnsupportedRequest
from ..compat.types import NoneType from ..compat.types import NoneType
from ..utils import classproperty from ..utils import classproperty
from ..utils.networking import std_headers
@dataclass(order=True) @dataclass(order=True)
@ -78,6 +77,26 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
""" """
_SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] = {} _SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] = {}
_IMPERSONATE_HEADERS_BLACKLIST = [
# Headers to remove from provided headers when impersonating.
# In the networking framework, the provided headers are intended
# to give a consistent user agent across request handlers.
# However, it is intended that the impersonation implementation will add the required headers to mimic a client.
# So we need to remove provided headers that may interfere with this behaviour.
# TODO(future): Add a method of excluding headers from this blacklist, such as User-Agent in certain cases.
# TODO(future): "Accept" should be included here, however it is currently required for some sites.
'User-Agent',
'Accept-Language',
'Sec-Fetch-Mode',
'Sec-Fetch-Site',
'Sec-Fetch-User',
'Sec-Fetch-Dest',
'Upgrade-Insecure-Requests',
'Sec-Ch-Ua',
'Sec-Ch-Ua-Mobile',
'Sec-Ch-Ua-Platform',
]
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs): def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.impersonate = impersonate self.impersonate = impersonate
@ -130,9 +149,7 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
def _get_impersonate_headers(self, request): def _get_impersonate_headers(self, request):
headers = self._merge_headers(request.headers) headers = self._merge_headers(request.headers)
if self._get_request_target(request) is not None: if self._get_request_target(request) is not None:
# remove all headers present in std_headers for header in self._IMPERSONATE_HEADERS_BLACKLIST:
for header in std_headers:
if header in headers and std_headers[header] == headers[header]:
headers.pop(header, None) headers.pop(header, None)
return headers return headers