Compare commits

...

2 Commits

Author SHA1 Message Date
coletdjnz
5e1200ac69
linter 2023-12-03 14:43:16 +13:00
coletdjnz
ed1fa68f75
fix various test bugs 2023-12-03 14:37:41 +13:00
2 changed files with 16 additions and 27 deletions

View File

@ -525,10 +525,10 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
with handler(timeout=0.01) as rh:
with handler(timeout=0.1) as rh:
with pytest.raises(TransportError):
validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))
# Per request timeout, should override handler timeout
validate_and_send(
@ -1372,15 +1372,10 @@ class TestYoutubeDLNetworking:
('', {'all': '__noproxy__'}),
(None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
])
def test_proxy(self, proxy, expected):
old_http_proxy = os.environ.get('HTTP_PROXY')
try:
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081' # ensure that provided proxies override env
def test_proxy(self, proxy, expected, monkeypatch):
monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
with FakeYDL({'proxy': proxy}) as ydl:
assert ydl.proxies == expected
finally:
if old_http_proxy:
os.environ['HTTP_PROXY'] = old_http_proxy
def test_compat_request(self):
with FakeRHYDL() as ydl:
@ -1508,23 +1503,17 @@ class TestYoutubeDLNetworking:
('http', 'socks4://example.com', 'socks4://example.com'),
('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected):
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
# proxies should be cleaned in urlopen()
with FakeRHYDL() as ydl:
req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
assert req.proxies[proxy_key] == expected
# and should also be cleaned when building the handler
env_key = f'{proxy_key.upper()}_PROXY'
old_env_proxy = os.environ.get(env_key)
try:
os.environ[env_key] = proxy_url # ensure that provided proxies override env
monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
with FakeYDL() as ydl:
rh = self.build_handler(ydl)
assert rh.proxies[proxy_key] == expected
finally:
if old_env_proxy:
os.environ[env_key] = old_env_proxy
def test_clean_proxy_header(self):
with FakeRHYDL() as ydl:

View File

@ -138,13 +138,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
proxies = self._get_proxies(request)
if 'no' in proxies:
session.curl.setopt(CurlOpt.NOPROXY, proxies['no'].encode())
session.curl.setopt(CurlOpt.NOPROXY, proxies['no'])
proxies.pop('no', None)
# curl doesn't support per protocol proxies, so we select the one that matches the request protocol
proxy = select_proxy(request.url, proxies=proxies)
if proxy:
session.curl.setopt(CurlOpt.PROXY, proxy.encode())
session.curl.setopt(CurlOpt.PROXY, proxy)
scheme = urllib.parse.urlparse(request.url).scheme.lower()
if scheme != 'http':
# Enable HTTP CONNECT for HTTPS urls.
@ -155,13 +155,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
headers = self._get_impersonate_headers(request)
if self._client_cert:
session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'].encode())
session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'])
client_certificate_key = self._client_cert.get('client_certificate_key')
client_certificate_password = self._client_cert.get('client_certificate_password')
if client_certificate_key:
session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key.encode())
session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key)
if client_certificate_password:
session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password.encode())
session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password)
timeout = self._calculate_timeout(request)