mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-24 16:21:24 +01:00
Compare commits
5 Commits
c57f34ec5f
...
9ca8d32788
Author | SHA1 | Date | |
---|---|---|---|
|
9ca8d32788 | ||
|
5c45aa393f | ||
|
3ab4524aab | ||
|
ea3efb1427 | ||
|
45fc64d11f |
|
@ -50,8 +50,14 @@ from yt_dlp.networking.exceptions import (
|
||||||
TransportError,
|
TransportError,
|
||||||
UnsupportedRequest,
|
UnsupportedRequest,
|
||||||
)
|
)
|
||||||
|
from yt_dlp.networking.impersonate import ImpersonateRequestHandler
|
||||||
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
||||||
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
|
from yt_dlp.utils.networking import (
|
||||||
|
HTTPHeaderDict,
|
||||||
|
compile_impersonate_target,
|
||||||
|
parse_impersonate_target,
|
||||||
|
std_headers,
|
||||||
|
)
|
||||||
|
|
||||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|
||||||
|
@ -1652,3 +1658,33 @@ class TestResponse:
|
||||||
assert res.geturl() == res.url
|
assert res.geturl() == res.url
|
||||||
assert res.info() is res.headers
|
assert res.info() is res.headers
|
||||||
assert res.getheader('test') == res.get_header('test')
|
assert res.getheader('test') == res.get_header('test')
|
||||||
|
|
||||||
|
|
||||||
|
class TestImpersonate:
|
||||||
|
@pytest.mark.parametrize('target,expected', [
|
||||||
|
('firefox', ('firefox', None, None, None)),
|
||||||
|
('firefox:120', ('firefox', '120', None, None)),
|
||||||
|
('firefox:120:linux', ('firefox', '120', 'linux', None)),
|
||||||
|
('firefox:120:linux:5', ('firefox', '120', 'linux', '5')),
|
||||||
|
('firefox::linux', ('firefox', None, 'linux', None)),
|
||||||
|
('firefox:::5', ('firefox', None, None, '5')),
|
||||||
|
('firefox:::', ('firefox', None, None, None)),
|
||||||
|
('firefox:120::5', ('firefox', '120', None, '5')),
|
||||||
|
('firefox:120:', ('firefox', '120', None, None)),
|
||||||
|
('::120', None)
|
||||||
|
])
|
||||||
|
def test_parse_impersonate_target(self, target, expected):
|
||||||
|
assert parse_impersonate_target(target) == expected
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('target_tuple,expected', [
|
||||||
|
(('firefox', None, None, None), 'firefox'),
|
||||||
|
(('firefox', '120', None, None), 'firefox:120'),
|
||||||
|
(('firefox', '120', 'linux', None), 'firefox:120:linux'),
|
||||||
|
(('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'),
|
||||||
|
(('firefox', None, 'linux', None), 'firefox::linux'),
|
||||||
|
(('firefox', None, None, '5'), 'firefox:::5'),
|
||||||
|
(('firefox', '120', None, '5'), 'firefox:120::5'),
|
||||||
|
((None, '120', None, None), None)
|
||||||
|
])
|
||||||
|
def test_compile_impersonate_target(self, target_tuple, expected):
|
||||||
|
assert compile_impersonate_target(*target_tuple) == expected
|
||||||
|
|
|
@ -158,6 +158,8 @@ from .utils.networking import (
|
||||||
clean_headers,
|
clean_headers,
|
||||||
clean_proxies,
|
clean_proxies,
|
||||||
std_headers,
|
std_headers,
|
||||||
|
parse_impersonate_target,
|
||||||
|
compile_impersonate_target
|
||||||
)
|
)
|
||||||
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
|
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
|
||||||
|
|
||||||
|
@ -398,7 +400,8 @@ class YoutubeDL:
|
||||||
- "detect_or_warn": check whether we can do anything
|
- "detect_or_warn": check whether we can do anything
|
||||||
about it, warn otherwise (default)
|
about it, warn otherwise (default)
|
||||||
source_address: Client-side IP address to bind to.
|
source_address: Client-side IP address to bind to.
|
||||||
impersonate: curl-impersonate target name to impersonate for requests.
|
impersonate: Client to impersonate for requests.
|
||||||
|
A tuple in the form (client, version, os, os_version)
|
||||||
sleep_interval_requests: Number of seconds to sleep between requests
|
sleep_interval_requests: Number of seconds to sleep between requests
|
||||||
during extraction
|
during extraction
|
||||||
sleep_interval: Number of seconds to sleep before each download when
|
sleep_interval: Number of seconds to sleep before each download when
|
||||||
|
@ -682,20 +685,6 @@ class YoutubeDL:
|
||||||
self.params['http_headers'].pop('Cookie', None)
|
self.params['http_headers'].pop('Cookie', None)
|
||||||
self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
|
self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
|
||||||
|
|
||||||
impersonate_target = self.params.get('impersonate')
|
|
||||||
if impersonate_target:
|
|
||||||
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
|
|
||||||
results = self._request_director.collect_from_handlers(
|
|
||||||
lambda x: [x.is_supported_target(impersonate_target)],
|
|
||||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
|
||||||
)
|
|
||||||
if not results:
|
|
||||||
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
|
|
||||||
|
|
||||||
elif not any(results):
|
|
||||||
self.report_warning(f'Impersonate target "{self.params.get("impersonate")}" is not supported. '
|
|
||||||
f'Supported targets: {join_nonempty(*get_available_impersonate_targets(self._request_director), delim=", ")}')
|
|
||||||
|
|
||||||
if auto_init and auto_init != 'no_verbose_header':
|
if auto_init and auto_init != 'no_verbose_header':
|
||||||
self.print_debug_header()
|
self.print_debug_header()
|
||||||
|
|
||||||
|
@ -718,6 +707,21 @@ class YoutubeDL:
|
||||||
for msg in self.params.get('_deprecation_warnings', []):
|
for msg in self.params.get('_deprecation_warnings', []):
|
||||||
self.deprecated_feature(msg)
|
self.deprecated_feature(msg)
|
||||||
|
|
||||||
|
impersonate_target = self.params.get('impersonate')
|
||||||
|
if impersonate_target:
|
||||||
|
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
|
||||||
|
results = self._request_director.collect_from_handlers(
|
||||||
|
lambda x: [x.is_supported_target(impersonate_target)],
|
||||||
|
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||||
|
)
|
||||||
|
if not results:
|
||||||
|
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
|
||||||
|
|
||||||
|
elif not any(results):
|
||||||
|
raise ValueError(
|
||||||
|
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
|
||||||
|
f'Use --list-impersonate-targets to see available targets.')
|
||||||
|
|
||||||
if 'list-formats' in self.params['compat_opts']:
|
if 'list-formats' in self.params['compat_opts']:
|
||||||
self.params['listformats_table'] = False
|
self.params['listformats_table'] = False
|
||||||
|
|
||||||
|
@ -4047,6 +4051,12 @@ class YoutubeDL:
|
||||||
handler = self._request_director.handlers['Urllib']
|
handler = self._request_director.handlers['Urllib']
|
||||||
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
|
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
|
||||||
|
|
||||||
|
def get_impersonate_targets(self):
|
||||||
|
return sorted(self._request_director.collect_from_handlers(
|
||||||
|
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
|
||||||
|
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||||
|
), key=lambda x: x[1][0])
|
||||||
|
|
||||||
def urlopen(self, req):
|
def urlopen(self, req):
|
||||||
""" Start an HTTP download """
|
""" Start an HTTP download """
|
||||||
if isinstance(req, str):
|
if isinstance(req, str):
|
||||||
|
|
|
@ -60,7 +60,7 @@ from .utils import (
|
||||||
variadic,
|
variadic,
|
||||||
write_string,
|
write_string,
|
||||||
)
|
)
|
||||||
from .utils.networking import std_headers
|
from .utils.networking import std_headers, parse_impersonate_target, compile_impersonate_target
|
||||||
from .YoutubeDL import YoutubeDL
|
from .YoutubeDL import YoutubeDL
|
||||||
|
|
||||||
_IN_CLI = False
|
_IN_CLI = False
|
||||||
|
@ -386,6 +386,12 @@ def validate_options(opts):
|
||||||
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
|
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
|
||||||
opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
|
opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
|
||||||
|
|
||||||
|
if opts.impersonate:
|
||||||
|
target = parse_impersonate_target(opts.impersonate)
|
||||||
|
if target is None:
|
||||||
|
raise ValueError(f'invalid impersonate target "{opts.impersonate}"')
|
||||||
|
opts.impersonate = target
|
||||||
|
|
||||||
# MetadataParser
|
# MetadataParser
|
||||||
def metadataparser_actions(f):
|
def metadataparser_actions(f):
|
||||||
if isinstance(f, str):
|
if isinstance(f, str):
|
||||||
|
@ -979,6 +985,17 @@ def _real_main(argv=None):
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
ydl._download_retcode = 100
|
ydl._download_retcode = 100
|
||||||
|
|
||||||
|
if opts.list_impersonate_targets:
|
||||||
|
available_targets = ydl.get_impersonate_targets()
|
||||||
|
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
|
||||||
|
available_targets]
|
||||||
|
|
||||||
|
ydl.to_screen(f'[info] Available impersonate targets')
|
||||||
|
ydl.to_stdout(
|
||||||
|
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
if not actual_use:
|
if not actual_use:
|
||||||
if pre_process:
|
if pre_process:
|
||||||
return ydl._download_retcode
|
return ydl._download_retcode
|
||||||
|
|
|
@ -25,6 +25,11 @@ from ..utils import int_or_none
|
||||||
if curl_cffi is None:
|
if curl_cffi is None:
|
||||||
raise ImportError('curl_cffi is not installed')
|
raise ImportError('curl_cffi is not installed')
|
||||||
|
|
||||||
|
curl_cffi_version = tuple(int_or_none(x, default=0) for x in curl_cffi.__version__.split('.'))
|
||||||
|
|
||||||
|
if curl_cffi_version < (0, 5, 10):
|
||||||
|
raise ImportError('Only curl_cffi>=0.5.10 is supported')
|
||||||
|
|
||||||
import curl_cffi.requests
|
import curl_cffi.requests
|
||||||
from curl_cffi.const import CurlECode, CurlOpt
|
from curl_cffi.const import CurlECode, CurlOpt
|
||||||
|
|
||||||
|
@ -178,7 +183,7 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
|
||||||
verify=self.verify,
|
verify=self.verify,
|
||||||
max_redirects=5,
|
max_redirects=5,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
impersonate=self._get_mapped_target(request),
|
impersonate=self._get_mapped_request_target(request),
|
||||||
interface=self.source_address,
|
interface=self.source_address,
|
||||||
stream=True
|
stream=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -11,36 +11,7 @@ from ..utils.networking import std_headers
|
||||||
ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]]
|
ImpersonateTarget = Tuple[str, Optional[str], Optional[str], Optional[str]]
|
||||||
|
|
||||||
|
|
||||||
def parse_impersonate_target(target: str) -> ImpersonateTarget:
|
def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
|
||||||
client = version = os = os_vers = None
|
|
||||||
parts = target.split(':')
|
|
||||||
if len(parts):
|
|
||||||
client = parts[0]
|
|
||||||
if len(parts) > 1:
|
|
||||||
version = parts[1]
|
|
||||||
if len(parts) > 2:
|
|
||||||
os = parts[2]
|
|
||||||
if len(parts) > 3:
|
|
||||||
os_vers = parts[3]
|
|
||||||
|
|
||||||
return client, version, os, os_vers
|
|
||||||
|
|
||||||
|
|
||||||
def compile_impersonate_target(browser, version, os, os_vers) -> str:
|
|
||||||
target = browser
|
|
||||||
if version:
|
|
||||||
target += ':' + version
|
|
||||||
if os:
|
|
||||||
if not version:
|
|
||||||
target += ':'
|
|
||||||
target += ':' + os
|
|
||||||
if os_vers:
|
|
||||||
target += ':' + os_vers
|
|
||||||
return target
|
|
||||||
|
|
||||||
|
|
||||||
def target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
|
|
||||||
# required: check if the browser matches
|
|
||||||
if target1[0] != target2[0]:
|
if target1[0] != target2[0]:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -62,17 +33,14 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
||||||
This provides a method for checking the validity of the impersonate extension,
|
This provides a method for checking the validity of the impersonate extension,
|
||||||
which can be used in _check_extensions.
|
which can be used in _check_extensions.
|
||||||
|
|
||||||
Impersonate target tuples are defined as a tuple of (browser, version, os, os_vers) internally.
|
Impersonate targets are defined as a tuple of (client, version, os, os_vers).
|
||||||
To simplify the interface, this is compiled into a string format of browser:version:os:os_vers to be used externally.
|
Note: Impersonate targets are not required to define all fields (except client).
|
||||||
- In this handler, "impersonate target tuple" refers to the tuple version,
|
|
||||||
and "impersonate target" refers to the string version.
|
|
||||||
- Impersonate target [tuples] are not required to define all fields (except browser).
|
|
||||||
|
|
||||||
The following may be defined:
|
The following may be defined:
|
||||||
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported target tuples to impersonate.
|
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported targets to impersonate.
|
||||||
Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
|
Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
|
||||||
Set to None to disable this check.
|
Set to None to disable this check.
|
||||||
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported target tuples to custom targets.
|
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported targets to custom targets.
|
||||||
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
|
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
|
||||||
|
|
||||||
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
|
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
|
||||||
|
@ -85,12 +53,12 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
||||||
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = ()
|
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = ()
|
||||||
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
|
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
|
||||||
|
|
||||||
def __init__(self, *, impersonate=None, **kwargs):
|
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.impersonate = impersonate
|
self.impersonate = impersonate
|
||||||
|
|
||||||
def _check_impersonate_target(self, target: str):
|
def _check_impersonate_target(self, target: ImpersonateTarget):
|
||||||
assert isinstance(target, (str, NoneType))
|
assert isinstance(target, (tuple, NoneType))
|
||||||
if target is None or not self.get_supported_targets():
|
if target is None or not self.get_supported_targets():
|
||||||
return
|
return
|
||||||
if not self.is_supported_target(target):
|
if not self.is_supported_target(target):
|
||||||
|
@ -105,49 +73,40 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
||||||
super()._validate(request)
|
super()._validate(request)
|
||||||
self._check_impersonate_target(self.impersonate)
|
self._check_impersonate_target(self.impersonate)
|
||||||
|
|
||||||
def _get_supported_target_tuples(self):
|
def _resolve_target(self, target: ImpersonateTarget | None):
|
||||||
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
|
|
||||||
|
|
||||||
def _resolve_target_tuple(self, target: ImpersonateTarget | None):
|
|
||||||
"""Resolve a target to a supported target."""
|
"""Resolve a target to a supported target."""
|
||||||
if not target:
|
if not target:
|
||||||
return
|
return
|
||||||
for supported_target in self._get_supported_target_tuples():
|
for supported_target in self.get_supported_targets():
|
||||||
if target_within(target, supported_target):
|
if _target_within(target, supported_target):
|
||||||
if self.verbose:
|
if self.verbose:
|
||||||
self._logger.stdout(
|
self._logger.stdout(
|
||||||
f'{self.RH_NAME}: resolved impersonate target "{compile_impersonate_target(*target)}" '
|
f'{self.RH_NAME}: resolved impersonate target "{target}" to "{supported_target}"')
|
||||||
f'to "{compile_impersonate_target(*supported_target)}"')
|
|
||||||
return supported_target
|
return supported_target
|
||||||
|
|
||||||
def get_supported_targets(self) -> tuple[str]:
|
def get_supported_targets(self) -> tuple[ImpersonateTarget]:
|
||||||
return tuple(compile_impersonate_target(*target) for target in self._get_supported_target_tuples())
|
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES)
|
||||||
|
|
||||||
def is_supported_target(self, target: str):
|
def is_supported_target(self, target: ImpersonateTarget):
|
||||||
return self._is_supported_target_tuple(parse_impersonate_target(target))
|
return self._resolve_target(target) is not None
|
||||||
|
|
||||||
def _is_supported_target_tuple(self, target: ImpersonateTarget):
|
def _get_request_target(self, request):
|
||||||
return self._resolve_target_tuple(target) is not None
|
"""Get the requested target for the request"""
|
||||||
|
return request.extensions.get('impersonate') or self.impersonate
|
||||||
|
|
||||||
def _get_target_tuple(self, request):
|
def _get_resolved_request_target(self, request) -> ImpersonateTarget:
|
||||||
"""Get the requested target tuple for the request"""
|
"""Get the resolved target for this request. This gives the matching supported target"""
|
||||||
target = request.extensions.get('impersonate') or self.impersonate
|
return self._resolve_target(self._get_request_target(request))
|
||||||
if target:
|
|
||||||
return parse_impersonate_target(target)
|
|
||||||
|
|
||||||
def _get_resolved_target_tuple(self, request) -> ImpersonateTarget:
|
def _get_mapped_request_target(self, request):
|
||||||
"""Get the resolved target tuple for this request. This gives the matching supported target"""
|
|
||||||
return self._resolve_target_tuple(self._get_target_tuple(request))
|
|
||||||
|
|
||||||
def _get_mapped_target(self, request):
|
|
||||||
"""Get the resolved mapped target for the request target"""
|
"""Get the resolved mapped target for the request target"""
|
||||||
resolved_target = self._resolve_target_tuple(self._get_target_tuple(request))
|
resolved_target = self._resolve_target(self._get_request_target(request))
|
||||||
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get(
|
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get(
|
||||||
resolved_target, None)
|
resolved_target, None)
|
||||||
|
|
||||||
def _get_impersonate_headers(self, request):
|
def _get_impersonate_headers(self, request):
|
||||||
headers = self._merge_headers(request.headers)
|
headers = self._merge_headers(request.headers)
|
||||||
if self._get_target_tuple(request):
|
if self._get_request_target(request):
|
||||||
# remove all headers present in std_headers
|
# remove all headers present in std_headers
|
||||||
headers.pop('User-Agent', None)
|
headers.pop('User-Agent', None)
|
||||||
for header in std_headers:
|
for header in std_headers:
|
||||||
|
@ -162,7 +121,6 @@ def impersonate_preference(rh, request):
|
||||||
return 1000
|
return 1000
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def get_available_impersonate_targets(director):
|
def get_available_impersonate_targets(director):
|
||||||
return director.collect_from_handlers(
|
return director.collect_from_handlers(
|
||||||
lambda x: x.get_supported_targets(),
|
lambda x: x.get_supported_targets(),
|
||||||
|
|
|
@ -513,8 +513,13 @@ def create_parser():
|
||||||
)
|
)
|
||||||
network.add_option(
|
network.add_option(
|
||||||
'--impersonate',
|
'--impersonate',
|
||||||
metavar='TARGET', dest='impersonate', default=None,
|
metavar='CLIENT[:[VERSION][:[OS][:OS_VERSION]]]', dest='impersonate', default=None,
|
||||||
help='curl-impersonate target name to impersonate for requests.',
|
help='Client to impersonate for requests',
|
||||||
|
)
|
||||||
|
network.add_option(
|
||||||
|
'--list-impersonate-targets',
|
||||||
|
dest='list_impersonate_targets', default=False, action='store_true',
|
||||||
|
help='List available HTTP clients to impersonate',
|
||||||
)
|
)
|
||||||
network.add_option(
|
network.add_option(
|
||||||
'-4', '--force-ipv4',
|
'-4', '--force-ipv4',
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import random
|
import random
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
from ._utils import remove_start
|
from ._utils import remove_start
|
||||||
|
|
||||||
|
@ -162,3 +165,22 @@ def normalize_url(url):
|
||||||
query=escape_rfc3986(url_parsed.query),
|
query=escape_rfc3986(url_parsed.query),
|
||||||
fragment=escape_rfc3986(url_parsed.fragment)
|
fragment=escape_rfc3986(url_parsed.fragment)
|
||||||
).geturl()
|
).geturl()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[str], Optional[str]] | None:
|
||||||
|
"""
|
||||||
|
Parse an impersonate target string into a tuple of (client, version, os, os_vers)
|
||||||
|
If the target is invalid, return None
|
||||||
|
"""
|
||||||
|
client, version, os, os_vers = [None if (v or '').strip() == '' else v for v in (
|
||||||
|
target.split(':') + [None, None, None, None])][:4]
|
||||||
|
|
||||||
|
if client is not None:
|
||||||
|
return client, version, os, os_vers
|
||||||
|
|
||||||
|
|
||||||
|
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
|
||||||
|
if not client:
|
||||||
|
return
|
||||||
|
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]
|
||||||
|
return ':'.join(filtered_parts).rstrip(':')
|
||||||
|
|
Loading…
Reference in New Issue
Block a user