mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-25 00:31:26 +01:00
Compare commits
3 Commits
9ca8d32788
...
40fa635759
Author | SHA1 | Date | |
---|---|---|---|
|
40fa635759 | ||
|
f97ab81997 | ||
|
180109028d |
|
@ -475,8 +475,9 @@ If you fork the project on GitHub, you can run your fork's [build workflow](.git
|
||||||
direct connection
|
direct connection
|
||||||
--socket-timeout SECONDS Time to wait before giving up, in seconds
|
--socket-timeout SECONDS Time to wait before giving up, in seconds
|
||||||
--source-address IP Client-side IP address to bind to
|
--source-address IP Client-side IP address to bind to
|
||||||
--impersonate TARGET curl-impersonate target name to impersonate
|
--impersonate CLIENT[:[VERSION][:[OS][:OS_VERSION]]]
|
||||||
for requests.
|
Client to impersonate for requests
|
||||||
|
--list-impersonate-targets List available clients to impersonate
|
||||||
-4, --force-ipv4 Make all connections via IPv4
|
-4, --force-ipv4 Make all connections via IPv4
|
||||||
-6, --force-ipv6 Make all connections via IPv6
|
-6, --force-ipv6 Make all connections via IPv6
|
||||||
--enable-file-urls Enable file:// URLs. This is disabled by
|
--enable-file-urls Enable file:// URLs. This is disabled by
|
||||||
|
|
|
@ -29,7 +29,7 @@ from http.cookiejar import CookieJar
|
||||||
from test.conftest import validate_and_send
|
from test.conftest import validate_and_send
|
||||||
from test.helper import FakeYDL, http_server_port
|
from test.helper import FakeYDL, http_server_port
|
||||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||||
from yt_dlp.dependencies import brotli, requests, urllib3
|
from yt_dlp.dependencies import brotli, requests, urllib3, curl_cffi
|
||||||
from yt_dlp.networking import (
|
from yt_dlp.networking import (
|
||||||
HEADRequest,
|
HEADRequest,
|
||||||
PUTRequest,
|
PUTRequest,
|
||||||
|
@ -913,9 +913,9 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
|
||||||
|
|
||||||
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
|
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
|
||||||
@pytest.mark.parametrize('params,extensions', [
|
@pytest.mark.parametrize('params,extensions', [
|
||||||
({}, {'impersonate': 'chrome:110'}),
|
({}, {'impersonate': ('chrome',)}),
|
||||||
({'impersonate': 'chrome:110'}, {}),
|
({'impersonate': ('chrome', '110')}, {}),
|
||||||
({'impersonate': 'chrome:99'}, {'impersonate': 'chrome:110'})
|
({'impersonate': ('chrome', '99')}, {'impersonate': ('chrome', '110')}),
|
||||||
])
|
])
|
||||||
def test_impersonate(self, handler, params, extensions):
|
def test_impersonate(self, handler, params, extensions):
|
||||||
with handler(headers=std_headers, **params) as rh:
|
with handler(headers=std_headers, **params) as rh:
|
||||||
|
@ -931,7 +931,7 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
|
||||||
# Ensure curl-impersonate overrides our standard headers (usually added
|
# Ensure curl-impersonate overrides our standard headers (usually added
|
||||||
res = validate_and_send(
|
res = validate_and_send(
|
||||||
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
|
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
|
||||||
'impersonate': 'safari'}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
|
'impersonate': ('safari', )}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
|
||||||
|
|
||||||
assert std_headers['user-agent'].lower() not in res
|
assert std_headers['user-agent'].lower() not in res
|
||||||
assert std_headers['accept-language'].lower() not in res
|
assert std_headers['accept-language'].lower() not in res
|
||||||
|
@ -946,6 +946,74 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
|
||||||
assert std_headers['accept-language'].lower() in res
|
assert std_headers['accept-language'].lower() in res
|
||||||
assert 'x-custom: test' in res
|
assert 'x-custom: test' in res
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('raised,expected,match', [
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
|
||||||
|
])
|
||||||
|
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
|
||||||
|
def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
|
||||||
|
import curl_cffi.requests
|
||||||
|
from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
|
||||||
|
curl_res = curl_cffi.requests.Response()
|
||||||
|
res = CurlCFFIResponseAdapter(curl_res)
|
||||||
|
|
||||||
|
def mock_read(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
raise raised()
|
||||||
|
except Exception as e:
|
||||||
|
e.response = curl_res
|
||||||
|
raise
|
||||||
|
monkeypatch.setattr(res.fp, 'read', mock_read)
|
||||||
|
|
||||||
|
with pytest.raises(expected, match=match) as exc_info:
|
||||||
|
res.read()
|
||||||
|
|
||||||
|
assert exc_info.type is expected
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('raised,expected,match', [
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
|
||||||
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
|
'', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
|
||||||
|
])
|
||||||
|
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
|
||||||
|
def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
|
||||||
|
import curl_cffi.requests
|
||||||
|
curl_res = curl_cffi.requests.Response()
|
||||||
|
curl_res.status_code = 301
|
||||||
|
|
||||||
|
with handler() as rh:
|
||||||
|
original_get_instance = rh._get_instance
|
||||||
|
|
||||||
|
def mock_get_instance(*args, **kwargs):
|
||||||
|
instance = original_get_instance(*args, **kwargs)
|
||||||
|
|
||||||
|
def request(*_, **__):
|
||||||
|
try:
|
||||||
|
raise raised()
|
||||||
|
except Exception as e:
|
||||||
|
e.response = curl_res
|
||||||
|
raise
|
||||||
|
monkeypatch.setattr(instance, 'request', request)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
monkeypatch.setattr(rh, '_get_instance', mock_get_instance)
|
||||||
|
|
||||||
|
with pytest.raises(expected) as exc_info:
|
||||||
|
rh.send(Request('http://fake'))
|
||||||
|
|
||||||
|
assert exc_info.type is expected
|
||||||
|
|
||||||
|
|
||||||
def run_validation(handler, error, req, **handler_kwargs):
|
def run_validation(handler, error, req, **handler_kwargs):
|
||||||
with handler(**handler_kwargs) as rh:
|
with handler(**handler_kwargs) as rh:
|
||||||
|
@ -1074,9 +1142,9 @@ class TestRequestHandlerValidation:
|
||||||
({'timeout': 1}, False),
|
({'timeout': 1}, False),
|
||||||
({'timeout': 'notatimeout'}, AssertionError),
|
({'timeout': 'notatimeout'}, AssertionError),
|
||||||
({'unsupported': 'value'}, UnsupportedRequest),
|
({'unsupported': 'value'}, UnsupportedRequest),
|
||||||
({'impersonate': 'badtarget'}, UnsupportedRequest),
|
({'impersonate': ('badtarget', None, None, None)}, UnsupportedRequest),
|
||||||
({'impersonate': 123}, AssertionError),
|
({'impersonate': 123}, AssertionError),
|
||||||
({'impersonate': 'chrome'}, False)
|
({'impersonate': ('chrome', None, None, None)}, False)
|
||||||
]),
|
]),
|
||||||
(NoCheckRH, 'http', [
|
(NoCheckRH, 'http', [
|
||||||
({'cookiejar': 'notacookiejar'}, False),
|
({'cookiejar': 'notacookiejar'}, False),
|
||||||
|
@ -1155,6 +1223,10 @@ class FakeResponse(Response):
|
||||||
|
|
||||||
class FakeRH(RequestHandler):
|
class FakeRH(RequestHandler):
|
||||||
|
|
||||||
|
def __init__(self, *args, **params):
|
||||||
|
self.params = params
|
||||||
|
super().__init__(*args, **params)
|
||||||
|
|
||||||
def _validate(self, request):
|
def _validate(self, request):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -1360,6 +1432,72 @@ class TestYoutubeDLNetworking:
|
||||||
with pytest.raises(SSLError, match='testerror'):
|
with pytest.raises(SSLError, match='testerror'):
|
||||||
ydl.urlopen('ssl://testerror')
|
ydl.urlopen('ssl://testerror')
|
||||||
|
|
||||||
|
def test_unsupported_impersonate_target(self):
|
||||||
|
class FakeImpersonationRHYDL(FakeYDL):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
class HTTPRH(RequestHandler):
|
||||||
|
def _send(self, request: Request):
|
||||||
|
pass
|
||||||
|
_SUPPORTED_URL_SCHEMES = ('http',)
|
||||||
|
_SUPPORTED_PROXY_SCHEMES = None
|
||||||
|
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._request_director = self.build_request_director([HTTPRH])
|
||||||
|
|
||||||
|
with FakeImpersonationRHYDL() as ydl:
|
||||||
|
with pytest.raises(
|
||||||
|
RequestError,
|
||||||
|
match=r'Impersonate target "test" is not available. This request requires browser impersonation'
|
||||||
|
):
|
||||||
|
ydl.urlopen(Request('http://', extensions={'impersonate': ('test', None, None, None)}))
|
||||||
|
|
||||||
|
def test_unsupported_impersonate_extension(self):
|
||||||
|
class FakeHTTPRHYDL(FakeYDL):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
class IRH(ImpersonateRequestHandler):
|
||||||
|
def _send(self, request: Request):
|
||||||
|
pass
|
||||||
|
|
||||||
|
_SUPPORTED_URL_SCHEMES = ('http',)
|
||||||
|
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)]
|
||||||
|
_SUPPORTED_PROXY_SCHEMES = None
|
||||||
|
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._request_director = self.build_request_director([IRH])
|
||||||
|
|
||||||
|
with FakeHTTPRHYDL() as ydl:
|
||||||
|
with pytest.raises(
|
||||||
|
RequestError,
|
||||||
|
match=r'Impersonate target "test" is not available. This request requires browser impersonation'
|
||||||
|
):
|
||||||
|
ydl.urlopen(Request('http://', extensions={'impersonate': ('test', None, None, None)}))
|
||||||
|
|
||||||
|
def test_raise_impersonate_error(self):
|
||||||
|
with pytest.raises(
|
||||||
|
ValueError,
|
||||||
|
match=r'Impersonate target "test" is not available. Use --list-impersonate-targets to see available targets.'
|
||||||
|
):
|
||||||
|
FakeYDL({'impersonate': ('test', None, None, None)})
|
||||||
|
|
||||||
|
def test_pass_impersonate_param(self, monkeypatch):
|
||||||
|
|
||||||
|
class IRH(ImpersonateRequestHandler):
|
||||||
|
def _send(self, request: Request):
|
||||||
|
pass
|
||||||
|
|
||||||
|
_SUPPORTED_URL_SCHEMES = ('http',)
|
||||||
|
_SUPPORTED_IMPERSONATE_TARGET_TUPLES = [('firefox',)]
|
||||||
|
|
||||||
|
# Bypass the check on initialize
|
||||||
|
brh = FakeYDL.build_request_director
|
||||||
|
monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
|
||||||
|
|
||||||
|
with FakeYDL({
|
||||||
|
'impersonate': ('firefox', None, None, None)
|
||||||
|
}) as ydl:
|
||||||
|
rh = self.build_handler(ydl, IRH)
|
||||||
|
assert rh.impersonate == ('firefox', None, None, None)
|
||||||
|
|
||||||
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
|
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
|
||||||
('http', '__noproxy__', None),
|
('http', '__noproxy__', None),
|
||||||
('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
|
('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
|
||||||
|
@ -1660,6 +1798,7 @@ class TestResponse:
|
||||||
assert res.getheader('test') == res.get_header('test')
|
assert res.getheader('test') == res.get_header('test')
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: move these to test_utils.py when that moves to pytest
|
||||||
class TestImpersonate:
|
class TestImpersonate:
|
||||||
@pytest.mark.parametrize('target,expected', [
|
@pytest.mark.parametrize('target,expected', [
|
||||||
('firefox', ('firefox', None, None, None)),
|
('firefox', ('firefox', None, None, None)),
|
||||||
|
@ -1684,7 +1823,9 @@ class TestImpersonate:
|
||||||
(('firefox', None, 'linux', None), 'firefox::linux'),
|
(('firefox', None, 'linux', None), 'firefox::linux'),
|
||||||
(('firefox', None, None, '5'), 'firefox:::5'),
|
(('firefox', None, None, '5'), 'firefox:::5'),
|
||||||
(('firefox', '120', None, '5'), 'firefox:120::5'),
|
(('firefox', '120', None, '5'), 'firefox:120::5'),
|
||||||
((None, '120', None, None), None)
|
((None, '120', None, None), None),
|
||||||
|
(('firefox', ), 'firefox'),
|
||||||
|
(('firefox', None, 'linux'), 'firefox::linux'),
|
||||||
])
|
])
|
||||||
def test_compile_impersonate_target(self, target_tuple, expected):
|
def test_compile_impersonate_target(self, target_tuple, expected):
|
||||||
assert compile_impersonate_target(*target_tuple) == expected
|
assert compile_impersonate_target(*target_tuple) == expected
|
||||||
|
|
|
@ -24,6 +24,7 @@ import traceback
|
||||||
import unicodedata
|
import unicodedata
|
||||||
|
|
||||||
from .cache import Cache
|
from .cache import Cache
|
||||||
|
|
||||||
from .compat import functools, urllib # isort: split
|
from .compat import functools, urllib # isort: split
|
||||||
from .compat import compat_os_name, compat_shlex_quote, urllib_req_to_req
|
from .compat import compat_os_name, compat_shlex_quote, urllib_req_to_req
|
||||||
from .cookies import LenientSimpleCookie, load_cookies
|
from .cookies import LenientSimpleCookie, load_cookies
|
||||||
|
@ -43,7 +44,7 @@ from .networking.exceptions import (
|
||||||
_CompatHTTPError,
|
_CompatHTTPError,
|
||||||
network_exceptions,
|
network_exceptions,
|
||||||
)
|
)
|
||||||
from .networking.impersonate import ImpersonateRequestHandler, get_available_impersonate_targets
|
from .networking.impersonate import ImpersonateRequestHandler
|
||||||
from .plugins import directories as plugin_directories
|
from .plugins import directories as plugin_directories
|
||||||
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
|
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
|
||||||
from .postprocessor import (
|
from .postprocessor import (
|
||||||
|
@ -61,7 +62,13 @@ from .postprocessor import (
|
||||||
get_postprocessor,
|
get_postprocessor,
|
||||||
)
|
)
|
||||||
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
|
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
|
||||||
from .update import REPOSITORY, _get_system_deprecation, _make_label, current_git_head, detect_variant
|
from .update import (
|
||||||
|
REPOSITORY,
|
||||||
|
_get_system_deprecation,
|
||||||
|
_make_label,
|
||||||
|
current_git_head,
|
||||||
|
detect_variant,
|
||||||
|
)
|
||||||
from .utils import (
|
from .utils import (
|
||||||
DEFAULT_OUTTMPL,
|
DEFAULT_OUTTMPL,
|
||||||
IDENTITY,
|
IDENTITY,
|
||||||
|
@ -157,9 +164,8 @@ from .utils.networking import (
|
||||||
HTTPHeaderDict,
|
HTTPHeaderDict,
|
||||||
clean_headers,
|
clean_headers,
|
||||||
clean_proxies,
|
clean_proxies,
|
||||||
|
compile_impersonate_target,
|
||||||
std_headers,
|
std_headers,
|
||||||
parse_impersonate_target,
|
|
||||||
compile_impersonate_target
|
|
||||||
)
|
)
|
||||||
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
|
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
|
||||||
|
|
||||||
|
@ -714,10 +720,7 @@ class YoutubeDL:
|
||||||
lambda x: [x.is_supported_target(impersonate_target)],
|
lambda x: [x.is_supported_target(impersonate_target)],
|
||||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||||
)
|
)
|
||||||
if not results:
|
if not any(results):
|
||||||
self.report_warning('Ignoring --impersonate as required dependencies are not installed. ')
|
|
||||||
|
|
||||||
elif not any(results):
|
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
|
f'Impersonate target "{compile_impersonate_target(*self.params.get("impersonate"))}" is not available. '
|
||||||
f'Use --list-impersonate-targets to see available targets.')
|
f'Use --list-impersonate-targets to see available targets.')
|
||||||
|
@ -3911,10 +3914,9 @@ class YoutubeDL:
|
||||||
|
|
||||||
# These imports can be slow. So import them only as needed
|
# These imports can be slow. So import them only as needed
|
||||||
from .extractor.extractors import _LAZY_LOADER
|
from .extractor.extractors import _LAZY_LOADER
|
||||||
from .extractor.extractors import (
|
from .extractor.extractors import _PLUGIN_CLASSES as plugin_ies
|
||||||
_PLUGIN_CLASSES as plugin_ies,
|
from .extractor.extractors import \
|
||||||
_PLUGIN_OVERRIDES as plugin_ie_overrides
|
_PLUGIN_OVERRIDES as plugin_ie_overrides
|
||||||
)
|
|
||||||
|
|
||||||
def get_encoding(stream):
|
def get_encoding(stream):
|
||||||
ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
|
ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
|
||||||
|
@ -4055,7 +4057,7 @@ class YoutubeDL:
|
||||||
return sorted(self._request_director.collect_from_handlers(
|
return sorted(self._request_director.collect_from_handlers(
|
||||||
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
|
lambda rh: [(*target, rh.RH_NAME) for target in rh.get_supported_targets()],
|
||||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
||||||
), key=lambda x: x[1][0])
|
), key=lambda x: x[0])
|
||||||
|
|
||||||
def urlopen(self, req):
|
def urlopen(self, req):
|
||||||
""" Start an HTTP download """
|
""" Start an HTTP download """
|
||||||
|
@ -4088,7 +4090,10 @@ class YoutubeDL:
|
||||||
raise RequestError(
|
raise RequestError(
|
||||||
'file:// URLs are disabled by default in yt-dlp for security reasons. '
|
'file:// URLs are disabled by default in yt-dlp for security reasons. '
|
||||||
'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
|
'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
|
||||||
if 'unsupported proxy type: "https"' in ue.msg.lower():
|
if (
|
||||||
|
'unsupported proxy type: "https"' in ue.msg.lower()
|
||||||
|
and 'requests' not in self._request_director.handlers
|
||||||
|
):
|
||||||
raise RequestError(
|
raise RequestError(
|
||||||
'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
|
'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
|
||||||
|
|
||||||
|
@ -4100,23 +4105,12 @@ class YoutubeDL:
|
||||||
'This request requires WebSocket support. '
|
'This request requires WebSocket support. '
|
||||||
'Ensure one of the following dependencies are installed: websockets',
|
'Ensure one of the following dependencies are installed: websockets',
|
||||||
cause=ue) from ue
|
cause=ue) from ue
|
||||||
"""
|
|
||||||
ue = traverse_obj(
|
|
||||||
unsupported_errors,
|
|
||||||
(lambda _, v: isinstance(v.handler, ImpersonateRequestHandler) and 'unsupported impersonate target' in v.msg.lower()), get_all=False)
|
|
||||||
if ue:
|
|
||||||
# TODO: when we have multiple impersonation, will need to make this handle
|
|
||||||
# cases where the unsupported target is due to a missing library.
|
|
||||||
raise RequestError(
|
|
||||||
f'The requested impersonation target is not supported: {req.extensions.get("impersonate")}.', cause=ue) from ue
|
|
||||||
|
|
||||||
if list(filter(lambda ue: re.search(r'unsupported extensions:.*impersonate', ue.msg.lower()), unsupported_errors)):
|
elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
|
||||||
self.report_warning(
|
raise RequestError(
|
||||||
'To impersonate a browser for this request please install one of: curl_cffi. '
|
f'Impersonate target "{compile_impersonate_target(*req.extensions["impersonate"])}" is not available.'
|
||||||
'Retrying request without impersonation...')
|
f' This request requires browser impersonation, however you may be missing dependencies'
|
||||||
new_req = req.copy()
|
f' required to support this target. See the documentation for more information.')
|
||||||
new_req.extensions.pop('impersonate')
|
|
||||||
return _urlopen(new_req)"""
|
|
||||||
raise
|
raise
|
||||||
except SSLError as e:
|
except SSLError as e:
|
||||||
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
|
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
|
||||||
|
|
|
@ -990,10 +990,15 @@ def _real_main(argv=None):
|
||||||
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
|
rows = [[*[item or '' for item in target], compile_impersonate_target(*target)] for target in
|
||||||
available_targets]
|
available_targets]
|
||||||
|
|
||||||
ydl.to_screen(f'[info] Available impersonate targets')
|
ydl.to_screen('[info] Available impersonate targets')
|
||||||
ydl.to_stdout(
|
ydl.to_stdout(
|
||||||
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
|
render_table(['Client', 'Version', 'OS', 'OS Version', 'Handler', 'Example'], rows)
|
||||||
)
|
)
|
||||||
|
if not available_targets:
|
||||||
|
ydl.to_stdout('You are missing dependencies for impersonation. See the README for more info.')
|
||||||
|
ydl.to_stdout(
|
||||||
|
'If the above table is missing targets, you may be missing dependencies for impersonation. '
|
||||||
|
'See the documentation for more information.')
|
||||||
return
|
return
|
||||||
|
|
||||||
if not actual_use:
|
if not actual_use:
|
||||||
|
|
|
@ -97,7 +97,7 @@ class CurlCFFIResponseAdapter(Response):
|
||||||
partial=self.fp.bytes_read,
|
partial=self.fp.bytes_read,
|
||||||
expected=content_length - self.fp.bytes_read if content_length is not None else None,
|
expected=content_length - self.fp.bytes_read if content_length is not None else None,
|
||||||
cause=e) from e
|
cause=e) from e
|
||||||
raise
|
raise TransportError(cause=e) from e
|
||||||
|
|
||||||
|
|
||||||
@register_rh
|
@register_rh
|
||||||
|
@ -198,13 +198,6 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
|
||||||
max_redirects_exceeded = True
|
max_redirects_exceeded = True
|
||||||
curl_response = e.response
|
curl_response = e.response
|
||||||
|
|
||||||
elif e.code == CurlECode.PARTIAL_FILE:
|
|
||||||
partial = e.response.content
|
|
||||||
content_length = int_or_none(e.response.headers.get('Content-Length'))
|
|
||||||
raise IncompleteRead(
|
|
||||||
partial=len(partial),
|
|
||||||
expected=content_length - len(partial) if content_length is not None else None,
|
|
||||||
cause=e) from e
|
|
||||||
elif e.code == CurlECode.PROXY:
|
elif e.code == CurlECode.PROXY:
|
||||||
raise ProxyError(cause=e) from e
|
raise ProxyError(cause=e) from e
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -15,7 +15,7 @@ def _target_within(target1: ImpersonateTarget, target2: ImpersonateTarget):
|
||||||
if target1[0] != target2[0]:
|
if target1[0] != target2[0]:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
for i in range(1, len(target2)):
|
for i in range(1, min(len(target1), len(target2))):
|
||||||
if (
|
if (
|
||||||
target1[i]
|
target1[i]
|
||||||
and target2[i]
|
and target2[i]
|
||||||
|
@ -120,9 +120,3 @@ def impersonate_preference(rh, request):
|
||||||
if request.extensions.get('impersonate') or rh.impersonate:
|
if request.extensions.get('impersonate') or rh.impersonate:
|
||||||
return 1000
|
return 1000
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def get_available_impersonate_targets(director):
|
|
||||||
return director.collect_from_handlers(
|
|
||||||
lambda x: x.get_supported_targets(),
|
|
||||||
[lambda _, v: isinstance(v, ImpersonateRequestHandler)]
|
|
||||||
)
|
|
||||||
|
|
|
@ -519,7 +519,7 @@ def create_parser():
|
||||||
network.add_option(
|
network.add_option(
|
||||||
'--list-impersonate-targets',
|
'--list-impersonate-targets',
|
||||||
dest='list_impersonate_targets', default=False, action='store_true',
|
dest='list_impersonate_targets', default=False, action='store_true',
|
||||||
help='List available HTTP clients to impersonate',
|
help='List available clients to impersonate',
|
||||||
)
|
)
|
||||||
network.add_option(
|
network.add_option(
|
||||||
'-4', '--force-ipv4',
|
'-4', '--force-ipv4',
|
||||||
|
|
|
@ -179,7 +179,8 @@ def parse_impersonate_target(target: str) -> Tuple[str, Optional[str], Optional[
|
||||||
return client, version, os, os_vers
|
return client, version, os, os_vers
|
||||||
|
|
||||||
|
|
||||||
def compile_impersonate_target(client, version, os, os_vers, *_) -> str | None:
|
def compile_impersonate_target(*args) -> str | None:
|
||||||
|
client, version, os, os_vers = (list(args) + [None, None, None, None])[:4]
|
||||||
if not client:
|
if not client:
|
||||||
return
|
return
|
||||||
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]
|
filtered_parts = [str(part) if part is not None else '' for part in (client, version, os, os_vers)]
|
||||||
|
|
Loading…
Reference in New Issue
Block a user