Compare commits

..

3 Commits

Author SHA1 Message Date
coletdjnz
40fa635759
error responses testing 2023-12-02 20:39:34 +13:00
coletdjnz
f97ab81997
Fixups and improve test coverage 2023-12-02 17:07:43 +13:00
coletdjnz
180109028d
fix sorting 2023-12-02 13:03:25 +13:00
8 changed files with 186 additions and 57 deletions

View File

@ -475,8 +475,9 @@ If you fork the project on GitHub, you can run your fork's [build workflow](.git
direct connection
--socket-timeout SECONDS Time to wait before giving up, in seconds
--source-address IP Client-side IP address to bind to
--impersonate TARGET curl-impersonate target name to impersonate
for requests.
--impersonate CLIENT[:[VERSION][:[OS][:OS_VERSION]]]
Client to impersonate for requests
--list-impersonate-targets List available clients to impersonate
-4, --force-ipv4 Make all connections via IPv4
-6, --force-ipv6 Make all connections via IPv6
--enable-file-urls Enable file:// URLs. This is disabled by

View File

@ -29,7 +29,7 @@ from http.cookiejar import CookieJar
from test.conftest import validate_and_send
from test.helper import FakeYDL, http_server_port
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import brotli, requests, urllib3
from yt_dlp.dependencies import brotli, requests, urllib3, curl_cffi
from yt_dlp.networking import (
HEADRequest,
PUTRequest,
@ -913,9 +913,9 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
@pytest.mark.parametrize('params,extensions', [
({}, {'impersonate': 'chrome:110'}),
({'impersonate': 'chrome:110'}, {}),
({'impersonate': 'chrome:99'}, {'impersonate': 'chrome:110'})
({}, {'impersonate': ('chrome',)}),
({'impersonate': ('chrome', '110')}, {}),
({'impersonate': ('chrome', '99')}, {'impersonate': ('chrome', '110')}),
])
def test_impersonate(self, handler, params, extensions):
with handler(headers=std_headers, **params) as rh:
@ -931,7 +931,7 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
# Ensure curl-impersonate overrides our standard headers (usually added
res = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
'impersonate': 'safari'}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
'impersonate': ('safari', )}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
assert std_headers['user-agent'].lower() not in res
assert std_headers['accept-language'].lower() not in res
@ -946,6 +946,74 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
assert std_headers['accept-language'].lower() in res
assert 'x-custom: test' in res
@pytest.mark.parametrize('raised,expected,match', [
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
])
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
import curl_cffi.requests
from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
curl_res = curl_cffi.requests.Response()
res = CurlCFFIResponseAdapter(curl_res)
def mock_read(*args, **kwargs):
try:
raise raised()
except Exception as e:
e.response = curl_res
raise
monkeypatch.setattr(res.fp, 'read', mock_read)
with pytest.raises(expected, match=match) as exc_info:
res.read()
assert exc_info.type is expected
@pytest.mark.parametrize('raised,expected,match', [
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
(lambda: curl_cffi.requests.errors.RequestsError(
'', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
])
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
import curl_cffi.requests
curl_res = curl_cffi.requests.Response()
curl_res.status_code = 301
with handler() as rh:
original_get_instance = rh._get_instance
def mock_get_instance(*args, **kwargs):
instance = original_get_instance(*args, **kwargs)
def request(*_, **__):
try:
raise raised()
except Exception as e:
e.response = curl_res
raise
monkeypatch.setattr(instance, 'request', request)
return instance
monkeypatch.setattr(rh, '_get_instance', mock_get_instance)
with pytest.raises(expected) as exc_info:
rh.send(Request('http://fake'))
assert exc_info.type is expected
def run_validation(handler, error, req, **handler_kwargs):
with handler(**handler_kwargs) as rh:
@ -1074,9 +1142,9 @@ class TestRequestHandlerValidation:
({'timeout': 1}, False),
({'timeout': 'notatimeout'}, AssertionError),
({'unsupported': 'value'}, UnsupportedRequest),
({'impersonate': 'badtarget'}, UnsupportedRequest),
({'impersonate': ('badtarget', None, None, None)}, UnsupportedRequest),
({'impersonate': 123}, AssertionError),
({'impersonate': 'chrome'}, False)
({'impersonate': ('chrome', None, None, None)}, False)
]),
(NoCheckRH, 'http', [
({'cookiejar': 'notacookiejar'}, False),
@ -1155,6 +1223,10 @@ class FakeResponse(Response):
class FakeRH(RequestHandler):
def __init__(self, *args, **params):
self.params = params
super().__init__(*args, **params)
def _validate(self, request):
return
@ -1360,6 +1432,72 @@ class TestYoutubeDLNetworking:
with pytest.raises(SSLError, match='testerror'):
ydl.urlopen('ssl://testerror')
def test_unsupported_impersonate_target(self):
class FakeImpersonationRHYDL(FakeYDL):
def __init__(self, *args, **kwargs):
class HTTPRH(RequestHandler):
def _send(self, request: Request):
pass
_SUPPORTED_URL_SCHEMES = ('http',)
_SUPPORTED_PROXY_SCHEMES = None
super().__init__(*args, **kwargs)
self._request_director = self.build_request_director([HTTPRH])
with FakeImpersonationRHYDL() as ydl:
with pytest.raises(
RequestError,
match=r'Impersonate target "test" is not available. This request requires browser impersonation'
):
ydl.urlopen(Request('http://', extensions={'impersonate': ('test', None, None, None)}))
def test_unsupported_impersonate_extension(self):
class FakeHTTPRHYDL(FakeYDL):
def __init__(self, *args, **kwargs):
class IRH(ImpersonateRequestHandler):
def _send(self, request: Request):
pass
_SUPPORTED_URL_SCHEMES = ('http',)
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)]
_SUPPORTED_PROXY_SCHEMES = None
super().__init__(*args, **kwargs)
self._request_director = self.build_request_director([IRH])
with FakeHTTPRHYDL() as ydl:
with pytest.raises(
RequestError,
match=r'Impersonate target "test" is not available. This request requires browser impersonation'
):
ydl.urlopen(Request('http://', extensions={'impersonate': ('test', None, None, None)}))
def test_raise_impersonate_error(self):
with pytest.raises(
ValueError,
match=r'Impersonate target "test" is not available. Use --list-impersonate-targets to see available targets.'
):
FakeYDL({'impersonate': ('test', None, None, None)})
def test_pass_impersonate_param(self, monkeypatch):
class IRH(ImpersonateRequestHandler):
def _send(self, request: Request):
pass
_SUPPORTED_URL_SCHEMES = ('http',)
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)]
# Bypass the check on initialize
brh = FakeYDL.build_request_director
monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
with FakeYDL({
'impersonate': ('firefox', None, None, None)
}) as ydl:
rh = self.build_handler(ydl, IRH)
assert rh.impersonate == ('firefox', None, None, None)
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
('http', '__noproxy__', None),
('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
@ -1660,6 +1798,7 @@ class TestResponse:
assert res.getheader('test') == res.get_header('test')
# TODO: move these to test_utils.py when that moves to pytest
class TestImpersonate:
@pytest.mark.parametrize('target,expected', [
('firefox', ('firefox', None, None, None)),
@ -1684,7 +1823,9 @@ class TestImpersonate:
(('firefox', None, 'linux', None), 'firefox::linux'),
(('firefox', None, None, '5'), 'firefox:::5'),
(('firefox', '120', None, '5'), 'firefox:120::5'),
((None, '120', None, None), None)
((None, '120', None, None), None),
(('firefox', ), 'firefox'),
(('firefox', None, 'linux'), 'firefox::linux'),
])
def test_compile_impersonate_target(self, target_tuple, expected):
assert compile_impersonate_target(*target_tuple) == expected

View File

@ -24,6 +24,7 @@ import traceback
import unicodedata
from .cache import Cache
from .compat import functools, urllib # isort: split
from .compat import compat_os_name, compat_shlex_quote, urllib_req_to_req
from .cookies import LenientSimpleCookie, load_cookies
@ -43,7 +44,7 @@ from .networking.exceptions import (
_CompatHTTPError,
network_exceptions,
)
from .networking.impersonate import ImpersonateRequestHandler, get_available_impersonate_targets
from .networking.impersonate import ImpersonateRequestHandler
from .plugins import directories as plugin_directories
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
from .postprocessor import (
@ -61,7 +62,13 @@ from .postprocessor import (
get_postprocessor,
)
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
from .update import REPOSITORY, _get_system_deprecation, _make_label, current_git_head, detect_variant
from .update import (
REPOSITORY,
_get_system_deprecation,
_make_label,
current_git_head,
detect_variant,
)
from .utils import (
DEFAULT_OUTTMPL,
IDENTITY,
@ -157,9 +164,8 @@ from .utils.networking import (
HTTPHeaderDict,
clean_headers,
clean_proxies,
compile_impersonate_target,
std_headers,
parse_impersonate_target,
compile_impersonate_target
)
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
@ -714,10 +720,7 @@ class YoutubeDL:
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
if not any(results):
raise ValueError(
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
f'Use --list-impersonate-targets to see available targets.')
@ -3911,10 +3914,9 @@ class YoutubeDL:
# These imports can be slow. So import them only as needed
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import (
_PLUGIN_CLASSES as plugin_ies,
from .extractor.extractors import _PLUGIN_CLASSES as plugin_ies
from .extractor.extractors import \
_PLUGIN_OVERRIDES as plugin_ie_overrides
)
def get_encoding(stream):
ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
@ -4055,7 +4057,7 @@ class YoutubeDL:
return sorted(self._request_director.collect_from_handlers(
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
), key=lambda x: x[1][0])
), key=lambda x: x[0])
def urlopen(self, req):
""" Start an HTTP download """
@ -4088,7 +4090,10 @@ class YoutubeDL:
raise RequestError(
'file:// URLs are disabled by default in yt-dlp for security reasons. '
'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
if 'unsupported proxy type: "https"' in ue.msg.lower():
if (
'unsupported proxy type: "https"' in ue.msg.lower()
and 'requests' not in self._request_director.handlers
):
raise RequestError(
'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
@ -4100,23 +4105,12 @@ class YoutubeDL:
'This request requires WebSocket support. '
'Ensure one of the following dependencies are installed: websockets',
cause=ue) from ue
"""
ue = traverse_obj(
unsupported_errors,
(lambda _, v: isinstance(v.handler, ImpersonateRequestHandler) and 'unsupported impersonate target' in v.msg.lower()), get_all=False)
if ue:
# TODO: when we have multiple impersonation, will need to make this handle
# cases where the unsupported target is due to a missing library.
raise RequestError(
f'The requested impersonation target is not supported: {req.extensions.get("impersonate")}.', cause=ue) from ue
if list(filter(lambda ue: re.search(r'unsupported extensions:.*impersonate', ue.msg.lower()), unsupported_errors)):
self.report_warning(
'To impersonate a browser for this request please install one of: curl_cffi. '
'Retrying request without impersonation...')
new_req = req.copy()
new_req.extensions.pop('impersonate')
return _urlopen(new_req)"""
elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
raise RequestError(
f'Impersonate target "{compile_impersonate_target(*req.extensions["impersonate"])}" is not available.'
f' This request requires browser impersonation, however you may be missing dependencies'
f' required to support this target. See the documentation for more information.')
raise
except SSLError as e:
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):

View File

@ -990,10 +990,15 @@ def _real_main(argv=None):
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
available_targets]
ydl.to_screen(f'[info] Available impersonate targets')
ydl.to_screen('[info] Available impersonate targets')
ydl.to_stdout(
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
)
if not available_targets:
ydl.to_stdout('You are missing dependencies for impersonation. See the README for more info.')
ydl.to_stdout(
'If the above table is missing targets, you may be missing dependencies for impersonation. '
'See the documentation for more information.')
return
if not actual_use:

View File

@ -97,7 +97,7 @@ class CurlCFFIResponseAdapter(Response):
partial=self.fp.bytes_read,
expected=content_length - self.fp.bytes_read if content_length is not None else None,
cause=e) from e
raise
raise TransportError(cause=e) from e
@register_rh
@ -198,13 +198,6 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
max_redirects_exceeded = True
curl_response = e.response
elif e.code == CurlECode.PARTIAL_FILE:
partial = e.response.content
content_length = int_or_none(e.response.headers.get('Content-Length'))
raise IncompleteRead(
partial=len(partial),
expected=content_length - len(partial) if content_length is not None else None,
cause=e) from e
elif e.code == CurlECode.PROXY:
raise ProxyError(cause=e) from e
else:

View File

@ -15,7 +15,7 @@ def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
if target1[0] != target2[0]:
return False
for i in range(1, len(target2)):
for i in range(1, min(len(target1), len(target2))):
if (
target1[i]
and target2[i]
@ -120,9 +120,3 @@ def impersonate_preference(rh, request):
if request.extensions.get('impersonate') or rh.impersonate:
return 1000
return 0
def get_available_impersonate_targets(director):
return director.collect_from_handlers(
lambda x: x.get_supported_targets(),
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)

View File

@ -519,7 +519,7 @@ def create_parser():
network.add_option(
'--list-impersonate-targets',
dest='list_impersonate_targets', default=False, action='store_true',
help='List available HTTP clients to impersonate',
help='List available clients to impersonate',
)
network.add_option(
'-4', '--force-ipv4',

View File

@ -179,7 +179,8 @@ def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[
return client, version, os, os_vers
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
def compile_impersonate_target(*args) -> str | None:
client, version, os, os_vers = (list(args) + [None, None, None, None])[:4]
if not client:
return
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]