Compare commits

..

5 Commits

Author SHA1 Message Date
coletdjnz
7dcf23c03a
cleanup 2023-12-18 20:43:35 +13:00
coletdjnz
e9c206d982
generate README.md 2023-12-18 20:41:55 +13:00
coletdjnz
312c8a4ff5
Add ImpersonateTarget dataclass to replace tuple usage 2023-12-18 20:41:25 +13:00
coletdjnz
17923645d7
Remove get_handlers and collect_from_handlers from director 2023-12-18 19:35:32 +13:00
coletdjnz
3a158aff68
Remove _SUPPORTED_IMPERSONATE_TARGET_TUPLES 2023-12-18 19:20:50 +13:00
8 changed files with 169 additions and 156 deletions

View File

@ -475,8 +475,10 @@ If you fork the project on GitHub, you can run your fork's [build workflow](.git
direct connection direct connection
--socket-timeout SECONDS Time to wait before giving up, in seconds --socket-timeout SECONDS Time to wait before giving up, in seconds
--source-address IP Client-side IP address to bind to --source-address IP Client-side IP address to bind to
--impersonate CLIENT[:[VERSION][:[OS][:OS_VERSION]]] --impersonate [CLIENT[:[VERSION][:[OS][:OS_VERSION]]]]
Client to impersonate for requests Client to impersonate for requests. Pass in
an empty string (--impersonate "") to
impersonate any client
--list-impersonate-targets List available clients to impersonate --list-impersonate-targets List available clients to impersonate
-4, --force-ipv4 Make all connections via IPv4 -4, --force-ipv4 Make all connections via IPv4
-6, --force-ipv6 Make all connections via IPv6 -6, --force-ipv6 Make all connections via IPv6

View File

@ -50,12 +50,10 @@ from yt_dlp.networking.exceptions import (
TransportError, TransportError,
UnsupportedRequest, UnsupportedRequest,
) )
from yt_dlp.networking.impersonate import ImpersonateRequestHandler from yt_dlp.networking.impersonate import ImpersonateRequestHandler, ImpersonateTarget
from yt_dlp.utils._utils import _YDLLogger as FakeLogger from yt_dlp.utils._utils import _YDLLogger as FakeLogger
from yt_dlp.utils.networking import ( from yt_dlp.utils.networking import (
HTTPHeaderDict, HTTPHeaderDict,
compile_impersonate_target,
parse_impersonate_target,
std_headers, std_headers,
) )
@ -913,9 +911,9 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True) @pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
@pytest.mark.parametrize('params,extensions', [ @pytest.mark.parametrize('params,extensions', [
({}, {'impersonate': ('chrome',)}), ({}, {'impersonate': ImpersonateTarget('chrome')}),
({'impersonate': ('chrome', '110')}, {}), ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
({'impersonate': ('chrome', '99')}, {'impersonate': ('chrome', '110')}), ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
]) ])
def test_impersonate(self, handler, params, extensions): def test_impersonate(self, handler, params, extensions):
with handler(headers=std_headers, **params) as rh: with handler(headers=std_headers, **params) as rh:
@ -931,7 +929,7 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
# Ensure curl-impersonate overrides our standard headers (usually added # Ensure curl-impersonate overrides our standard headers (usually added
res = validate_and_send( res = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
'impersonate': ('safari', )}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower() 'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
assert std_headers['user-agent'].lower() not in res assert std_headers['user-agent'].lower() not in res
assert std_headers['accept-language'].lower() not in res assert std_headers['accept-language'].lower() not in res
@ -1143,11 +1141,12 @@ class TestRequestHandlerValidation:
({'timeout': 1}, False), ({'timeout': 1}, False),
({'timeout': 'notatimeout'}, AssertionError), ({'timeout': 'notatimeout'}, AssertionError),
({'unsupported': 'value'}, UnsupportedRequest), ({'unsupported': 'value'}, UnsupportedRequest),
({'impersonate': ('badtarget', None, None, None)}, UnsupportedRequest), ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
({'impersonate': 123}, AssertionError), ({'impersonate': 123}, AssertionError),
({'impersonate': ('chrome', None, None, None)}, False), ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
({'impersonate': (None, None, None, None)}, False), ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
({'impersonate': ()}, False) ({'impersonate': ImpersonateTarget()}, False),
({'impersonate': 'chrome'}, AssertionError)
]), ]),
(NoCheckRH, 'http', [ (NoCheckRH, 'http', [
({'cookiejar': 'notacookiejar'}, False), ({'cookiejar': 'notacookiejar'}, False),
@ -1447,7 +1446,7 @@ class TestYoutubeDLNetworking:
RequestError, RequestError,
match=r'Impersonate target "test" is not available. This request requires browser impersonation' match=r'Impersonate target "test" is not available. This request requires browser impersonation'
): ):
ydl.urlopen(Request('http://', extensions={'impersonate': ('test', None, None, None)})) ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
def test_unsupported_impersonate_extension(self): def test_unsupported_impersonate_extension(self):
class FakeHTTPRHYDL(FakeYDL): class FakeHTTPRHYDL(FakeYDL):
@ -1457,7 +1456,7 @@ class TestYoutubeDLNetworking:
pass pass
_SUPPORTED_URL_SCHEMES = ('http',) _SUPPORTED_URL_SCHEMES = ('http',)
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)] _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('firefox',): 'test'}
_SUPPORTED_PROXY_SCHEMES = None _SUPPORTED_PROXY_SCHEMES = None
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -1468,14 +1467,14 @@ class TestYoutubeDLNetworking:
RequestError, RequestError,
match=r'Impersonate target "test" is not available. This request requires browser impersonation' match=r'Impersonate target "test" is not available. This request requires browser impersonation'
): ):
ydl.urlopen(Request('http://', extensions={'impersonate': ('test', None, None, None)})) ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
def test_raise_impersonate_error(self): def test_raise_impersonate_error(self):
with pytest.raises( with pytest.raises(
ValueError, ValueError,
match=r'Impersonate target "test" is not available. Use --list-impersonate-targets to see available targets.' match=r'Impersonate target "test" is not available. Use --list-impersonate-targets to see available targets.'
): ):
FakeYDL({'impersonate': ('test', None, None, None)}) FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
def test_pass_impersonate_param(self, monkeypatch): def test_pass_impersonate_param(self, monkeypatch):
@ -1484,17 +1483,17 @@ class TestYoutubeDLNetworking:
pass pass
_SUPPORTED_URL_SCHEMES = ('http',) _SUPPORTED_URL_SCHEMES = ('http',)
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)] _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('firefox'): 'test'}
# Bypass the check on initialize # Bypass the check on initialize
brh = FakeYDL.build_request_director brh = FakeYDL.build_request_director
monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH])) monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
with FakeYDL({ with FakeYDL({
'impersonate': ('firefox', None, None, None) 'impersonate': ImpersonateTarget('firefox', None, None, None)
}) as ydl: }) as ydl:
rh = self.build_handler(ydl, IRH) rh = self.build_handler(ydl, IRH)
assert rh.impersonate == ('firefox', None, None, None) assert rh.impersonate == ImpersonateTarget('firefox', None, None, None)
def test_get_impersonate_targets(self): def test_get_impersonate_targets(self):
handlers = [] handlers = []
@ -1503,17 +1502,21 @@ class TestYoutubeDLNetworking:
def _send(self, request: Request): def _send(self, request: Request):
pass pass
_SUPPORTED_URL_SCHEMES = ('http',) _SUPPORTED_URL_SCHEMES = ('http',)
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [(target_client,)] _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
RH_KEY = target_client RH_KEY = target_client
RH_NAME = target_client RH_NAME = target_client
handlers.append(TestRH) handlers.append(TestRH)
with FakeYDL() as ydl: with FakeYDL() as ydl:
ydl._request_director = ydl.build_request_director(handlers) ydl._request_director = ydl.build_request_director(handlers)
assert set(ydl.get_impersonate_targets()) == {('firefox', 'firefox'), ('chrome', 'chrome'), ('edge', 'edge')} assert set(ydl.get_available_impersonate_targets()) == {
assert ydl.impersonate_target_available(('firefox', )) (ImpersonateTarget('chrome'), 'chrome'),
assert ydl.impersonate_target_available(()) (ImpersonateTarget('firefox'), 'firefox'),
assert not ydl.impersonate_target_available(('safari',)) (ImpersonateTarget('edge'), 'edge')
}
assert ydl.impersonate_target_available(ImpersonateTarget('firefox'))
assert ydl.impersonate_target_available(ImpersonateTarget())
assert not ydl.impersonate_target_available(ImpersonateTarget('safari'))
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [ @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
('http', '__noproxy__', None), ('http', '__noproxy__', None),
@ -1809,38 +1812,51 @@ class TestResponse:
assert res.getheader('test') == res.get_header('test') assert res.getheader('test') == res.get_header('test')
# TODO: move these to test_utils.py when that moves to pytest class TestImpersonateTarget:
class TestImpersonate: @pytest.mark.parametrize('target_str,expected', [
@pytest.mark.parametrize('target,expected', [ ('firefox', ImpersonateTarget('firefox', None, None, None)),
('firefox', ('firefox', None, None, None)), ('firefox:120', ImpersonateTarget('firefox', '120', None, None)),
('firefox:120', ('firefox', '120', None, None)), ('firefox:120:linux', ImpersonateTarget('firefox', '120', 'linux', None)),
('firefox:120:linux', ('firefox', '120', 'linux', None)), ('firefox:120:linux:5', ImpersonateTarget('firefox', '120', 'linux', '5')),
('firefox:120:linux:5', ('firefox', '120', 'linux', '5')), ('firefox::linux', ImpersonateTarget('firefox', None, 'linux', None)),
('firefox::linux', ('firefox', None, 'linux', None)), ('firefox:::5', ImpersonateTarget('firefox', None, None, '5')),
('firefox:::5', ('firefox', None, None, '5')), ('firefox:::', ImpersonateTarget('firefox', None, None, None)),
('firefox:::', ('firefox', None, None, None)), ('firefox:120::5', ImpersonateTarget('firefox', '120', None, '5')),
('firefox:120::5', ('firefox', '120', None, '5')), ('firefox:120:', ImpersonateTarget('firefox', '120', None, None)),
('firefox:120:', ('firefox', '120', None, None)), ('::120', ImpersonateTarget(None, None, '120', None)),
('::120', (None, None, '120', None)), (':', ImpersonateTarget(None, None, None, None)),
(':', (None, None, None, None)), (':::', ImpersonateTarget(None, None, None, None)),
(':::', (None, None, None, None)), ('', ImpersonateTarget(None, None, None, None)),
('', (None, None, None, None)),
]) ])
def test_parse_impersonate_target(self, target, expected): def test_target_from_str(self, target_str, expected):
assert parse_impersonate_target(target) == expected assert ImpersonateTarget.from_str(target_str) == expected
@pytest.mark.parametrize('target_tuple,expected', [ @pytest.mark.parametrize('target,expected', [
(('firefox', None, None, None), 'firefox'), (ImpersonateTarget('firefox', None, None, None), 'firefox'),
(('firefox', '120', None, None), 'firefox:120'), (ImpersonateTarget('firefox', '120', None, None), 'firefox:120'),
(('firefox', '120', 'linux', None), 'firefox:120:linux'), (ImpersonateTarget('firefox', '120', 'linux', None), 'firefox:120:linux'),
(('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'), (ImpersonateTarget('firefox', '120', 'linux', '5'), 'firefox:120:linux:5'),
(('firefox', None, 'linux', None), 'firefox::linux'), (ImpersonateTarget('firefox', None, 'linux', None), 'firefox::linux'),
(('firefox', None, None, '5'), 'firefox:::5'), (ImpersonateTarget('firefox', None, None, '5'), 'firefox:::5'),
(('firefox', '120', None, '5'), 'firefox:120::5'), (ImpersonateTarget('firefox', '120', None, '5'), 'firefox:120::5'),
((None, '120', None, None), ':120'), (ImpersonateTarget(None, '120', None, None), ':120'),
(('firefox', ), 'firefox'), (ImpersonateTarget('firefox', ), 'firefox'),
(('firefox', None, 'linux'), 'firefox::linux'), (ImpersonateTarget('firefox', None, 'linux'), 'firefox::linux'),
((None, None, None, None), ''), (ImpersonateTarget(None, None, None, None), ''),
]) ])
def test_compile_impersonate_target(self, target_tuple, expected): def test_str(self, target, expected):
assert compile_impersonate_target(*target_tuple) == expected assert str(target) == expected
@pytest.mark.parametrize('target1,target2,is_in,is_eq', [
(ImpersonateTarget('firefox', None, None, None), ImpersonateTarget('firefox', None, None, None), True, True),
(ImpersonateTarget('firefox', None, None, None), ImpersonateTarget('firefox', '120', None, None), True, False),
(ImpersonateTarget('firefox', None, 'linux', 'test'), ImpersonateTarget('firefox', '120', 'linux', None), True, False),
(ImpersonateTarget('firefox', '121', 'linux', 'test'), ImpersonateTarget('firefox', '120', 'linux', 'test'), False, False),
(ImpersonateTarget('firefox'), ImpersonateTarget('firefox', '120', 'linux', 'test'), True, False),
(ImpersonateTarget('firefox', '120', 'linux', 'test'), ImpersonateTarget('firefox'), True, False),
(ImpersonateTarget(), ImpersonateTarget('firefox', '120', 'linux'), True, False),
(ImpersonateTarget(), ImpersonateTarget(), True, True),
])
def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
assert (target1 in target2) is is_in
assert (target1 == target2) is is_eq

View File

@ -164,7 +164,6 @@ from .utils.networking import (
HTTPHeaderDict, HTTPHeaderDict,
clean_headers, clean_headers,
clean_proxies, clean_proxies,
compile_impersonate_target,
std_headers, std_headers,
) )
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__ from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
@ -407,7 +406,7 @@ class YoutubeDL:
about it, warn otherwise (default) about it, warn otherwise (default)
source_address: Client-side IP address to bind to. source_address: Client-side IP address to bind to.
impersonate: Client to impersonate for requests. impersonate: Client to impersonate for requests.
A tuple in the form (client, version, os, os_version) An ImpersonateTarget (from yt_dlp.networking.impersonate)
sleep_interval_requests: Number of seconds to sleep between requests sleep_interval_requests: Number of seconds to sleep between requests
during extraction during extraction
sleep_interval: Number of seconds to sleep before each download when sleep_interval: Number of seconds to sleep before each download when
@ -718,7 +717,7 @@ class YoutubeDL:
# This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler # This assumes that all handlers that support impersonation subclass ImpersonateRequestHandler
if not self.impersonate_target_available(impersonate_target): if not self.impersonate_target_available(impersonate_target):
raise ValueError( raise ValueError(
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. ' f'Impersonate target "{self.params.get("impersonate")}" is not available. '
f'Use --list-impersonate-targets to see available targets.') f'Use --list-impersonate-targets to see available targets.')
if 'list-formats' in self.params['compat_opts']: if 'list-formats' in self.params['compat_opts']:
@ -4049,16 +4048,18 @@ class YoutubeDL:
handler = self._request_director.handlers['Urllib'] handler = self._request_director.handlers['Urllib']
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies) return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def get_impersonate_targets(self): def get_available_impersonate_targets(self):
return sorted(self._request_director.collect_from_handlers( return sorted(
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()], itertools.chain.from_iterable(
[lambda _, v: isinstance(v, ImpersonateRequestHandler)] [[(target, rh.RH_NAME) for target in rh.supported_targets]
), key=lambda x: x[0]) for rh in self._request_director.handlers.values()
if isinstance(rh, ImpersonateRequestHandler)]), key=lambda x: x[0])
def impersonate_target_available(self, target): def impersonate_target_available(self, target):
return any(self._request_director.collect_from_handlers( return any(
lambda x: [x.is_supported_target(target)], rh.is_supported_target(target)
[lambda _, v: isinstance(v, ImpersonateRequestHandler)])) for rh in self._request_director.handlers.values()
if isinstance(rh, ImpersonateRequestHandler))
def urlopen(self, req): def urlopen(self, req):
""" Start an HTTP download """ """ Start an HTTP download """
@ -4109,7 +4110,7 @@ class YoutubeDL:
elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()): elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
raise RequestError( raise RequestError(
f'Impersonate target "{compile_impersonate_target(*req.extensions["impersonate"])}" is not available.' f'Impersonate target "{req.extensions["impersonate"]}" is not available.'
f' This request requires browser impersonation, however you may be missing dependencies' f' This request requires browser impersonation, however you may be missing dependencies'
f' required to support this target. See the documentation for more information.') f' required to support this target. See the documentation for more information.')
raise raise

View File

@ -1,5 +1,7 @@
import sys import sys
from .networking.impersonate import ImpersonateTarget
if sys.version_info < (3, 8): if sys.version_info < (3, 8):
raise ImportError( raise ImportError(
f'You are using an unsupported version of Python. Only Python versions 3.8 and above are supported by yt-dlp') # noqa: F541 f'You are using an unsupported version of Python. Only Python versions 3.8 and above are supported by yt-dlp') # noqa: F541
@ -60,7 +62,7 @@ from .utils import (
variadic, variadic,
write_string, write_string,
) )
from .utils.networking import std_headers, parse_impersonate_target, compile_impersonate_target from .utils.networking import std_headers
from .YoutubeDL import YoutubeDL from .YoutubeDL import YoutubeDL
_IN_CLI = False _IN_CLI = False
@ -387,10 +389,7 @@ def validate_options(opts):
opts.cookiesfrombrowser = (browser_name, profile, keyring, container) opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
if opts.impersonate is not None: if opts.impersonate is not None:
target = parse_impersonate_target(opts.impersonate) opts.impersonate = ImpersonateTarget.from_str(opts.impersonate)
if target is None:
raise ValueError(f'invalid impersonate target "{opts.impersonate}"')
opts.impersonate = target
# MetadataParser # MetadataParser
def metadataparser_actions(f): def metadataparser_actions(f):
@ -986,9 +985,11 @@ def _real_main(argv=None):
ydl._download_retcode = 100 ydl._download_retcode = 100
if opts.list_impersonate_targets: if opts.list_impersonate_targets:
available_targets = ydl.get_impersonate_targets() available_targets = ydl.get_available_impersonate_targets()
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in rows = [
available_targets] [target.client, target.version, target.os, target.os_vers, handler, str(target)]
for target, handler in available_targets
]
ydl.to_screen('[info] Available impersonate targets') ydl.to_screen('[info] Available impersonate targets')
ydl.to_stdout( ydl.to_stdout(

View File

@ -18,7 +18,7 @@ from .exceptions import (
SSLError, SSLError,
TransportError, TransportError,
) )
from .impersonate import ImpersonateRequestHandler from .impersonate import ImpersonateRequestHandler, ImpersonateTarget
from ..dependencies import curl_cffi from ..dependencies import curl_cffi
from ..utils import int_or_none from ..utils import int_or_none
@ -106,17 +106,17 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
_SUPPORTED_URL_SCHEMES = ('http', 'https') _SUPPORTED_URL_SCHEMES = ('http', 'https')
_SUPPORTED_FEATURES = (Features.NO_PROXY, Features.ALL_PROXY) _SUPPORTED_FEATURES = (Features.NO_PROXY, Features.ALL_PROXY)
_SUPPORTED_PROXY_SCHEMES = ('http', 'https', 'socks4', 'socks4a', 'socks5', 'socks5h') _SUPPORTED_PROXY_SCHEMES = ('http', 'https', 'socks4', 'socks4a', 'socks5', 'socks5h')
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP = { _SUPPORTED_IMPERSONATE_TARGET_MAP = {
('chrome', '110', 'windows', '10'): curl_cffi.requests.BrowserType.chrome110, ImpersonateTarget('chrome', '110', 'windows', '10'): curl_cffi.requests.BrowserType.chrome110,
('chrome', '107', 'windows', '10'): curl_cffi.requests.BrowserType.chrome107, ImpersonateTarget('chrome', '107', 'windows', '10'): curl_cffi.requests.BrowserType.chrome107,
('chrome', '104', 'windows', '10'): curl_cffi.requests.BrowserType.chrome104, ImpersonateTarget('chrome', '104', 'windows', '10'): curl_cffi.requests.BrowserType.chrome104,
('chrome', '101', 'windows', '10'): curl_cffi.requests.BrowserType.chrome101, ImpersonateTarget('chrome', '101', 'windows', '10'): curl_cffi.requests.BrowserType.chrome101,
('chrome', '99', 'windows', '10'): curl_cffi.requests.BrowserType.chrome99, ImpersonateTarget('chrome', '99', 'windows', '10'): curl_cffi.requests.BrowserType.chrome99,
('chrome', '99', 'android', '12'): curl_cffi.requests.BrowserType.chrome99_android, ImpersonateTarget('chrome', '99', 'android', '12'): curl_cffi.requests.BrowserType.chrome99_android,
('edge', '101', 'windows', '10'): curl_cffi.requests.BrowserType.edge101, ImpersonateTarget('edge', '101', 'windows', '10'): curl_cffi.requests.BrowserType.edge101,
('edge', '99', 'windows', '10'): curl_cffi.requests.BrowserType.edge99, ImpersonateTarget('edge', '99', 'windows', '10'): curl_cffi.requests.BrowserType.edge99,
('safari', '15.5', 'macos', '12.4'): curl_cffi.requests.BrowserType.safari15_5, ImpersonateTarget('safari', '15.5', 'macos', '12.4'): curl_cffi.requests.BrowserType.safari15_5,
('safari', '15.3', 'macos', '11.6.4'): curl_cffi.requests.BrowserType.safari15_3, ImpersonateTarget('safari', '15.3', 'macos', '11.6.4'): curl_cffi.requests.BrowserType.safari15_3,
} }
def _create_instance(self, cookiejar=None): def _create_instance(self, cookiejar=None):

View File

@ -5,7 +5,6 @@ import copy
import enum import enum
import functools import functools
import io import io
import itertools
import typing import typing
import urllib.parse import urllib.parse
import urllib.request import urllib.request
@ -75,22 +74,6 @@ class RequestDirector:
assert isinstance(handler, RequestHandler), 'handler must be a RequestHandler' assert isinstance(handler, RequestHandler), 'handler must be a RequestHandler'
self.handlers[handler.RH_KEY] = handler self.handlers[handler.RH_KEY] = handler
def get_handlers(self, filters=None):
"""Return filtered handlers
@param filters: list of filters in the form of func(key, value) -> bool
"""
if not filters:
filters = []
return dict(filter(lambda x: all(f(x[0], x[1]) for f in filters), self.handlers.items()))
def collect_from_handlers(self, collect_func, filters=None):
"""
Collects data from handlers
@param collect_func: function to collect data from a handler, in the form of func(handler) -> Iterable
@param filters: list of filters for get_handlers()
"""
return list(itertools.chain.from_iterable(collect_func(rh) for rh in self.get_handlers(filters).values()))
def _get_handlers(self, request: Request) -> list[RequestHandler]: def _get_handlers(self, request: Request) -> list[RequestHandler]:
"""Sorts handlers by preference, given a request""" """Sorts handlers by preference, given a request"""
preferences = { preferences = {

View File

@ -1,26 +1,60 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC from abc import ABC
from typing import Any, Optional, Tuple from dataclasses import dataclass
from typing import Any, Optional
from .common import RequestHandler, register_preference from .common import RequestHandler, register_preference
from .exceptions import UnsupportedRequest from .exceptions import UnsupportedRequest
from ..compat.types import NoneType from ..compat.types import NoneType
from ..utils import classproperty
from ..utils.networking import std_headers from ..utils.networking import std_headers
ImpersonateTarget = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]
@dataclass(order=True)
class ImpersonateTarget:
"""
A target for browser impersonation.
def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget): Parameters:
for i in range(0, min(len(target1), len(target2))): @param client: the client to impersonate
if ( @param version: the client version to impersonate
target1[i] @param os: the client OS to impersonate
and target2[i] @param os_vers: the client OS version to impersonate
and target1[i] != target2[i]
): Note: None is used to indicate to match any.
"""
client: Optional[str] = None
version: Optional[str] = None
os: Optional[str] = None
os_vers: Optional[str] = None
def __contains__(self, target: ImpersonateTarget):
if not isinstance(target, ImpersonateTarget):
return False return False
return (
(self.client is None or target.client is None or self.client == target.client)
and (self.version is None or target.version is None or self.version == target.version)
and (self.os is None or target.os is None or self.os == target.os)
and (self.os_vers is None or target.os_vers is None or self.os_vers == target.os_vers)
)
return True def __str__(self):
filtered_parts = [
str(part) if part is not None else ''
for part in (self.client, self.version, self.os, self.os_vers)
]
return ':'.join(filtered_parts).rstrip(':')
@classmethod
def from_str(cls, target: str):
return ImpersonateTarget(*[
None if (v or '').strip() == '' else v
for v in (target.split(':') + [None, None, None, None])[:4]
])
def __hash__(self):
return hash((self.client, self.version, self.os, self.os_vers))
class ImpersonateRequestHandler(RequestHandler, ABC): class ImpersonateRequestHandler(RequestHandler, ABC):
@ -30,33 +64,28 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
This provides a method for checking the validity of the impersonate extension, This provides a method for checking the validity of the impersonate extension,
which can be used in _check_extensions. which can be used in _check_extensions.
Impersonate targets are defined as a tuple of (client, version, os, os_vers). Impersonate targets consist of a client, version, os and os_vers.
Note: Impersonate targets are not required to define all fields (except client). See the ImpersonateTarget class for more details.
The following may be defined: The following may be defined:
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`: a tuple of supported targets to impersonate. - `_SUPPORTED_IMPERSONATE_TARGET_MAP`: a dict mapping supported targets to custom object.
Any Request with an impersonate target not in this list will raise an UnsupportedRequest. Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
Set to None to disable this check. Set to None to disable this check.
- `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP`: a dict mapping supported targets to custom targets.
This works similar to `_SUPPORTED_IMPERSONATE_TARGET_TUPLES`.
Note: Only one of `_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP` and `_SUPPORTED_IMPERSONATE_TARGET_TUPLES` can be defined.
Note: Entries are in order of preference Note: Entries are in order of preference
Parameters: Parameters:
@param impersonate: the default impersonate target to use for requests. @param impersonate: the default impersonate target to use for requests.
Set to None to disable impersonation. Set to None to disable impersonation.
""" """
_SUPPORTED_IMPERSONATE_TARGET_TUPLES: tuple[ImpersonateTarget] = () _SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] = {}
_SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP: dict[ImpersonateTarget, Any] = {}
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs): def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.impersonate = impersonate self.impersonate = impersonate
def _check_impersonate_target(self, target: ImpersonateTarget): def _check_impersonate_target(self, target: ImpersonateTarget):
assert isinstance(target, (tuple, NoneType)) assert isinstance(target, (ImpersonateTarget, NoneType))
if target is None or not self.get_supported_targets(): if target is None or not self.supported_targets:
return return
if not self.is_supported_target(target): if not self.is_supported_target(target):
raise UnsupportedRequest(f'Unsupported impersonate target: {target}') raise UnsupportedRequest(f'Unsupported impersonate target: {target}')
@ -74,31 +103,29 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
"""Resolve a target to a supported target.""" """Resolve a target to a supported target."""
if target is None: if target is None:
return return
for supported_target in self.get_supported_targets(): for supported_target in self.supported_targets:
if _target_within(target, supported_target): if target in supported_target:
if self.verbose: if self.verbose:
self._logger.stdout( self._logger.stdout(
f'{self.RH_NAME}: resolved impersonate target {target} to {supported_target}') f'{self.RH_NAME}: resolved impersonate target {target} to {supported_target}')
return supported_target return supported_target
def get_supported_targets(self) -> tuple[ImpersonateTarget]: @classproperty
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.keys()) or tuple(self._SUPPORTED_IMPERSONATE_TARGET_TUPLES) def supported_targets(self) -> tuple[ImpersonateTarget]:
return tuple(self._SUPPORTED_IMPERSONATE_TARGET_MAP.keys())
def is_supported_target(self, target: ImpersonateTarget): def is_supported_target(self, target: ImpersonateTarget):
assert isinstance(target, ImpersonateTarget)
return self._resolve_target(target) is not None return self._resolve_target(target) is not None
def _get_request_target(self, request): def _get_request_target(self, request):
"""Get the requested target for the request""" """Get the requested target for the request"""
return request.extensions.get('impersonate') or self.impersonate return request.extensions.get('impersonate') or self.impersonate
def _get_resolved_request_target(self, request) -> ImpersonateTarget:
"""Get the resolved target for this request. This gives the matching supported target"""
return self._resolve_target(self._get_request_target(request))
def _get_mapped_request_target(self, request): def _get_mapped_request_target(self, request):
"""Get the resolved mapped target for the request target""" """Get the resolved mapped target for the request target"""
resolved_target = self._resolve_target(self._get_request_target(request)) resolved_target = self._resolve_target(self._get_request_target(request))
return self._SUPPORTED_IMPERSONATE_TARGET_TUPLE_MAP.get( return self._SUPPORTED_IMPERSONATE_TARGET_MAP.get(
resolved_target, None) resolved_target, None)
def _get_impersonate_headers(self, request): def _get_impersonate_headers(self, request):

View File

@ -165,20 +165,3 @@ def normalize_url(url):
query=escape_rfc3986(url_parsed.query), query=escape_rfc3986(url_parsed.query),
fragment=escape_rfc3986(url_parsed.fragment) fragment=escape_rfc3986(url_parsed.fragment)
).geturl() ).geturl()
def parse_impersonate_target(target: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]] | None:
"""
Parse an impersonate target string into a tuple of (client, version, os, os_vers)
If the target is invalid, return None
"""
client, version, os, os_vers = [None if (v or '').strip() == '' else v for v in (
target.split(':') + [None, None, None, None])][:4]
return client, version, os, os_vers
def compile_impersonate_target(*args) -> str | None:
client, version, os, os_vers = (list(args) + [None, None, None, None])[:4]
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]
return ':'.join(filtered_parts).rstrip(':')