mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-09-29 05:41:27 +02:00
Compare commits
3 Commits
edc0d055d4
...
7abee30b07
Author | SHA1 | Date | |
---|---|---|---|
|
7abee30b07 | ||
|
97df30b9b8 | ||
|
77292013f6 |
|
@ -50,13 +50,13 @@ from yt_dlp.networking.exceptions import (
|
||||||
TransportError,
|
TransportError,
|
||||||
UnsupportedRequest,
|
UnsupportedRequest,
|
||||||
)
|
)
|
||||||
from yt_dlp.networking.impersonate import ImpersonateRequestHandler, ImpersonateTarget
|
from yt_dlp.networking.impersonate import (
|
||||||
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
ImpersonateRequestHandler,
|
||||||
from yt_dlp.utils import YoutubeDLError
|
ImpersonateTarget,
|
||||||
from yt_dlp.utils.networking import (
|
|
||||||
HTTPHeaderDict,
|
|
||||||
std_headers,
|
|
||||||
)
|
)
|
||||||
|
from yt_dlp.utils import YoutubeDLError
|
||||||
|
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
||||||
|
from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
|
||||||
|
|
||||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|
||||||
|
@ -774,6 +774,21 @@ class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
|
||||||
assert res.status == 200
|
assert res.status == 200
|
||||||
assert std_headers['user-agent'].lower() not in res.read().decode().lower()
|
assert std_headers['user-agent'].lower() not in res.read().decode().lower()
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('impersonate', [True, False])
|
||||||
|
def test_headers_blacklist(self, handler, impersonate):
|
||||||
|
with handler() as rh:
|
||||||
|
for header in rh._IMPERSONATE_HEADERS_BLACKLIST:
|
||||||
|
supported_target = rh.supported_targets[0]
|
||||||
|
res = validate_and_send(rh, Request(
|
||||||
|
f'http://127.0.0.1:{self.http_port}/headers',
|
||||||
|
headers={header: 'testvalue'}, extensions={'impersonate': supported_target} if impersonate else {}))
|
||||||
|
assert res.status == 200
|
||||||
|
sent_headers = res.read().decode()
|
||||||
|
if impersonate:
|
||||||
|
assert f'{header}: testvalue'.lower() not in sent_headers.lower()
|
||||||
|
else:
|
||||||
|
assert f'{header}: testvalue'.lower() in sent_headers.lower()
|
||||||
|
|
||||||
|
|
||||||
class TestUrllibRequestHandler(TestRequestHandlerBase):
|
class TestUrllibRequestHandler(TestRequestHandlerBase):
|
||||||
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
|
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
|
||||||
|
@ -936,27 +951,6 @@ class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
|
||||||
# Check that user agent is added over ours
|
# Check that user agent is added over ours
|
||||||
assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res
|
assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res
|
||||||
|
|
||||||
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
|
|
||||||
def test_headers(self, handler):
|
|
||||||
with handler(headers=std_headers) as rh:
|
|
||||||
# Ensure curl-impersonate overrides our standard headers (usually added
|
|
||||||
res = validate_and_send(
|
|
||||||
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
|
|
||||||
'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()
|
|
||||||
|
|
||||||
assert std_headers['user-agent'].lower() not in res
|
|
||||||
assert std_headers['accept-language'].lower() not in res
|
|
||||||
assert std_headers['sec-fetch-mode'].lower() not in res
|
|
||||||
# other than UA, custom headers that differ from std_headers should be kept
|
|
||||||
assert 'sec-fetch-mode: custom' in res
|
|
||||||
assert 'x-custom: test' in res
|
|
||||||
# but when not impersonating don't remove std_headers
|
|
||||||
res = validate_and_send(
|
|
||||||
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
|
|
||||||
# std_headers should be present
|
|
||||||
for k, v in std_headers.items():
|
|
||||||
assert f'{k}: {v}'.lower() in res
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('raised,expected,match', [
|
@pytest.mark.parametrize('raised,expected,match', [
|
||||||
(lambda: curl_cffi.requests.errors.RequestsError(
|
(lambda: curl_cffi.requests.errors.RequestsError(
|
||||||
'', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
|
'', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
|
||||||
|
|
|
@ -101,6 +101,7 @@ from .utils import (
|
||||||
SameFileError,
|
SameFileError,
|
||||||
UnavailableVideoError,
|
UnavailableVideoError,
|
||||||
UserNotLive,
|
UserNotLive,
|
||||||
|
YoutubeDLError,
|
||||||
age_restricted,
|
age_restricted,
|
||||||
args_to_str,
|
args_to_str,
|
||||||
bug_reports_message,
|
bug_reports_message,
|
||||||
|
@ -157,7 +158,6 @@ from .utils import (
|
||||||
windows_enable_vt_mode,
|
windows_enable_vt_mode,
|
||||||
write_json_file,
|
write_json_file,
|
||||||
write_string,
|
write_string,
|
||||||
YoutubeDLError,
|
|
||||||
)
|
)
|
||||||
from .utils._utils import _YDLLogger
|
from .utils._utils import _YDLLogger
|
||||||
from .utils.networking import (
|
from .utils.networking import (
|
||||||
|
|
|
@ -987,15 +987,39 @@ def _real_main(argv=None):
|
||||||
ydl._download_retcode = 100
|
ydl._download_retcode = 100
|
||||||
|
|
||||||
if opts.list_impersonate_targets:
|
if opts.list_impersonate_targets:
|
||||||
|
|
||||||
|
known_targets = [
|
||||||
|
# List of simplified targets we know are supported,
|
||||||
|
# to help users know what dependencies may be required.
|
||||||
|
(ImpersonateTarget('chrome'), 'curl_cffi'),
|
||||||
|
(ImpersonateTarget('edge'), 'curl_cffi'),
|
||||||
|
(ImpersonateTarget('safari'), 'curl_cffi'),
|
||||||
|
(ImpersonateTarget('chrome', os='android'), 'curl_cffi'),
|
||||||
|
]
|
||||||
|
|
||||||
available_targets = ydl.get_available_impersonate_targets()
|
available_targets = ydl.get_available_impersonate_targets()
|
||||||
|
|
||||||
rows = [
|
rows = [
|
||||||
[target.client, target.version, target.os, target.os_vers, handler]
|
[target.client or '-', target.version or '-', target.os or '-', target.os_vers or '-', handler]
|
||||||
for target, handler in available_targets
|
for target, handler in available_targets
|
||||||
]
|
]
|
||||||
|
|
||||||
|
for known_target, known_handler in known_targets:
|
||||||
|
if not any(
|
||||||
|
known_target in target and handler == known_handler
|
||||||
|
for target, handler in available_targets
|
||||||
|
):
|
||||||
|
rows.append([
|
||||||
|
ydl._format_out(known_target.client or '-', ydl.Styles.SUPPRESS),
|
||||||
|
ydl._format_out(known_target.version or '-', ydl.Styles.SUPPRESS),
|
||||||
|
ydl._format_out(known_target.os or '-', ydl.Styles.SUPPRESS),
|
||||||
|
ydl._format_out(known_target.os_vers or '-', ydl.Styles.SUPPRESS),
|
||||||
|
ydl._format_out(f'{known_handler} (not installed)', ydl.Styles.SUPPRESS),
|
||||||
|
])
|
||||||
|
|
||||||
ydl.to_screen('[info] Available impersonate targets')
|
ydl.to_screen('[info] Available impersonate targets')
|
||||||
ydl.to_stdout(
|
ydl.to_stdout(
|
||||||
render_table(['Client', 'Version', 'OS', 'OS Version', 'Source'], rows)
|
render_table(['Client', 'Version', 'OS', 'OS Version', 'Source'], rows, extra_gap=1)
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ from .common import RequestHandler, register_preference
|
||||||
from .exceptions import UnsupportedRequest
|
from .exceptions import UnsupportedRequest
|
||||||
from ..compat.types import NoneType
|
from ..compat.types import NoneType
|
||||||
from ..utils import classproperty
|
from ..utils import classproperty
|
||||||
from ..utils.networking import std_headers
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(order=True)
|
@dataclass(order=True)
|
||||||
|
@ -78,6 +77,26 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
||||||
"""
|
"""
|
||||||
_SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] = {}
|
_SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] = {}
|
||||||
|
|
||||||
|
_IMPERSONATE_HEADERS_BLACKLIST = [
|
||||||
|
# Headers to remove from provided headers when impersonating.
|
||||||
|
# In the networking framework, the provided headers are intended
|
||||||
|
# to give a consistent user agent across request handlers.
|
||||||
|
# However, it is intended that the impersonation implementation will add the required headers to mimic a client.
|
||||||
|
# So we need to remove provided headers that may interfere with this behaviour.
|
||||||
|
# TODO(future): Add a method of excluding headers from this blacklist, such as User-Agent in certain cases.
|
||||||
|
# TODO(future): "Accept" should be included here, however it is currently required for some sites.
|
||||||
|
'User-Agent',
|
||||||
|
'Accept-Language',
|
||||||
|
'Sec-Fetch-Mode',
|
||||||
|
'Sec-Fetch-Site',
|
||||||
|
'Sec-Fetch-User',
|
||||||
|
'Sec-Fetch-Dest',
|
||||||
|
'Upgrade-Insecure-Requests',
|
||||||
|
'Sec-Ch-Ua',
|
||||||
|
'Sec-Ch-Ua-Mobile',
|
||||||
|
'Sec-Ch-Ua-Platform',
|
||||||
|
]
|
||||||
|
|
||||||
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
|
def __init__(self, *, impersonate: ImpersonateTarget = None, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.impersonate = impersonate
|
self.impersonate = impersonate
|
||||||
|
@ -130,10 +149,8 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
|
||||||
def _get_impersonate_headers(self, request):
|
def _get_impersonate_headers(self, request):
|
||||||
headers = self._merge_headers(request.headers)
|
headers = self._merge_headers(request.headers)
|
||||||
if self._get_request_target(request) is not None:
|
if self._get_request_target(request) is not None:
|
||||||
# remove all headers present in std_headers
|
for header in self._IMPERSONATE_HEADERS_BLACKLIST:
|
||||||
for header in std_headers:
|
headers.pop(header, None)
|
||||||
if header in headers and std_headers[header] == headers[header]:
|
|
||||||
headers.pop(header, None)
|
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user