Compare commits

..

No commits in common. "9ca8d327889e7b5c44323439780a739e8be3313c" and "c57f34ec5f4cd5583da11e9dc03783f94cd9885e" have entirely different histories.

7 changed files with 87 additions and 140 deletions

View File

@ -50,14 +50,8 @@ from yt_dlp.networking.exceptions import (
TransportError,
UnsupportedRequest,
)
from yt_dlp.networking.impersonate import ImpersonateRequestHandler
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
from yt_dlp.utils.networking import (
HTTPHeaderDict,
compile_impersonate_target,
parse_impersonate_target,
std_headers,
)
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@ -1658,33 +1652,3 @@ class TestResponse:
assert res.geturl() == res.url
assert res.info() is res.headers
assert res.getheader('test') == res.get_header('test')
class TestImpersonate:
    """Checks for converting impersonate targets between string and tuple form."""

    # (target string, expected parse result); None marks a target that must be rejected.
    _PARSE_CASES = [
        ('firefox', ('firefox', None, None, None)),
        ('firefox:120', ('firefox', '120', None, None)),
        ('firefox:120:linux', ('firefox', '120', 'linux', None)),
        ('firefox:120:linux:5', ('firefox', '120', 'linux', '5')),
        ('firefox::linux', ('firefox', None, 'linux', None)),
        ('firefox:::5', ('firefox', None, None, '5')),
        ('firefox:::', ('firefox', None, None, None)),
        ('firefox:120::5', ('firefox', '120', None, '5')),
        ('firefox:120:', ('firefox', '120', None, None)),
        ('::120', None)
    ]

    # (target tuple, expected compiled string); None marks a tuple that cannot be compiled.
    _COMPILE_CASES = [
        (('firefox', None, None, None), 'firefox'),
        (('firefox', '120', None, None), 'firefox:120'),
        (('firefox', '120', 'linux', None), 'firefox:120:linux'),
        (('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'),
        (('firefox', None, 'linux', None), 'firefox::linux'),
        (('firefox', None, None, '5'), 'firefox:::5'),
        (('firefox', '120', None, '5'), 'firefox:120::5'),
        ((None, '120', None, None), None)
    ]

    @pytest.mark.parametrize('target,expected', _PARSE_CASES)
    def test_parse_impersonate_target(self, target, expected):
        """Parsing a target string yields the expected tuple (or None)."""
        parsed = parse_impersonate_target(target)
        assert parsed == expected

    @pytest.mark.parametrize('target_tuple,expected', _COMPILE_CASES)
    def test_compile_impersonate_target(self, target_tuple, expected):
        """Compiling a target tuple yields the expected string (or None)."""
        compiled = compile_impersonate_target(*target_tuple)
        assert compiled == expected

View File

@ -158,8 +158,6 @@ from .utils.networking import (
clean_headers,
clean_proxies,
std_headers,
parse_impersonate_target,
compile_impersonate_target
)
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
@ -400,8 +398,7 @@ class YoutubeDL:
- "detect_or_warn": check whether we can do anything
about it, warn otherwise (default)
source_address: Client-side IP address to bind to.
impersonate: Client to impersonate for requests.
A tuple in the form (client, version, os, os_version)
impersonate: curl-impersonate target name to impersonate for requests.
sleep_interval_requests: Number of seconds to sleep between requests
during extraction
sleep_interval: Number of seconds to sleep before each download when
@ -685,6 +682,20 @@ class YoutubeDL:
self.params['http_headers'].pop('Cookie', None)
self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
impersonate_target = self.params.get('impersonate')
if impersonate_target:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
results = self._request_director.collect_from_handlers(
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
self.report_warning(f'Impersonate target "{self.params.get("impersonate")}" is not supported. '
f'Supported targets: {join_nonempty(*get_available_impersonate_targets(self._request_director), delim=", ")}')
if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header()
@ -707,21 +718,6 @@ class YoutubeDL:
for msg in self.params.get('_deprecation_warnings', []):
self.deprecated_feature(msg)
impersonate_target = self.params.get('impersonate')
if impersonate_target:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
results = self._request_director.collect_from_handlers(
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
raise ValueError(
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
f'Use --list-impersonate-targets to see available targets.')
if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False
@ -4051,12 +4047,6 @@ class YoutubeDL:
handler = self._request_director.handlers['Urllib']
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
# Return a sorted list built from what the request director collects from its
# impersonation-capable handlers.
def get_impersonate_targets(self):
return sorted(self._request_director.collect_from_handlers(
# Per handler: one entry for each supported target, made of the target's
# unpacked fields followed by the handler's RH_NAME.
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
# Filter: only handlers that are ImpersonateRequestHandler instances.
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
# NOTE(review): the key indexes the second element of each collected item and
# then its first element. If items are the flat tuples built above
# (presumably client, version, os, os_vers, handler name), this sorts by the
# first character of the version and raises TypeError when version is None.
# Confirm the shape collect_from_handlers returns and the intended sort field.
), key=lambda x: x[1][0])
def urlopen(self, req):
""" Start an HTTP download """
if isinstance(req, str):

View File

@ -60,7 +60,7 @@ from .utils import (
variadic,
write_string,
)
from .utils.networking import std_headers, parse_impersonate_target, compile_impersonate_target
from .utils.networking import std_headers
from .YoutubeDL import YoutubeDL
_IN_CLI = False
@ -386,12 +386,6 @@ def validate_options(opts):
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
if opts.impersonate:
target = parse_impersonate_target(opts.impersonate)
if target is None:
raise ValueError(f'invalid impersonate target "{opts.impersonate}"')
opts.impersonate = target
# MetadataParser
def metadataparser_actions(f):
if isinstance(f, str):
@ -985,17 +979,6 @@ def _real_main(argv=None):
traceback.print_exc()
ydl._download_retcode = 100
if opts.list_impersonate_targets:
available_targets = ydl.get_impersonate_targets()
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
available_targets]
ydl.to_screen(f'[info] Available impersonate targets')
ydl.to_stdout(
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
)
return
if not actual_use:
if pre_process:
return ydl._download_retcode

View File

@ -25,11 +25,6 @@ from ..utils import int_or_none
if curl_cffi is None:
raise ImportError('curl_cffi is not installed')
curl_cffi_version = tuple(int_or_none(x, default=0) for x in curl_cffi.__version__.split('.'))
if curl_cffi_version < (0, 5, 10):
raise ImportError('Only curl_cffi>=0.5.10 is supported')
import curl_cffi.requests
from curl_cffi.const import CurlECode, CurlOpt
@ -183,7 +178,7 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
verify=self.verify,
max_redirects=5,
timeout=timeout,
impersonate=self._get_mapped_request_target(request),
impersonate=self._get_mapped_target(request),
interface=self.source_address,
stream=True
)

View File

@ -11,7 +11,36 @@ from ..utils.networking import std_headers
ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]]
def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
def parse_impersonate_target(target: str) -> 'ImpersonateTarget':
    """Split a ``browser[:version[:os[:os_vers]]]`` string into a 4-tuple.

    Optional fields that are missing, empty or whitespace-only are returned as
    None, so ``'firefox::linux'`` yields ``('firefox', None, 'linux', None)``
    instead of carrying an empty-string version. The browser field is returned
    exactly as given, and any segments after the fourth are ignored.

    @param target: the impersonate target string to parse
    @returns: a tuple of (browser, version, os, os_vers)
    """
    # str.split always yields at least one element, so the browser is always present.
    browser, *optional = target.split(':')
    # Normalise blank segments to None, then pad to exactly three optional fields.
    optional = [part if part.strip() else None for part in optional[:3]]
    optional += [None] * (3 - len(optional))
    version, os, os_vers = optional
    return browser, version, os, os_vers
def compile_impersonate_target(browser, version, os, os_vers) -> str:
    """Join target fields into the ``browser:version:os:os_vers`` string form.

    Unset (falsy) fields that are followed by a set field are written as empty
    segments so that every field keeps its position: ``('firefox', None, None, '5')``
    becomes ``'firefox:::5'`` rather than ``'firefox:5'``, which would read back
    as a version. Trailing unset fields are omitted entirely.

    @param browser: browser name; returned unchanged when no other field is set
    @param version: browser version, or None
    @param os: operating system name, or None
    @param os_vers: operating system version, or None
    @returns: the compiled target string
    """
    optional = [version or '', os or '', os_vers or '']
    # Drop unset trailing fields so no dangling ':' separators are emitted.
    while optional and not optional[-1]:
        optional.pop()
    if not optional:
        return browser
    return ':'.join([browser, *optional])
def target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
# required: check if the browser matches
if target1[0] != target2[0]:
return False
@ -33,14 +62,17 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
This provides a method for checking the validity of the impersonate extension,
which can be used in _check_extensions.
Impersonate targets are defined as a tuple of (client, version, os, os_vers).
Note: Impersonate targets are not required to define all fields (except client).
Impersonate target tuples are defined as a tuple of (browser, version, os, os_vers) internally.
To simplify the interface, this is compiled into a string format of browser:version:os:os_vers to be used externally.
- In this handler, "impersonate target tuple" refers to the tuple version,
and "impersonate target" refers to the string version.
- Impersonate target [tuples] are not required to define all fields (except browser).
The following may be defined:
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported targets to impersonate.
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported target tuples to impersonate.
Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
Set to None to disable this check.
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported targets to custom targets.
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported target tuples to custom targets.
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
@ -53,12 +85,12 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = ()
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
def __init__(self, *, impersonate=None, **kwargs):
super().__init__(**kwargs)
self.impersonate = impersonate
def _check_impersonate_target(self, target: ImpersonateTarget):
assert isinstance(target, (tuple, NoneType))
def _check_impersonate_target(self, target: str):
assert isinstance(target, (str, NoneType))
if target is None or not self.get_supported_targets():
return
if not self.is_supported_target(target):
@ -73,40 +105,49 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
super()._validate(request)
self._check_impersonate_target(self.impersonate)
def _resolve_target(self, target: ImpersonateTarget | None):
def _get_supported_target_tuples(self):
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
def _resolve_target_tuple(self, target: ImpersonateTarget | None):
"""Resolve a target to a supported target."""
if not target:
return
for supported_target in self.get_supported_targets():
if _target_within(target, supported_target):
for supported_target in self._get_supported_target_tuples():
if target_within(target, supported_target):
if self.verbose:
self._logger.stdout(
f'{self.RH_NAME}: resolved impersonate target "{target}" to "{supported_target}"')
f'{self.RH_NAME}: resolved impersonate target "{compile_impersonate_target(*target)}" '
f'to "{compile_impersonate_target(*supported_target)}"')
return supported_target
def get_supported_targets(self) -> tuple[ImpersonateTarget]:
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
def get_supported_targets(self) -> tuple[str]:
return tuple(compile_impersonate_target(*target) for target in self._get_supported_target_tuples())
def is_supported_target(self, target: ImpersonateTarget):
return self._resolve_target(target) is not None
def is_supported_target(self, target: str):
return self._is_supported_target_tuple(parse_impersonate_target(target))
def _get_request_target(self, request):
"""Get the requested target for the request"""
return request.extensions.get('impersonate') or self.impersonate
def _is_supported_target_tuple(self, target: ImpersonateTarget):
return self._resolve_target_tuple(target) is not None
def _get_resolved_request_target(self, request) -> ImpersonateTarget:
"""Get the resolved target for this request. This gives the matching supported target"""
return self._resolve_target(self._get_request_target(request))
def _get_target_tuple(self, request):
"""Get the requested target tuple for the request"""
target = request.extensions.get('impersonate') or self.impersonate
if target:
return parse_impersonate_target(target)
def _get_mapped_request_target(self, request):
def _get_resolved_target_tuple(self, request) -> ImpersonateTarget:
"""Get the resolved target tuple for this request. This gives the matching supported target"""
return self._resolve_target_tuple(self._get_target_tuple(request))
def _get_mapped_target(self, request):
"""Get the resolved mapped target for the request target"""
resolved_target = self._resolve_target(self._get_request_target(request))
resolved_target = self._resolve_target_tuple(self._get_target_tuple(request))
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get(
resolved_target, None)
def _get_impersonate_headers(self, request):
headers = self._merge_headers(request.headers)
if self._get_request_target(request):
if self._get_target_tuple(request):
# remove all headers present in std_headers
headers.pop('User-Agent', None)
for header in std_headers:
@ -121,6 +162,7 @@ def impersonate_preference(rh, request):
return 1000
return 0
def get_available_impersonate_targets(director):
return director.collect_from_handlers(
lambda x: x.get_supported_targets(),

View File

@ -513,13 +513,8 @@ def create_parser():
)
network.add_option(
'--impersonate',
metavar='CLIENT[:[VERSION][:[OS][:OS_VERSION]]]', dest='impersonate', default=None,
help='Client to impersonate for requests',
)
network.add_option(
'--list-impersonate-targets',
dest='list_impersonate_targets', default=False, action='store_true',
help='List available HTTP clients to impersonate',
metavar='TARGET', dest='impersonate', default=None,
help='curl-impersonate target name to impersonate for requests.',
)
network.add_option(
'-4', '--force-ipv4',

View File

@ -1,10 +1,7 @@
from __future__ import annotations
import collections
import random
import urllib.parse
import urllib.request
from typing import Optional, Tuple
from ._utils import remove_start
@ -165,22 +162,3 @@ def normalize_url(url):
query=escape_rfc3986(url_parsed.query),
fragment=escape_rfc3986(url_parsed.fragment)
).geturl()
def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[str], Optional[str]] | None:
    """
    Turn an impersonate target string into a (client, version, os, os_vers) tuple.

    Missing, empty or whitespace-only fields become None.
    Returns None when the client field itself is missing, i.e. the target is invalid.
    """
    # Keep at most four segments and pad the remainder with None.
    fields = target.split(':')[:4]
    fields += [None] * (4 - len(fields))
    client, version, os, os_vers = (
        field if field and field.strip() else None for field in fields)
    if client is None:
        return None
    return client, version, os, os_vers
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
    """
    Build the colon-separated target string from (client, version, os, os_vers).

    None fields are written as empty segments and trailing colons are stripped.
    Extra positional arguments are accepted and ignored.
    Returns None when no client is given.
    """
    if not client:
        return None
    segments = []
    for field in (client, version, os, os_vers):
        segments.append('' if field is None else str(field))
    return ':'.join(segments).rstrip(':')