Compare commits

...

5 Commits

Author SHA1 Message Date
coletdjnz
9ca8d32788
fix cli 2023-12-02 12:51:39 +13:00
coletdjnz
5c45aa393f
show handler name in impersonate target table 2023-12-02 12:49:39 +13:00
coletdjnz
3ab4524aab
add table 2023-12-02 12:32:11 +13:00
coletdjnz
ea3efb1427
cli improvements 2023-12-02 11:16:16 +13:00
coletdjnz
45fc64d11f
support version 2023-12-02 10:25:57 +13:00
7 changed files with 140 additions and 87 deletions

View File

@ -50,8 +50,14 @@ from yt_dlp.networking.exceptions import (
TransportError, TransportError,
UnsupportedRequest, UnsupportedRequest,
) )
from yt_dlp.networking.impersonate import ImpersonateRequestHandler
from yt_dlp.utils._utils import _YDLLogger as FakeLogger from yt_dlp.utils._utils import _YDLLogger as FakeLogger
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers from yt_dlp.utils.networking import (
HTTPHeaderDict,
compile_impersonate_target,
parse_impersonate_target,
std_headers,
)
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@ -1652,3 +1658,33 @@ class TestResponse:
assert res.geturl() == res.url assert res.geturl() == res.url
assert res.info() is res.headers assert res.info() is res.headers
assert res.getheader('test') == res.get_header('test') assert res.getheader('test') == res.get_header('test')
class TestImpersonate:
    """Round-trip checks for the impersonate target string <-> tuple helpers."""

    # (target string, expected (client, version, os, os_vers) tuple or None)
    _PARSE_CASES = [
        ('firefox', ('firefox', None, None, None)),
        ('firefox:120', ('firefox', '120', None, None)),
        ('firefox:120:linux', ('firefox', '120', 'linux', None)),
        ('firefox:120:linux:5', ('firefox', '120', 'linux', '5')),
        ('firefox::linux', ('firefox', None, 'linux', None)),
        ('firefox:::5', ('firefox', None, None, '5')),
        ('firefox:::', ('firefox', None, None, None)),
        ('firefox:120::5', ('firefox', '120', None, '5')),
        ('firefox:120:', ('firefox', '120', None, None)),
        ('::120', None),
    ]

    # ((client, version, os, os_vers) tuple, expected target string or None)
    _COMPILE_CASES = [
        (('firefox', None, None, None), 'firefox'),
        (('firefox', '120', None, None), 'firefox:120'),
        (('firefox', '120', 'linux', None), 'firefox:120:linux'),
        (('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'),
        (('firefox', None, 'linux', None), 'firefox::linux'),
        (('firefox', None, None, '5'), 'firefox:::5'),
        (('firefox', '120', None, '5'), 'firefox:120::5'),
        ((None, '120', None, None), None),
    ]

    @pytest.mark.parametrize('target,expected', _PARSE_CASES)
    def test_parse_impersonate_target(self, target, expected):
        """Parsing a target string yields the expected tuple (None if invalid)."""
        parsed = parse_impersonate_target(target)
        assert parsed == expected

    @pytest.mark.parametrize('target_tuple,expected', _COMPILE_CASES)
    def test_compile_impersonate_target(self, target_tuple, expected):
        """Compiling a target tuple yields the expected string (None without a client)."""
        compiled = compile_impersonate_target(*target_tuple)
        assert compiled == expected

View File

@ -158,6 +158,8 @@ from .utils.networking import (
clean_headers, clean_headers,
clean_proxies, clean_proxies,
std_headers, std_headers,
parse_impersonate_target,
compile_impersonate_target
) )
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__ from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
@ -398,7 +400,8 @@ class YoutubeDL:
- "detect_or_warn": check whether we can do anything - "detect_or_warn": check whether we can do anything
about it, warn otherwise (default) about it, warn otherwise (default)
source_address: Client-side IP address to bind to. source_address: Client-side IP address to bind to.
impersonate: curl-impersonate target name to impersonate for requests. impersonate: Client to impersonate for requests.
A tuple in the form (client, version, os, os_version)
sleep_interval_requests: Number of seconds to sleep between requests sleep_interval_requests: Number of seconds to sleep between requests
during extraction during extraction
sleep_interval: Number of seconds to sleep before each download when sleep_interval: Number of seconds to sleep before each download when
@ -682,20 +685,6 @@ class YoutubeDL:
self.params['http_headers'].pop('Cookie', None) self.params['http_headers'].pop('Cookie', None)
self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES) self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
impersonate_target = self.params.get('impersonate')
if impersonate_target:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
results = self._request_director.collect_from_handlers(
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
self.report_warning(f'Impersonate target "{self.params.get("impersonate")}" is not supported. '
f'Supported targets: {join_nonempty(*get_available_impersonate_targets(self._request_director), delim=", ")}')
if auto_init and auto_init != 'no_verbose_header': if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header() self.print_debug_header()
@ -718,6 +707,21 @@ class YoutubeDL:
for msg in self.params.get('_deprecation_warnings', []): for msg in self.params.get('_deprecation_warnings', []):
self.deprecated_feature(msg) self.deprecated_feature(msg)
impersonate_target = self.params.get('impersonate')
if impersonate_target:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
results = self._request_director.collect_from_handlers(
lambda x: [x.is_supported_target(impersonate_target)],
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
)
if not results:
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
elif not any(results):
raise ValueError(
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
f'Use --list-impersonate-targets to see available targets.')
if 'list-formats' in self.params['compat_opts']: if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False self.params['listformats_table'] = False
@ -4047,6 +4051,12 @@ class YoutubeDL:
handler = self._request_director.handlers['Urllib'] handler = self._request_director.handlers['Urllib']
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies) return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def get_impersonate_targets(self):
    """
    Return the impersonate targets offered by the impersonation-capable handlers.

    Each entry is the handler's supported target tuple with the handler name
    appended, i.e. (client, version, os, os_vers, handler name).
    The result is sorted by client name.
    """
    # NOTE(review): collect_from_handlers is assumed to flatten the per-handler
    # lists into a single list - confirm against the request director.
    entries = self._request_director.collect_from_handlers(
        lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
        [lambda _, v: isinstance(v, ImpersonateRequestHandler)]
    )
    # Sort on the client field. Indexing into the version field (entry[1][0])
    # fails when the version is None and would otherwise order the entries by
    # the first character of the version rather than by client.
    return sorted(entries, key=lambda entry: entry[0])
def urlopen(self, req): def urlopen(self, req):
""" Start an HTTP download """ """ Start an HTTP download """
if isinstance(req, str): if isinstance(req, str):

View File

@ -60,7 +60,7 @@ from .utils import (
variadic, variadic,
write_string, write_string,
) )
from .utils.networking import std_headers from .utils.networking import std_headers, parse_impersonate_target, compile_impersonate_target
from .YoutubeDL import YoutubeDL from .YoutubeDL import YoutubeDL
_IN_CLI = False _IN_CLI = False
@ -386,6 +386,12 @@ def validate_options(opts):
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}') f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
opts.cookiesfrombrowser = (browser_name, profile, keyring, container) opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
if opts.impersonate:
target = parse_impersonate_target(opts.impersonate)
if target is None:
raise ValueError(f'invalid impersonate target "{opts.impersonate}"')
opts.impersonate = target
# MetadataParser # MetadataParser
def metadataparser_actions(f): def metadataparser_actions(f):
if isinstance(f, str): if isinstance(f, str):
@ -979,6 +985,17 @@ def _real_main(argv=None):
traceback.print_exc() traceback.print_exc()
ydl._download_retcode = 100 ydl._download_retcode = 100
if opts.list_impersonate_targets:
available_targets = ydl.get_impersonate_targets()
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
available_targets]
ydl.to_screen(f'[info] Available impersonate targets')
ydl.to_stdout(
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
)
return
if not actual_use: if not actual_use:
if pre_process: if pre_process:
return ydl._download_retcode return ydl._download_retcode

View File

@ -25,6 +25,11 @@ from ..utils import int_or_none
if curl_cffi is None: if curl_cffi is None:
raise ImportError('curl_cffi is not installed') raise ImportError('curl_cffi is not installed')
curl_cffi_version = tuple(int_or_none(x, default=0) for x in curl_cffi.__version__.split('.'))
if curl_cffi_version < (0, 5, 10):
raise ImportError('Only curl_cffi>=0.5.10 is supported')
import curl_cffi.requests import curl_cffi.requests
from curl_cffi.const import CurlECode, CurlOpt from curl_cffi.const import CurlECode, CurlOpt
@ -178,7 +183,7 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
verify=self.verify, verify=self.verify,
max_redirects=5, max_redirects=5,
timeout=timeout, timeout=timeout,
impersonate=self._get_mapped_target(request), impersonate=self._get_mapped_request_target(request),
interface=self.source_address, interface=self.source_address,
stream=True stream=True
) )

View File

@ -11,36 +11,7 @@ from ..utils.networking import std_headers
ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]] ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]]
def parse_impersonate_target(target: str) -> ImpersonateTarget: def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
client = version = os = os_vers = None
parts = target.split(':')
if len(parts):
client = parts[0]
if len(parts) > 1:
version = parts[1]
if len(parts) > 2:
os = parts[2]
if len(parts) > 3:
os_vers = parts[3]
return client, version, os, os_vers
def compile_impersonate_target(browser, version, os, os_vers) -> str:
target = browser
if version:
target += ':' + version
if os:
if not version:
target += ':'
target += ':' + os
if os_vers:
target += ':' + os_vers
return target
def target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
# required: check if the browser matches
if target1[0] != target2[0]: if target1[0] != target2[0]:
return False return False
@ -62,17 +33,14 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
This provides a method for checking the validity of the impersonate extension, This provides a method for checking the validity of the impersonate extension,
which can be used in _check_extensions. which can be used in _check_extensions.
Impersonate target tuples are defined as a tuple of (browser, version, os, os_vers) internally. Impersonate targets are defined as a tuple of (client, version, os, os_vers).
To simplify the interface, this is compiled into a string format of browser:version:os:os_vers to be used externally. Note: Impersonate targets are not required to define all fields (except client).
- In this handler, "impersonate target tuple" refers to the tuple version,
and "impersonate target" refers to the string version.
- Impersonate target [tuples] are not required to define all fields (except browser).
The following may be defined: The following may be defined:
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported target tuples to impersonate. - `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported targets to impersonate.
Any Request with an impersonate target not in this list will raise an UnsupportedRequest. Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
Set to None to disable this check. Set to None to disable this check.
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported target tuples to custom targets. - `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported targets to custom targets.
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`. This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined. Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
@ -85,12 +53,12 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = () _SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = ()
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {} _SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
def __init__(self, *, impersonate=None, **kwargs): def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.impersonate = impersonate self.impersonate = impersonate
def _check_impersonate_target(self, target: str): def _check_impersonate_target(self, target: ImpersonateTarget):
assert isinstance(target, (str, NoneType)) assert isinstance(target, (tuple, NoneType))
if target is None or not self.get_supported_targets(): if target is None or not self.get_supported_targets():
return return
if not self.is_supported_target(target): if not self.is_supported_target(target):
@ -105,49 +73,40 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
super()._validate(request) super()._validate(request)
self._check_impersonate_target(self.impersonate) self._check_impersonate_target(self.impersonate)
def _get_supported_target_tuples(self): def _resolve_target(self, target: ImpersonateTarget | None):
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
def _resolve_target_tuple(self, target: ImpersonateTarget | None):
"""Resolve a target to a supported target.""" """Resolve a target to a supported target."""
if not target: if not target:
return return
for supported_target in self._get_supported_target_tuples(): for supported_target in self.get_supported_targets():
if target_within(target, supported_target): if _target_within(target, supported_target):
if self.verbose: if self.verbose:
self._logger.stdout( self._logger.stdout(
f'{self.RH_NAME}: resolved impersonate target "{compile_impersonate_target(*target)}" ' f'{self.RH_NAME}: resolved impersonate target "{target}" to "{supported_target}"')
f'to "{compile_impersonate_target(*supported_target)}"')
return supported_target return supported_target
def get_supported_targets(self) -> tuple[str]: def get_supported_targets(self) -> tuple[ImpersonateTarget]:
return tuple(compile_impersonate_target(*target) for target in self._get_supported_target_tuples()) return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
def is_supported_target(self, target: str): def is_supported_target(self, target: ImpersonateTarget):
return self._is_supported_target_tuple(parse_impersonate_target(target)) return self._resolve_target(target) is not None
def _is_supported_target_tuple(self, target: ImpersonateTarget): def _get_request_target(self, request):
return self._resolve_target_tuple(target) is not None """Get the requested target for the request"""
return request.extensions.get('impersonate') or self.impersonate
def _get_target_tuple(self, request): def _get_resolved_request_target(self, request) -> ImpersonateTarget:
"""Get the requested target tuple for the request""" """Get the resolved target for this request. This gives the matching supported target"""
target = request.extensions.get('impersonate') or self.impersonate return self._resolve_target(self._get_request_target(request))
if target:
return parse_impersonate_target(target)
def _get_resolved_target_tuple(self, request) -> ImpersonateTarget: def _get_mapped_request_target(self, request):
"""Get the resolved target tuple for this request. This gives the matching supported target"""
return self._resolve_target_tuple(self._get_target_tuple(request))
def _get_mapped_target(self, request):
"""Get the resolved mapped target for the request target""" """Get the resolved mapped target for the request target"""
resolved_target = self._resolve_target_tuple(self._get_target_tuple(request)) resolved_target = self._resolve_target(self._get_request_target(request))
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get( return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get(
resolved_target, None) resolved_target, None)
def _get_impersonate_headers(self, request): def _get_impersonate_headers(self, request):
headers = self._merge_headers(request.headers) headers = self._merge_headers(request.headers)
if self._get_target_tuple(request): if self._get_request_target(request):
# remove all headers present in std_headers # remove all headers present in std_headers
headers.pop('User-Agent', None) headers.pop('User-Agent', None)
for header in std_headers: for header in std_headers:
@ -162,7 +121,6 @@ def impersonate_preference(rh, request):
return 1000 return 1000
return 0 return 0
def get_available_impersonate_targets(director): def get_available_impersonate_targets(director):
return director.collect_from_handlers( return director.collect_from_handlers(
lambda x: x.get_supported_targets(), lambda x: x.get_supported_targets(),

View File

@ -513,8 +513,13 @@ def create_parser():
) )
network.add_option( network.add_option(
'--impersonate', '--impersonate',
metavar='TARGET', dest='impersonate', default=None, metavar='CLIENT[:[VERSION][:[OS][:OS_VERSION]]]', dest='impersonate', default=None,
help='curl-impersonate target name to impersonate for requests.', help='Client to impersonate for requests',
)
network.add_option(
'--list-impersonate-targets',
dest='list_impersonate_targets', default=False, action='store_true',
help='List available HTTP clients to impersonate',
) )
network.add_option( network.add_option(
'-4', '--force-ipv4', '-4', '--force-ipv4',

View File

@ -1,7 +1,10 @@
from __future__ import annotations
import collections import collections
import random import random
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from typing import Optional, Tuple
from ._utils import remove_start from ._utils import remove_start
@ -162,3 +165,22 @@ def normalize_url(url):
query=escape_rfc3986(url_parsed.query), query=escape_rfc3986(url_parsed.query),
fragment=escape_rfc3986(url_parsed.fragment) fragment=escape_rfc3986(url_parsed.fragment)
).geturl() ).geturl()
def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[str], Optional[str]] | None:
    """
    Split a "client:version:os:os_vers" string into a 4-tuple of its fields.

    Fields that are missing, empty or whitespace-only become None, and any
    fields beyond the fourth are ignored.
    Returns None when the client field is missing.
    """
    fields = target.split(':')[:4]
    # Pad so the unpacking below always sees exactly four fields
    fields.extend([None] * (4 - len(fields)))
    client, version, os, os_vers = (
        field if field and field.strip() else None for field in fields)
    if client is None:
        return None
    return client, version, os, os_vers
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
    """
    Build a "client:version:os:os_vers" string from the individual fields.

    Fields that are None are rendered as empty and trailing colons are removed.
    Any extra positional arguments are ignored.
    Returns None when no client is given.
    """
    if not client:
        return None
    joined = ':'.join(
        '' if part is None else str(part)
        for part in (client, version, os, os_vers))
    return joined.rstrip(':')