Compare commits

..

2 Commits

Author SHA1 Message Date
coletdjnz
5e1200ac69
linter 2023-12-03 14:43:16 +13:00
coletdjnz
ed1fa68f75
fix various test bugs 2023-12-03 14:37:41 +13:00
2 changed files with 16 additions and 27 deletions

View File

@ -525,10 +525,10 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
validate_and_send( validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1')) rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
with handler(timeout=0.01) as rh: with handler(timeout=0.1) as rh:
with pytest.raises(TransportError): with pytest.raises(TransportError):
validate_and_send( validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1')) rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))
# Per request timeout, should override handler timeout # Per request timeout, should override handler timeout
validate_and_send( validate_and_send(
@ -1372,15 +1372,10 @@ class TestYoutubeDLNetworking:
('', {'all': '__noproxy__'}), ('', {'all': '__noproxy__'}),
(None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
]) ])
def test_proxy(self, proxy, expected): def test_proxy(self, proxy, expected, monkeypatch):
old_http_proxy = os.environ.get('HTTP_PROXY') monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
try:
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081' # ensure that provided proxies override env
with FakeYDL({'proxy': proxy}) as ydl: with FakeYDL({'proxy': proxy}) as ydl:
assert ydl.proxies == expected assert ydl.proxies == expected
finally:
if old_http_proxy:
os.environ['HTTP_PROXY'] = old_http_proxy
def test_compat_request(self): def test_compat_request(self):
with FakeRHYDL() as ydl: with FakeRHYDL() as ydl:
@ -1508,23 +1503,17 @@ class TestYoutubeDLNetworking:
('http', 'socks4://example.com', 'socks4://example.com'), ('http', 'socks4://example.com', 'socks4://example.com'),
('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies ('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
]) ])
def test_clean_proxy(self, proxy_key, proxy_url, expected): def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
# proxies should be cleaned in urlopen() # proxies should be cleaned in urlopen()
with FakeRHYDL() as ydl: with FakeRHYDL() as ydl:
req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
assert req.proxies[proxy_key] == expected assert req.proxies[proxy_key] == expected
# and should also be cleaned when building the handler # and should also be cleaned when building the handler
env_key = f'{proxy_key.upper()}_PROXY' monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
old_env_proxy = os.environ.get(env_key)
try:
os.environ[env_key] = proxy_url # ensure that provided proxies override env
with FakeYDL() as ydl: with FakeYDL() as ydl:
rh = self.build_handler(ydl) rh = self.build_handler(ydl)
assert rh.proxies[proxy_key] == expected assert rh.proxies[proxy_key] == expected
finally:
if old_env_proxy:
os.environ[env_key] = old_env_proxy
def test_clean_proxy_header(self): def test_clean_proxy_header(self):
with FakeRHYDL() as ydl: with FakeRHYDL() as ydl:

View File

@ -138,13 +138,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
proxies = self._get_proxies(request) proxies = self._get_proxies(request)
if 'no' in proxies: if 'no' in proxies:
session.curl.setopt(CurlOpt.NOPROXY, proxies['no'].encode()) session.curl.setopt(CurlOpt.NOPROXY, proxies['no'])
proxies.pop('no', None) proxies.pop('no', None)
# curl doesn't support per protocol proxies, so we select the one that matches the request protocol # curl doesn't support per protocol proxies, so we select the one that matches the request protocol
proxy = select_proxy(request.url, proxies=proxies) proxy = select_proxy(request.url, proxies=proxies)
if proxy: if proxy:
session.curl.setopt(CurlOpt.PROXY, proxy.encode()) session.curl.setopt(CurlOpt.PROXY, proxy)
scheme = urllib.parse.urlparse(request.url).scheme.lower() scheme = urllib.parse.urlparse(request.url).scheme.lower()
if scheme != 'http': if scheme != 'http':
# Enable HTTP CONNECT for HTTPS urls. # Enable HTTP CONNECT for HTTPS urls.
@ -155,13 +155,13 @@ class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
headers = self._get_impersonate_headers(request) headers = self._get_impersonate_headers(request)
if self._client_cert: if self._client_cert:
session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'].encode()) session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'])
client_certificate_key = self._client_cert.get('client_certificate_key') client_certificate_key = self._client_cert.get('client_certificate_key')
client_certificate_password = self._client_cert.get('client_certificate_password') client_certificate_password = self._client_cert.get('client_certificate_password')
if client_certificate_key: if client_certificate_key:
session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key.encode()) session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key)
if client_certificate_password: if client_certificate_password:
session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password.encode()) session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password)
timeout = self._calculate_timeout(request) timeout = self._calculate_timeout(request)