mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-25 00:31:26 +01:00
Compare commits
No commits in common. "9ca8d327889e7b5c44323439780a739e8be3313c" and "c57f34ec5f4cd5583da11e9dc03783f94cd9885e" have entirely different histories.
9ca8d32788
...
c57f34ec5f
|
@ -50,14 +50,8 @@ from yt_dlp.networking.exceptions import (
|
|||
TransportError,
|
||||
UnsupportedRequest,
|
||||
)
|
||||
from yt_dlp.networking.impersonate import ImpersonateRequestHandler
|
||||
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
||||
from yt_dlp.utils.networking import (
|
||||
HTTPHeaderDict,
|
||||
compile_impersonate_target,
|
||||
parse_impersonate_target,
|
||||
std_headers,
|
||||
)
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
|
||||
|
||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
@ -1658,33 +1652,3 @@ class TestResponse:
|
|||
assert res.geturl() == res.url
|
||||
assert res.info() is res.headers
|
||||
assert res.getheader('test') == res.get_header('test')
|
||||
|
||||
|
||||
class TestImpersonate:
|
||||
@pytest.mark.parametrize('target,expected', [
|
||||
('firefox', ('firefox', None, None, None)),
|
||||
('firefox:120', ('firefox', '120', None, None)),
|
||||
('firefox:120:linux', ('firefox', '120', 'linux', None)),
|
||||
('firefox:120:linux:5', ('firefox', '120', 'linux', '5')),
|
||||
('firefox::linux', ('firefox', None, 'linux', None)),
|
||||
('firefox:::5', ('firefox', None, None, '5')),
|
||||
('firefox:::', ('firefox', None, None, None)),
|
||||
('firefox:120::5', ('firefox', '120', None, '5')),
|
||||
('firefox:120:', ('firefox', '120', None, None)),
|
||||
('::120', None)
|
||||
])
|
||||
def test_parse_impersonate_target(self, target, expected):
|
||||
assert parse_impersonate_target(target) == expected
|
||||
|
||||
@pytest.mark.parametrize('target_tuple,expected', [
|
||||
(('firefox', None, None, None), 'firefox'),
|
||||
(('firefox', '120', None, None), 'firefox:120'),
|
||||
(('firefox', '120', 'linux', None), 'firefox:120:linux'),
|
||||
(('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'),
|
||||
(('firefox', None, 'linux', None), 'firefox::linux'),
|
||||
(('firefox', None, None, '5'), 'firefox:::5'),
|
||||
(('firefox', '120', None, '5'), 'firefox:120::5'),
|
||||
((None, '120', None, None), None)
|
||||
])
|
||||
def test_compile_impersonate_target(self, target_tuple, expected):
|
||||
assert compile_impersonate_target(*target_tuple) == expected
|
||||
|
|
|
@ -158,8 +158,6 @@ from .utils.networking import (
|
|||
clean_headers,
|
||||
clean_proxies,
|
||||
std_headers,
|
||||
parse_impersonate_target,
|
||||
compile_impersonate_target
|
||||
)
|
||||
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
|
||||
|
||||
|
@ -400,8 +398,7 @@ class YoutubeDL:
|
|||
- "detect_or_warn": check whether we can do anything
|
||||
about it, warn otherwise (default)
|
||||
source_address: Client-side IP address to bind to.
|
||||
impersonate: Client to impersonate for requests.
|
||||
A tuple in the form (client, version, os, os_version)
|
||||
impersonate: curl-impersonate target name to impersonate for requests.
|
||||
sleep_interval_requests: Number of seconds to sleep between requests
|
||||
during extraction
|
||||
sleep_interval: Number of seconds to sleep before each download when
|
||||
|
@ -685,6 +682,20 @@ class YoutubeDL:
|
|||
self.params['http_headers'].pop('Cookie', None)
|
||||
self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
|
||||
|
||||
impersonate_target = self.params.get('impersonate')
|
||||
if impersonate_target:
|
||||
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
|
||||
results = self._request_director.collect_from_handlers(
|
||||
lambda x: [x.is_supported_target(impersonate_target)],
|
||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||
)
|
||||
if not results:
|
||||
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
|
||||
|
||||
elif not any(results):
|
||||
self.report_warning(f'Impersonate target "{self.params.get("impersonate")}" is not supported. '
|
||||
f'Supported targets: {join_nonempty(*get_available_impersonate_targets(self._request_director), delim=", ")}')
|
||||
|
||||
if auto_init and auto_init != 'no_verbose_header':
|
||||
self.print_debug_header()
|
||||
|
||||
|
@ -707,21 +718,6 @@ class YoutubeDL:
|
|||
for msg in self.params.get('_deprecation_warnings', []):
|
||||
self.deprecated_feature(msg)
|
||||
|
||||
impersonate_target = self.params.get('impersonate')
|
||||
if impersonate_target:
|
||||
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
|
||||
results = self._request_director.collect_from_handlers(
|
||||
lambda x: [x.is_supported_target(impersonate_target)],
|
||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||
)
|
||||
if not results:
|
||||
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
|
||||
|
||||
elif not any(results):
|
||||
raise ValueError(
|
||||
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
|
||||
f'Use --list-impersonate-targets to see available targets.')
|
||||
|
||||
if 'list-formats' in self.params['compat_opts']:
|
||||
self.params['listformats_table'] = False
|
||||
|
||||
|
@ -4051,12 +4047,6 @@ class YoutubeDL:
|
|||
handler = self._request_director.handlers['Urllib']
|
||||
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
|
||||
|
||||
def get_impersonate_targets(self):
|
||||
return sorted(self._request_director.collect_from_handlers(
|
||||
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
|
||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||
), key=lambda x: x[1][0])
|
||||
|
||||
def urlopen(self, req):
|
||||
""" Start an HTTP download """
|
||||
if isinstance(req, str):
|
||||
|
|
|
@ -60,7 +60,7 @@ from .utils import (
|
|||
variadic,
|
||||
write_string,
|
||||
)
|
||||
from .utils.networking import std_headers, parse_impersonate_target, compile_impersonate_target
|
||||
from .utils.networking import std_headers
|
||||
from .YoutubeDL import YoutubeDL
|
||||
|
||||
_IN_CLI = False
|
||||
|
@ -386,12 +386,6 @@ def validate_options(opts):
|
|||
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
|
||||
opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
|
||||
|
||||
if opts.impersonate:
|
||||
target = parse_impersonate_target(opts.impersonate)
|
||||
if target is None:
|
||||
raise ValueError(f'invalid impersonate target "{opts.impersonate}"')
|
||||
opts.impersonate = target
|
||||
|
||||
# MetadataParser
|
||||
def metadataparser_actions(f):
|
||||
if isinstance(f, str):
|
||||
|
@ -985,17 +979,6 @@ def _real_main(argv=None):
|
|||
traceback.print_exc()
|
||||
ydl._download_retcode = 100
|
||||
|
||||
if opts.list_impersonate_targets:
|
||||
available_targets = ydl.get_impersonate_targets()
|
||||
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
|
||||
available_targets]
|
||||
|
||||
ydl.to_screen(f'[info] Available impersonate targets')
|
||||
ydl.to_stdout(
|
||||
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
|
||||
)
|
||||
return
|
||||
|
||||
if not actual_use:
|
||||
if pre_process:
|
||||
return ydl._download_retcode
|
||||
|
|
|
@ -25,11 +25,6 @@ from ..utils import int_or_none
|
|||
if curl_cffi is None:
|
||||
raise ImportError('curl_cffi is not installed')
|
||||
|
||||
curl_cffi_version = tuple(int_or_none(x, default=0) for x in curl_cffi.__version__.split('.'))
|
||||
|
||||
if curl_cffi_version < (0, 5, 10):
|
||||
raise ImportError('Only curl_cffi>=0.5.10 is supported')
|
||||
|
||||
import curl_cffi.requests
|
||||
from curl_cffi.const import CurlECode, CurlOpt
|
||||
|
||||
|
@ -183,7 +178,7 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
|
|||
verify=self.verify,
|
||||
max_redirects=5,
|
||||
timeout=timeout,
|
||||
impersonate=self._get_mapped_request_target(request),
|
||||
impersonate=self._get_mapped_target(request),
|
||||
interface=self.source_address,
|
||||
stream=True
|
||||
)
|
||||
|
|
|
@ -11,7 +11,36 @@ from ..utils.networking import std_headers
|
|||
ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]]
|
||||
|
||||
|
||||
def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
|
||||
def parse_impersonate_target(target: str) -> ImpersonateTarget:
|
||||
client = version = os = os_vers = None
|
||||
parts = target.split(':')
|
||||
if len(parts):
|
||||
client = parts[0]
|
||||
if len(parts) > 1:
|
||||
version = parts[1]
|
||||
if len(parts) > 2:
|
||||
os = parts[2]
|
||||
if len(parts) > 3:
|
||||
os_vers = parts[3]
|
||||
|
||||
return client, version, os, os_vers
|
||||
|
||||
|
||||
def compile_impersonate_target(browser, version, os, os_vers) -> str:
|
||||
target = browser
|
||||
if version:
|
||||
target += ':' + version
|
||||
if os:
|
||||
if not version:
|
||||
target += ':'
|
||||
target += ':' + os
|
||||
if os_vers:
|
||||
target += ':' + os_vers
|
||||
return target
|
||||
|
||||
|
||||
def target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
|
||||
# required: check if the browser matches
|
||||
if target1[0] != target2[0]:
|
||||
return False
|
||||
|
||||
|
@ -33,14 +62,17 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
|||
This provides a method for checking the validity of the impersonate extension,
|
||||
which can be used in _check_extensions.
|
||||
|
||||
Impersonate targets are defined as a tuple of (client, version, os, os_vers).
|
||||
Note: Impersonate targets are not required to define all fields (except client).
|
||||
Impersonate target tuples are defined as a tuple of (browser, version, os, os_vers) internally.
|
||||
To simplify the interface, this is compiled into a string format of browser:version:os:os_vers to be used externally.
|
||||
- In this handler, "impersonate target tuple" refers to the tuple version,
|
||||
and "impersonate target" refers to the string version.
|
||||
- Impersonate target [tuples] are not required to define all fields (except browser).
|
||||
|
||||
The following may be defined:
|
||||
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported targets to impersonate.
|
||||
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported target tuples to impersonate.
|
||||
Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
|
||||
Set to None to disable this check.
|
||||
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported targets to custom targets.
|
||||
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported target tuples to custom targets.
|
||||
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
|
||||
|
||||
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
|
||||
|
@ -53,12 +85,12 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
|||
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = ()
|
||||
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
|
||||
|
||||
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
|
||||
def __init__(self, *, impersonate=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.impersonate = impersonate
|
||||
|
||||
def _check_impersonate_target(self, target: ImpersonateTarget):
|
||||
assert isinstance(target, (tuple, NoneType))
|
||||
def _check_impersonate_target(self, target: str):
|
||||
assert isinstance(target, (str, NoneType))
|
||||
if target is None or not self.get_supported_targets():
|
||||
return
|
||||
if not self.is_supported_target(target):
|
||||
|
@ -73,40 +105,49 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
|||
super()._validate(request)
|
||||
self._check_impersonate_target(self.impersonate)
|
||||
|
||||
def _resolve_target(self, target: ImpersonateTarget | None):
|
||||
def _get_supported_target_tuples(self):
|
||||
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
|
||||
|
||||
def _resolve_target_tuple(self, target: ImpersonateTarget | None):
|
||||
"""Resolve a target to a supported target."""
|
||||
if not target:
|
||||
return
|
||||
for supported_target in self.get_supported_targets():
|
||||
if _target_within(target, supported_target):
|
||||
for supported_target in self._get_supported_target_tuples():
|
||||
if target_within(target, supported_target):
|
||||
if self.verbose:
|
||||
self._logger.stdout(
|
||||
f'{self.RH_NAME}: resolved impersonate target "{target}" to "{supported_target}"')
|
||||
f'{self.RH_NAME}: resolved impersonate target "{compile_impersonate_target(*target)}" '
|
||||
f'to "{compile_impersonate_target(*supported_target)}"')
|
||||
return supported_target
|
||||
|
||||
def get_supported_targets(self) -> tuple[ImpersonateTarget]:
|
||||
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
|
||||
def get_supported_targets(self) -> tuple[str]:
|
||||
return tuple(compile_impersonate_target(*target) for target in self._get_supported_target_tuples())
|
||||
|
||||
def is_supported_target(self, target: ImpersonateTarget):
|
||||
return self._resolve_target(target) is not None
|
||||
def is_supported_target(self, target: str):
|
||||
return self._is_supported_target_tuple(parse_impersonate_target(target))
|
||||
|
||||
def _get_request_target(self, request):
|
||||
"""Get the requested target for the request"""
|
||||
return request.extensions.get('impersonate') or self.impersonate
|
||||
def _is_supported_target_tuple(self, target: ImpersonateTarget):
|
||||
return self._resolve_target_tuple(target) is not None
|
||||
|
||||
def _get_resolved_request_target(self, request) -> ImpersonateTarget:
|
||||
"""Get the resolved target for this request. This gives the matching supported target"""
|
||||
return self._resolve_target(self._get_request_target(request))
|
||||
def _get_target_tuple(self, request):
|
||||
"""Get the requested target tuple for the request"""
|
||||
target = request.extensions.get('impersonate') or self.impersonate
|
||||
if target:
|
||||
return parse_impersonate_target(target)
|
||||
|
||||
def _get_mapped_request_target(self, request):
|
||||
def _get_resolved_target_tuple(self, request) -> ImpersonateTarget:
|
||||
"""Get the resolved target tuple for this request. This gives the matching supported target"""
|
||||
return self._resolve_target_tuple(self._get_target_tuple(request))
|
||||
|
||||
def _get_mapped_target(self, request):
|
||||
"""Get the resolved mapped target for the request target"""
|
||||
resolved_target = self._resolve_target(self._get_request_target(request))
|
||||
resolved_target = self._resolve_target_tuple(self._get_target_tuple(request))
|
||||
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get(
|
||||
resolved_target, None)
|
||||
|
||||
def _get_impersonate_headers(self, request):
|
||||
headers = self._merge_headers(request.headers)
|
||||
if self._get_request_target(request):
|
||||
if self._get_target_tuple(request):
|
||||
# remove all headers present in std_headers
|
||||
headers.pop('User-Agent', None)
|
||||
for header in std_headers:
|
||||
|
@ -121,6 +162,7 @@ def impersonate_preference(rh, request):
|
|||
return 1000
|
||||
return 0
|
||||
|
||||
|
||||
def get_available_impersonate_targets(director):
|
||||
return director.collect_from_handlers(
|
||||
lambda x: x.get_supported_targets(),
|
||||
|
|
|
@ -513,13 +513,8 @@ def create_parser():
|
|||
)
|
||||
network.add_option(
|
||||
'--impersonate',
|
||||
metavar='CLIENT[:[VERSION][:[OS][:OS_VERSION]]]', dest='impersonate', default=None,
|
||||
help='Client to impersonate for requests',
|
||||
)
|
||||
network.add_option(
|
||||
'--list-impersonate-targets',
|
||||
dest='list_impersonate_targets', default=False, action='store_true',
|
||||
help='List available HTTP clients to impersonate',
|
||||
metavar='TARGET', dest='impersonate', default=None,
|
||||
help='curl-impersonate target name to impersonate for requests.',
|
||||
)
|
||||
network.add_option(
|
||||
'-4', '--force-ipv4',
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
import random
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from ._utils import remove_start
|
||||
|
||||
|
@ -165,22 +162,3 @@ def normalize_url(url):
|
|||
query=escape_rfc3986(url_parsed.query),
|
||||
fragment=escape_rfc3986(url_parsed.fragment)
|
||||
).geturl()
|
||||
|
||||
|
||||
def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[str], Optional[str]] | None:
|
||||
"""
|
||||
Parse an impersonate target string into a tuple of (client, version, os, os_vers)
|
||||
If the target is invalid, return None
|
||||
"""
|
||||
client, version, os, os_vers = [None if (v or '').strip() == '' else v for v in (
|
||||
target.split(':') + [None, None, None, None])][:4]
|
||||
|
||||
if client is not None:
|
||||
return client, version, os, os_vers
|
||||
|
||||
|
||||
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
|
||||
if not client:
|
||||
return
|
||||
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]
|
||||
return ':'.join(filtered_parts).rstrip(':')
|
||||
|
|
Loading…
Reference in New Issue
Block a user