mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-25 00:31:26 +01:00
Compare commits
2 Commits
77fd3567f4
...
5e1200ac69
Author | SHA1 | Date | |
---|---|---|---|
|
5e1200ac69 | ||
|
ed1fa68f75 |
|
@ -525,10 +525,10 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
|
|||
validate_and_send(
|
||||
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
|
||||
|
||||
with handler(timeout=0.01) as rh:
|
||||
with handler(timeout=0.1) as rh:
|
||||
with pytest.raises(TransportError):
|
||||
validate_and_send(
|
||||
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
|
||||
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))
|
||||
|
||||
# Per request timeout, should override handler timeout
|
||||
validate_and_send(
|
||||
|
@ -1372,15 +1372,10 @@ class TestYoutubeDLNetworking:
|
|||
('', {'all': '__noproxy__'}),
|
||||
(None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
|
||||
])
|
||||
def test_proxy(self, proxy, expected):
|
||||
old_http_proxy = os.environ.get('HTTP_PROXY')
|
||||
try:
|
||||
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081' # ensure that provided proxies override env
|
||||
with FakeYDL({'proxy': proxy}) as ydl:
|
||||
assert ydl.proxies == expected
|
||||
finally:
|
||||
if old_http_proxy:
|
||||
os.environ['HTTP_PROXY'] = old_http_proxy
|
||||
def test_proxy(self, proxy, expected, monkeypatch):
|
||||
monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
|
||||
with FakeYDL({'proxy': proxy}) as ydl:
|
||||
assert ydl.proxies == expected
|
||||
|
||||
def test_compat_request(self):
|
||||
with FakeRHYDL() as ydl:
|
||||
|
@ -1508,23 +1503,17 @@ class TestYoutubeDLNetworking:
|
|||
('http', 'socks4://example.com', 'socks4://example.com'),
|
||||
('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
|
||||
])
|
||||
def test_clean_proxy(self, proxy_key, proxy_url, expected):
|
||||
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
|
||||
# proxies should be cleaned in urlopen()
|
||||
with FakeRHYDL() as ydl:
|
||||
req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
|
||||
assert req.proxies[proxy_key] == expected
|
||||
|
||||
# and should also be cleaned when building the handler
|
||||
env_key = f'{proxy_key.upper()}_PROXY'
|
||||
old_env_proxy = os.environ.get(env_key)
|
||||
try:
|
||||
os.environ[env_key] = proxy_url # ensure that provided proxies override env
|
||||
with FakeYDL() as ydl:
|
||||
rh = self.build_handler(ydl)
|
||||
assert rh.proxies[proxy_key] == expected
|
||||
finally:
|
||||
if old_env_proxy:
|
||||
os.environ[env_key] = old_env_proxy
|
||||
monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
|
||||
with FakeYDL() as ydl:
|
||||
rh = self.build_handler(ydl)
|
||||
assert rh.proxies[proxy_key] == expected
|
||||
|
||||
def test_clean_proxy_header(self):
|
||||
with FakeRHYDL() as ydl:
|
||||
|
|
|
@ -138,13 +138,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
|
|||
|
||||
proxies = self._get_proxies(request)
|
||||
if 'no' in proxies:
|
||||
session.curl.setopt(CurlOpt.NOPROXY, proxies['no'].encode())
|
||||
session.curl.setopt(CurlOpt.NOPROXY, proxies['no'])
|
||||
proxies.pop('no', None)
|
||||
|
||||
# curl doesn't support per protocol proxies, so we select the one that matches the request protocol
|
||||
proxy = select_proxy(request.url, proxies=proxies)
|
||||
if proxy:
|
||||
session.curl.setopt(CurlOpt.PROXY, proxy.encode())
|
||||
session.curl.setopt(CurlOpt.PROXY, proxy)
|
||||
scheme = urllib.parse.urlparse(request.url).scheme.lower()
|
||||
if scheme != 'http':
|
||||
# Enable HTTP CONNECT for HTTPS urls.
|
||||
|
@ -155,13 +155,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
|
|||
headers = self._get_impersonate_headers(request)
|
||||
|
||||
if self._client_cert:
|
||||
session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'].encode())
|
||||
session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'])
|
||||
client_certificate_key = self._client_cert.get('client_certificate_key')
|
||||
client_certificate_password = self._client_cert.get('client_certificate_password')
|
||||
if client_certificate_key:
|
||||
session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key.encode())
|
||||
session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key)
|
||||
if client_certificate_password:
|
||||
session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password.encode())
|
||||
session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password)
|
||||
|
||||
timeout = self._calculate_timeout(request)
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user