Compare commits

..

No commits in common. "40fa63575921de3d8d25469729fe94b696fe83cb" and "9ca8d327889e7b5c44323439780a739e8be3313c" have entirely different histories.

8 changed files with 57 additions and 186 deletions

View File

@ -475,9 +475,8 @@ If you fork the project on GitHub, you can run your fork's [build workflow](.git
direct connection
--socket-timeout SECONDS Time to wait before giving up, in seconds
--source-address IP Client-side IP address to bind to
--impersonate CLIENT[:[VERSION][:[OS][:OS_VERSION]]]
Client to impersonate for requests
--list-impersonate-targets List available clients to impersonate
--impersonate TARGET curl-impersonate target name to impersonate
for requests.
-4, --force-ipv4 Make all connections via IPv4
-6, --force-ipv6 Make all connections via IPv6
--enable-file-urls Enable file:// URLs. This is disabled by

View File

@ -29,7 +29,7 @@ from http.cookiejar import CookieJar
from test.conftest import validate_and_send
from test.helper import FakeYDL, http_server_port
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import brotli, requests, urllib3, curl_cffi
from yt_dlp.dependencies import brotli, requests, urllib3
from yt_dlp.networking import (
HEADRequest,
PUTRequest,
@ -913,9 +913,9 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
@pytest.mark.parametrize('params,extensions', [
({}, {'impersonate': ('chrome',)}),
({'impersonate': ('chrome', '110')}, {}),
({'impersonate': ('chrome', '99')}, {'impersonate': ('chrome', '110')}),
({}, {'impersonate': 'chrome:110'}),
({'impersonate': 'chrome:110'}, {}),
({'impersonate': 'chrome:99'}, {'impersonate': 'chrome:110'})
])
def test_impersonate(self, handler, params, extensions):
with handler(headers=std_headers, **params) as rh:
@ -931,7 +931,7 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
# Ensure curl-impersonate overrides our standard headers (usually added
res = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
'impersonate': ('safari', )}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
'impersonate': 'safari'}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
assert std_headers['user-agent'].lower() not in res
assert std_headers['accept-language'].lower() not in res
@ -946,74 +946,6 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
assert std_headers['accept-language'].lower() in res
assert 'x-custom: test' in res
@pytest.mark.parametrize('raised,expected,match', [
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
])
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
    """Errors raised while reading the response body surface as `expected`.

    `raised` is a zero-argument factory for the curl_cffi error to inject;
    `expected` is the exception type the adapter's read() must raise and
    `match` is forwarded unchanged to pytest.raises.
    """
    import curl_cffi.requests
    from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter

    curl_response = curl_cffi.requests.Response()
    adapter = CurlCFFIResponseAdapter(curl_response)

    def failing_read(*args, **kwargs):
        # Raise the injected error with the curl response attached to it.
        try:
            raise raised()
        except Exception as error:
            error.response = curl_response
            raise

    # Every read on the adapter's underlying fp now fails with the injected error.
    monkeypatch.setattr(adapter.fp, 'read', failing_read)

    with pytest.raises(expected, match=match) as exc_info:
        adapter.read()

    # Require the exact type, not merely a subclass of it.
    assert exc_info.type is expected
@pytest.mark.parametrize('raised,expected,match', [
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
    (lambda: curl_cffi.requests.errors.RequestsError(
        '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
])
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
    """Errors raised while issuing the request surface as `expected`.

    `raised` is a zero-argument factory for the curl_cffi error to inject and
    `expected` is the exception type rh.send() must raise. `match` is part of
    the parametrization but is not used by this test.
    """
    import curl_cffi.requests

    curl_response = curl_cffi.requests.Response()
    curl_response.status_code = 301

    with handler() as rh:
        real_get_instance = rh._get_instance

        def patched_get_instance(*args, **kwargs):
            # Build the real instance, then make its request() always fail.
            session = real_get_instance(*args, **kwargs)

            def failing_request(*_, **__):
                # Raise the injected error with the 301 response attached to it.
                try:
                    raise raised()
                except Exception as error:
                    error.response = curl_response
                    raise

            monkeypatch.setattr(session, 'request', failing_request)
            return session

        monkeypatch.setattr(rh, '_get_instance', patched_get_instance)

        with pytest.raises(expected) as exc_info:
            rh.send(Request('http://fake'))

        # Require the exact type, not merely a subclass of it.
        assert exc_info.type is expected
def run_validation(handler, error, req, **handler_kwargs):
with handler(**handler_kwargs) as rh:
@ -1142,9 +1074,9 @@ class TestRequestHandlerValidation:
({'timeout': 1}, False),
({'timeout': 'notatimeout'}, AssertionError),
({'unsupported': 'value'}, UnsupportedRequest),
({'impersonate': ('badtarget', None, None, None)}, UnsupportedRequest),
({'impersonate': 'badtarget'}, UnsupportedRequest),
({'impersonate': 123}, AssertionError),
({'impersonate': ('chrome', None, None, None)}, False)
({'impersonate': 'chrome'}, False)
]),
(NoCheckRH, 'http', [
({'cookiejar': 'notacookiejar'}, False),
@ -1223,10 +1155,6 @@ class FakeResponse(Response):
class FakeRH(RequestHandler):
def __init__(self, *args, **params):
self.params = params
super().__init__(*args, **params)
def _validate(self, request):
return
@ -1432,72 +1360,6 @@ class TestYoutubeDLNetworking:
with pytest.raises(SSLError, match='testerror'):
ydl.urlopen('ssl://testerror')
def test_unsupported_impersonate_target(self):
    """A request asking for impersonation fails when only a plain handler exists.

    The director is built from a single RequestHandler subclass that is not an
    ImpersonateRequestHandler, and urlopen() must raise RequestError with the
    "not available" message.
    """
    class HTTPRH(RequestHandler):
        def _send(self, request: Request):
            pass

        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_PROXY_SCHEMES = None

    class FakeImpersonationRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Swap in a director that only knows about the plain HTTP handler.
            self._request_director = self.build_request_director([HTTPRH])

    impersonating_request = Request(
        'http://', extensions={'impersonate': ('test', None, None, None)})

    with FakeImpersonationRHYDL() as ydl, pytest.raises(
        RequestError,
        match=r'Impersonate target "test" is not available. This request requires browser impersonation',
    ):
        ydl.urlopen(impersonating_request)
def test_unsupported_impersonate_extension(self):
    """A request for target "test" fails when the handler only lists "firefox".

    The director is built from a single ImpersonateRequestHandler subclass
    whose target list is [('firefox',)], and urlopen() must raise RequestError
    with the "not available" message.
    """
    class IRH(ImpersonateRequestHandler):
        def _send(self, request: Request):
            pass

        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)]
        _SUPPORTED_PROXY_SCHEMES = None

    class FakeHTTPRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Swap in a director that only knows about the firefox-only handler.
            self._request_director = self.build_request_director([IRH])

    impersonating_request = Request(
        'http://', extensions={'impersonate': ('test', None, None, None)})

    with FakeHTTPRHYDL() as ydl, pytest.raises(
        RequestError,
        match=r'Impersonate target "test" is not available. This request requires browser impersonation',
    ):
        ydl.urlopen(impersonating_request)
def test_raise_impersonate_error(self):
    """Constructing FakeYDL with the 'impersonate' param set to target "test" raises ValueError."""
    expected_message = (
        r'Impersonate target "test" is not available. '
        r'Use --list-impersonate-targets to see available targets.'
    )
    with pytest.raises(ValueError, match=expected_message):
        FakeYDL({'impersonate': ('test', None, None, None)})
def test_pass_impersonate_param(self, monkeypatch):
    """The 'impersonate' param given to FakeYDL reaches the built handler unchanged."""
    class IRH(ImpersonateRequestHandler):
        def _send(self, request: Request):
            pass

        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)]

    firefox_target = ('firefox', None, None, None)

    # Bypass the check on initialize: whatever handlers are requested, the
    # director is always built from IRH alone.
    brh = FakeYDL.build_request_director

    def build_irh_only_director(cls, handlers, preferences=None):
        return brh(cls, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_irh_only_director)

    with FakeYDL({'impersonate': firefox_target}) as ydl:
        rh = self.build_handler(ydl, IRH)
        assert rh.impersonate == firefox_target
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
('http', '__noproxy__', None),
('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
@ -1798,7 +1660,6 @@ class TestResponse:
assert res.getheader('test') == res.get_header('test')
# TODO: move these to test_utils.py when that moves to pytest
class TestImpersonate:
@pytest.mark.parametrize('target,expected', [
('firefox', ('firefox', None, None, None)),
@ -1823,9 +1684,7 @@ class TestImpersonate:
(('firefox', None, 'linux', None), 'firefox::linux'),
(('firefox', None, None, '5'), 'firefox:::5'),
(('firefox', '120', None, '5'), 'firefox:120::5'),
((None, '120', None, None), None),
(('firefox', ), 'firefox'),
(('firefox', None, 'linux'), 'firefox::linux'),
((None, '120', None, None), None)
])
def test_compile_impersonate_target(self, target_tuple, expected):
assert compile_impersonate_target(*target_tuple) == expected

View File

@ -24,7 +24,6 @@ import traceback
import unicodedata
from .cache import Cache
from .compat import functools, urllib # isort: split
from .compat import compat_os_name, compat_shlex_quote, urllib_req_to_req
from .cookies import LenientSimpleCookie, load_cookies
@ -44,7 +43,7 @@ from .networking.exceptions import (
_CompatHTTPError,
network_exceptions,
)
from .networking.impersonate import ImpersonateRequestHandler
from .networking.impersonate import ImpersonateRequestHandler, get_available_impersonate_targets
from .plugins import directories as plugin_directories
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
from .postprocessor import (
@ -62,13 +61,7 @@ from .postprocessor import (
get_postprocessor,
)
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
from .update import (
REPOSITORY,
_get_system_deprecation,
_make_label,
current_git_head,
detect_variant,
)
from .update import REPOSITORY, _get_system_deprecation, _make_label, current_git_head, detect_variant
from .utils import (
DEFAULT_OUTTMPL,
IDENTITY,
@ -164,8 +157,9 @@ from .utils.networking import (
HTTPHeaderDict,
clean_headers,
clean_proxies,
compile_impersonate_target,
std_headers,
parse_impersonate_target,
compile_impersonate_target
)
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
@ -720,7 +714,10 @@ class YoutubeDL:
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not any(results):
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
raise ValueError(
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
f'Use --list-impersonate-targets to see available targets.')
@ -3914,9 +3911,10 @@ class YoutubeDL:
# These imports can be slow. So import them only as needed
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import _PLUGIN_CLASSES as plugin_ies
from .extractor.extractors import \
from .extractor.extractors import (
_PLUGIN_CLASSES as plugin_ies,
_PLUGIN_OVERRIDES as plugin_ie_overrides
)
def get_encoding(stream):
ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
@ -4057,7 +4055,7 @@ class YoutubeDL:
return sorted(self._request_director.collect_from_handlers(
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
), key=lambda x: x[0])
), key=lambda x: x[1][0])
def urlopen(self, req):
""" Start an HTTP download """
@ -4090,10 +4088,7 @@ class YoutubeDL:
raise RequestError(
'file:// URLs are disabled by default in yt-dlp for security reasons. '
'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
if (
'unsupported proxy type: "https"' in ue.msg.lower()
and 'requests' not in self._request_director.handlers
):
if 'unsupported proxy type: "https"' in ue.msg.lower():
raise RequestError(
'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
@ -4105,12 +4100,23 @@ class YoutubeDL:
'This request requires WebSocket support. '
'Ensure one of the following dependencies are installed: websockets',
cause=ue) from ue
elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
"""
ue = traverse_obj(
unsupported_errors,
(lambda _, v: isinstance(v.handler, ImpersonateRequestHandler) and 'unsupported impersonate target' in v.msg.lower()), get_all=False)
if ue:
# TODO: when we have multiple impersonation, will need to make this handle
# cases where the unsupported target is due to a missing library.
raise RequestError(
f'Impersonate target "{compile_impersonate_target(*req.extensions["impersonate"])}" is not available.'
f' This request requires browser impersonation, however you may be missing dependencies'
f' required to support this target. See the documentation for more information.')
f'The requested impersonation target is not supported: {req.extensions.get("impersonate")}.', cause=ue) from ue
if list(filter(lambda ue: re.search(r'unsupported extensions:.*impersonate', ue.msg.lower()), unsupported_errors)):
self.report_warning(
'To impersonate a browser for this request please install one of: curl_cffi. '
'Retrying request without impersonation...')
new_req = req.copy()
new_req.extensions.pop('impersonate')
return _urlopen(new_req)"""
raise
except SSLError as e:
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):

View File

@ -990,15 +990,10 @@ def _real_main(argv=None):
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
available_targets]
ydl.to_screen('[info] Available impersonate targets')
ydl.to_screen(f'[info] Available impersonate targets')
ydl.to_stdout(
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
)
if not available_targets:
ydl.to_stdout('You are missing dependencies for impersonation. See the README for more info.')
ydl.to_stdout(
'If the above table is missing targets, you may be missing dependencies for impersonation. '
'See the documentation for more information.')
return
if not actual_use:

View File

@ -97,7 +97,7 @@ class CurlCFFIResponseAdapter(Response):
partial=self.fp.bytes_read,
expected=content_length - self.fp.bytes_read if content_length is not None else None,
cause=e) from e
raise TransportError(cause=e) from e
raise
@register_rh
@ -198,6 +198,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
max_redirects_exceeded = True
curl_response = e.response
elif e.code == CurlECode.PARTIAL_FILE:
partial = e.response.content
content_length = int_or_none(e.response.headers.get('Content-Length'))
raise IncompleteRead(
partial=len(partial),
expected=content_length - len(partial) if content_length is not None else None,
cause=e) from e
elif e.code == CurlECode.PROXY:
raise ProxyError(cause=e) from e
else:

View File

@ -15,7 +15,7 @@ def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
if target1[0] != target2[0]:
return False
for i in range(1, min(len(target1), len(target2))):
for i in range(1, len(target2)):
if (
target1[i]
and target2[i]
@ -120,3 +120,9 @@ def impersonate_preference(rh, request):
if request.extensions.get('impersonate') or rh.impersonate:
return 1000
return 0
def get_available_impersonate_targets(director):
    """Return what `director.collect_from_handlers` gathers for impersonation targets.

    The director is given a callable that returns a handler's
    get_supported_targets() and a single filter accepting only
    ImpersonateRequestHandler instances.
    NOTE(review): how the results are combined is decided by
    collect_from_handlers, which is not visible here — confirm the shape.
    """
    def _supported_targets(handler):
        return handler.get_supported_targets()

    def _is_impersonate_handler(_, handler):
        # The first argument passed to the filter is ignored, as in the lambda this replaces.
        return isinstance(handler, ImpersonateRequestHandler)

    return director.collect_from_handlers(_supported_targets, [_is_impersonate_handler])

View File

@ -519,7 +519,7 @@ def create_parser():
network.add_option(
'--list-impersonate-targets',
dest='list_impersonate_targets', default=False, action='store_true',
help='List available clients to impersonate',
help='List available HTTP clients to impersonate',
)
network.add_option(
'-4', '--force-ipv4',

View File

@ -179,8 +179,7 @@ def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[
return client, version, os, os_vers
def compile_impersonate_target(*args) -> str | None:
client, version, os, os_vers = (list(args) + [None, None, None, None])[:4]
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
if not client:
return
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]