Compare commits

..

No commits in common. "9ca8d327889e7b5c44323439780a739e8be3313c" and "c57f34ec5f4cd5583da11e9dc03783f94cd9885e" have entirely different histories.

7 changed files with 87 additions and 140 deletions

View File

@ -50,14 +50,8 @@ from yt_dlp.networking.exceptions import (
TransportError, TransportError,
UnsupportedRequest, UnsupportedRequest,
) )
from yt_dlp.networking.impersonate import ImpersonateRequestHandler
from yt_dlp.utils._utils import _YDLLogger as FakeLogger from yt_dlp.utils._utils import _YDLLogger as FakeLogger
from yt_dlp.utils.networking import ( from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
HTTPHeaderDict,
compile_impersonate_target,
parse_impersonate_target,
std_headers,
)
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@ -1658,33 +1652,3 @@ class TestResponse:
assert res.geturl() == res.url assert res.geturl() == res.url
assert res.info() is res.headers assert res.info() is res.headers
assert res.getheader('test') == res.get_header('test') assert res.getheader('test') == res.get_header('test')
class TestImpersonate:
    """Checks for converting impersonate targets between string and tuple form."""

    # (target string, expected (client, version, os, os_vers) tuple).
    # An expected value of None marks a target that must be rejected.
    _PARSE_CASES = [
        ('firefox', ('firefox', None, None, None)),
        ('firefox:120', ('firefox', '120', None, None)),
        ('firefox:120:linux', ('firefox', '120', 'linux', None)),
        ('firefox:120:linux:5', ('firefox', '120', 'linux', '5')),
        ('firefox::linux', ('firefox', None, 'linux', None)),
        ('firefox:::5', ('firefox', None, None, '5')),
        ('firefox:::', ('firefox', None, None, None)),
        ('firefox:120::5', ('firefox', '120', None, '5')),
        ('firefox:120:', ('firefox', '120', None, None)),
        ('::120', None),
    ]

    # ((client, version, os, os_vers) tuple, expected target string).
    # An expected value of None marks a tuple that cannot be compiled.
    _COMPILE_CASES = [
        (('firefox', None, None, None), 'firefox'),
        (('firefox', '120', None, None), 'firefox:120'),
        (('firefox', '120', 'linux', None), 'firefox:120:linux'),
        (('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'),
        (('firefox', None, 'linux', None), 'firefox::linux'),
        (('firefox', None, None, '5'), 'firefox:::5'),
        (('firefox', '120', None, '5'), 'firefox:120::5'),
        ((None, '120', None, None), None),
    ]

    @pytest.mark.parametrize('target,expected', _PARSE_CASES)
    def test_parse_impersonate_target(self, target, expected):
        """Parsing a target string yields the expected tuple (or None)."""
        parsed = parse_impersonate_target(target)
        assert parsed == expected

    @pytest.mark.parametrize('target_tuple,expected', _COMPILE_CASES)
    def test_compile_impersonate_target(self, target_tuple, expected):
        """Compiling a target tuple yields the expected string (or None)."""
        compiled = compile_impersonate_target(*target_tuple)
        assert compiled == expected

View File

@ -158,8 +158,6 @@ from .utils.networking import (
clean_headers, clean_headers,
clean_proxies, clean_proxies,
std_headers, std_headers,
parse_impersonate_target,
compile_impersonate_target
) )
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__ from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
@ -400,8 +398,7 @@ class YoutubeDL:
- "detect_or_warn": check whether we can do anything - "detect_or_warn": check whether we can do anything
about it, warn otherwise (default) about it, warn otherwise (default)
source_address: Client-side IP address to bind to. source_address: Client-side IP address to bind to.
impersonate: Client to impersonate for requests. impersonate: curl-impersonate target name to impersonate for requests.
A tuple in the form (client, version, os, os_version)
sleep_interval_requests: Number of seconds to sleep between requests sleep_interval_requests: Number of seconds to sleep between requests
during extraction during extraction
sleep_interval: Number of seconds to sleep before each download when sleep_interval: Number of seconds to sleep before each download when
@ -685,6 +682,20 @@ class YoutubeDL:
self.params['http_headers'].pop('Cookie', None) self.params['http_headers'].pop('Cookie', None)
self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES) self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
impersonate_target = self.params.get('impersonate')
if impersonate_target:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
results = self._request_director.collect_from_handlers(
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
self.report_warning(f'Impersonate target "{self.params.get("impersonate")}" is not supported. '
f'Supported targets: {join_nonempty(*get_available_impersonate_targets(self._request_director), delim=", ")}')
if auto_init and auto_init != 'no_verbose_header': if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header() self.print_debug_header()
@ -707,21 +718,6 @@ class YoutubeDL:
for msg in self.params.get('_deprecation_warnings', []): for msg in self.params.get('_deprecation_warnings', []):
self.deprecated_feature(msg) self.deprecated_feature(msg)
impersonate_target = self.params.get('impersonate')
if impersonate_target:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
results = self._request_director.collect_from_handlers(
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
raise ValueError(
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
f'Use --list-impersonate-targets to see available targets.')
if 'list-formats' in self.params['compat_opts']: if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False self.params['listformats_table'] = False
@ -4051,12 +4047,6 @@ class YoutubeDL:
handler = self._request_director.handlers['Urllib'] handler = self._request_director.handlers['Urllib']
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies) return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def get_impersonate_targets(self):
    """
    List the impersonate targets offered by the impersonation-capable request handlers.

    Each entry is a handler's supported target with that handler's RH_NAME
    appended as the last element. Entries are ordered by their first element
    (the client name); entries with the same client keep the order in which
    the handlers reported them.

    NOTE(review): assumes collect_from_handlers returns the collected entries
    as one flat iterable - confirm against the request director.
    """
    entries = self._request_director.collect_from_handlers(
        lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
        [lambda _, v: isinstance(v, ImpersonateRequestHandler)],
    )
    # Sort on the client name (index 0). Index 1 is the optional version,
    # which may be None and therefore must not be subscripted.
    return sorted(entries, key=lambda entry: entry[0])
def urlopen(self, req): def urlopen(self, req):
""" Start an HTTP download """ """ Start an HTTP download """
if isinstance(req, str): if isinstance(req, str):

View File

@ -60,7 +60,7 @@ from .utils import (
variadic, variadic,
write_string, write_string,
) )
from .utils.networking import std_headers, parse_impersonate_target, compile_impersonate_target from .utils.networking import std_headers
from .YoutubeDL import YoutubeDL from .YoutubeDL import YoutubeDL
_IN_CLI = False _IN_CLI = False
@ -386,12 +386,6 @@ def validate_options(opts):
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}') f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
opts.cookiesfrombrowser = (browser_name, profile, keyring, container) opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
if opts.impersonate:
target = parse_impersonate_target(opts.impersonate)
if target is None:
raise ValueError(f'invalid impersonate target "{opts.impersonate}"')
opts.impersonate = target
# MetadataParser # MetadataParser
def metadataparser_actions(f): def metadataparser_actions(f):
if isinstance(f, str): if isinstance(f, str):
@ -985,17 +979,6 @@ def _real_main(argv=None):
traceback.print_exc() traceback.print_exc()
ydl._download_retcode = 100 ydl._download_retcode = 100
if opts.list_impersonate_targets:
available_targets = ydl.get_impersonate_targets()
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
available_targets]
ydl.to_screen(f'[info] Available impersonate targets')
ydl.to_stdout(
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
)
return
if not actual_use: if not actual_use:
if pre_process: if pre_process:
return ydl._download_retcode return ydl._download_retcode

View File

@ -25,11 +25,6 @@ from ..utils import int_or_none
if curl_cffi is None: if curl_cffi is None:
raise ImportError('curl_cffi is not installed') raise ImportError('curl_cffi is not installed')
curl_cffi_version = tuple(int_or_none(x, default=0) for x in curl_cffi.__version__.split('.'))
if curl_cffi_version < (0, 5, 10):
raise ImportError('Only curl_cffi>=0.5.10 is supported')
import curl_cffi.requests import curl_cffi.requests
from curl_cffi.const import CurlECode, CurlOpt from curl_cffi.const import CurlECode, CurlOpt
@ -183,7 +178,7 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
verify=self.verify, verify=self.verify,
max_redirects=5, max_redirects=5,
timeout=timeout, timeout=timeout,
impersonate=self._get_mapped_request_target(request), impersonate=self._get_mapped_target(request),
interface=self.source_address, interface=self.source_address,
stream=True stream=True
) )

View File

@ -11,7 +11,36 @@ from ..utils.networking import std_headers
ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]] ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]]
def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget): def parse_impersonate_target(target: str) -> ImpersonateTarget:
client = version = os = os_vers = None
parts = target.split(':')
if len(parts):
client = parts[0]
if len(parts) > 1:
version = parts[1]
if len(parts) > 2:
os = parts[2]
if len(parts) > 3:
os_vers = parts[3]
return client, version, os, os_vers
def compile_impersonate_target(browser, version, os, os_vers) -> str:
    """
    Compile an impersonate target tuple into its "browser:version:os:os_vers" string form.

    Every field keeps its own position in the result: an unset field that is
    followed by a set one is written as an empty segment (e.g. "firefox:::5"),
    while trailing unset fields are dropped entirely (e.g. "firefox:120").

    @param browser   Browser name; required
    @param version   Browser version, or None/empty if unset
    @param os        Operating system name, or None/empty if unset
    @param os_vers   Operating system version, or None/empty if unset
    @returns         The compiled target string, or None if no browser was given
    """
    if not browser:
        return None
    # Join all four slots and strip only the trailing separators, so a later
    # field can never slide into the position of an earlier unset one
    segments = (str(field) if field else '' for field in (browser, version, os, os_vers))
    return ':'.join(segments).rstrip(':')
def target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
# required: check if the browser matches
if target1[0] != target2[0]: if target1[0] != target2[0]:
return False return False
@ -33,14 +62,17 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
This provides a method for checking the validity of the impersonate extension, This provides a method for checking the validity of the impersonate extension,
which can be used in _check_extensions. which can be used in _check_extensions.
Impersonate targets are defined as a tuple of (client, version, os, os_vers). Impersonate target tuples are defined as a tuple of (browser, version, os, os_vers) internally.
Note: Impersonate targets are not required to define all fields (except client). To simplify the interface, this is compiled into a string format of browser:version:os:os_vers to be used externally.
- In this handler, "impersonate target tuple" refers to the tuple version,
and "impersonate target" refers to the string version.
- Impersonate target [tuples] are not required to define all fields (except browser).
The following may be defined: The following may be defined:
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported targets to impersonate. - `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported target tuples to impersonate.
Any Request with an impersonate target not in this list will raise an UnsupportedRequest. Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
Set to None to disable this check. Set to None to disable this check.
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported targets to custom targets. - `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported target tuples to custom targets.
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`. This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined. Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
@ -53,12 +85,12 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = () _SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = ()
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {} _SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs): def __init__(self, *, impersonate=None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.impersonate = impersonate self.impersonate = impersonate
def _check_impersonate_target(self, target: ImpersonateTarget): def _check_impersonate_target(self, target: str):
assert isinstance(target, (tuple, NoneType)) assert isinstance(target, (str, NoneType))
if target is None or not self.get_supported_targets(): if target is None or not self.get_supported_targets():
return return
if not self.is_supported_target(target): if not self.is_supported_target(target):
@ -73,40 +105,49 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
super()._validate(request) super()._validate(request)
self._check_impersonate_target(self.impersonate) self._check_impersonate_target(self.impersonate)
def _resolve_target(self, target: ImpersonateTarget | None): def _get_supported_target_tuples(self):
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
def _resolve_target_tuple(self, target: ImpersonateTarget | None):
"""Resolve a target to a supported target.""" """Resolve a target to a supported target."""
if not target: if not target:
return return
for supported_target in self.get_supported_targets(): for supported_target in self._get_supported_target_tuples():
if _target_within(target, supported_target): if target_within(target, supported_target):
if self.verbose: if self.verbose:
self._logger.stdout( self._logger.stdout(
f'{self.RH_NAME}: resolved impersonate target "{target}" to "{supported_target}"') f'{self.RH_NAME}: resolved impersonate target "{compile_impersonate_target(*target)}" '
f'to "{compile_impersonate_target(*supported_target)}"')
return supported_target return supported_target
def get_supported_targets(self) -> tuple[ImpersonateTarget]: def get_supported_targets(self) -> tuple[str]:
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES) return tuple(compile_impersonate_target(*target) for target in self._get_supported_target_tuples())
def is_supported_target(self, target: ImpersonateTarget): def is_supported_target(self, target: str):
return self._resolve_target(target) is not None return self._is_supported_target_tuple(parse_impersonate_target(target))
def _get_request_target(self, request): def _is_supported_target_tuple(self, target: ImpersonateTarget):
"""Get the requested target for the request""" return self._resolve_target_tuple(target) is not None
return request.extensions.get('impersonate') or self.impersonate
def _get_resolved_request_target(self, request) -> ImpersonateTarget: def _get_target_tuple(self, request):
"""Get the resolved target for this request. This gives the matching supported target""" """Get the requested target tuple for the request"""
return self._resolve_target(self._get_request_target(request)) target = request.extensions.get('impersonate') or self.impersonate
if target:
return parse_impersonate_target(target)
def _get_mapped_request_target(self, request): def _get_resolved_target_tuple(self, request) -> ImpersonateTarget:
"""Get the resolved target tuple for this request. This gives the matching supported target"""
return self._resolve_target_tuple(self._get_target_tuple(request))
def _get_mapped_target(self, request):
"""Get the resolved mapped target for the request target""" """Get the resolved mapped target for the request target"""
resolved_target = self._resolve_target(self._get_request_target(request)) resolved_target = self._resolve_target_tuple(self._get_target_tuple(request))
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get( return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get(
resolved_target, None) resolved_target, None)
def _get_impersonate_headers(self, request): def _get_impersonate_headers(self, request):
headers = self._merge_headers(request.headers) headers = self._merge_headers(request.headers)
if self._get_request_target(request): if self._get_target_tuple(request):
# remove all headers present in std_headers # remove all headers present in std_headers
headers.pop('User-Agent', None) headers.pop('User-Agent', None)
for header in std_headers: for header in std_headers:
@ -121,6 +162,7 @@ def impersonate_preference(rh, request):
return 1000 return 1000
return 0 return 0
def get_available_impersonate_targets(director): def get_available_impersonate_targets(director):
return director.collect_from_handlers( return director.collect_from_handlers(
lambda x: x.get_supported_targets(), lambda x: x.get_supported_targets(),

View File

@ -513,13 +513,8 @@ def create_parser():
) )
network.add_option( network.add_option(
'--impersonate', '--impersonate',
metavar='CLIENT[:[VERSION][:[OS][:OS_VERSION]]]', dest='impersonate', default=None, metavar='TARGET', dest='impersonate', default=None,
help='Client to impersonate for requests', help='curl-impersonate target name to impersonate for requests.',
)
network.add_option(
'--list-impersonate-targets',
dest='list_impersonate_targets', default=False, action='store_true',
help='List available HTTP clients to impersonate',
) )
network.add_option( network.add_option(
'-4', '--force-ipv4', '-4', '--force-ipv4',

View File

@ -1,10 +1,7 @@
from __future__ import annotations
import collections import collections
import random import random
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from typing import Optional, Tuple
from ._utils import remove_start from ._utils import remove_start
@ -165,22 +162,3 @@ def normalize_url(url):
query=escape_rfc3986(url_parsed.query), query=escape_rfc3986(url_parsed.query),
fragment=escape_rfc3986(url_parsed.fragment) fragment=escape_rfc3986(url_parsed.fragment)
).geturl() ).geturl()
def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[str], Optional[str]] | None:
"""
Parse an impersonate target string into a tuple of (client, version, os, os_vers)
If the target is invalid, return None
"""
client, version, os, os_vers = [None if (v or '').strip() == '' else v for v in (
target.split(':') + [None, None, None, None])][:4]
if client is not None:
return client, version, os, os_vers
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
if not client:
return
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]
return ':'.join(filtered_parts).rstrip(':')