Compare commits

...

12 Commits

Author SHA1 Message Date
Michal Kubeček
8058cab156
Merge 560bcb5291 into 52c0ffe40a 2024-11-17 09:08:24 +05:30
bashonly
52c0ffe40a
[ie/youtube] Remove broken OAuth support (#11558)
Closes #11462
Authored by: bashonly
2024-11-16 23:40:21 +00:00
sepro
637d62a3a9
[ie/youtube] Player client maintenance (#11528)
Authored by: bashonly, seproDev

Co-authored-by: bashonly <bashonly@protonmail.com>
2024-11-17 00:31:04 +01:00
sepro
f95a92b3d0
[cleanup] Deprecate more compat functions (#11439)
Authored by: seproDev
2024-11-17 00:24:11 +01:00
bashonly
560bcb5291
Merge branch 'yt-dlp:master' into pr/6698 2024-11-07 11:51:23 -06:00
bashonly
44c8cdb728
post-merge cleanup 2024-10-26 15:19:13 +00:00
bashonly
19003f882b
Merge branch 'master' into mk/ct-2023-03 2024-10-26 15:14:05 +00:00
Michal Kubecek
6f1db75869
[ceskatelevize] update selftests
Most selftest metadata is no longer correct.

- use 'live_status' to identify live broadcast; this can be no longer
  recognized from item['type'] (which is always 'VOD') so move the
  detection to the code path where we can actually find out
- update outdated test metadata
- drop georestricted test, the URL is no longer valid and I failed to find
  out what should be the right one
- disable live broadcast tests for now; CT1 does not have a stable id any
  more and the selftest framework requires duration which depends on
  current program
2024-03-20 08:01:47 +01:00
Michal Kubecek
4f31126f07
[ceskatelevize] more robust sidp detection
For older style livestreams (all except CT4 Sport and CT24), sidp value
cannot be determined from URL; the value used by browsers can be found as
showID inside the next_data JSON.
2024-03-20 08:00:17 +01:00
Michal Kubecek
e1b623cea1
[ceskatelevize] fix live broadcast
After recent site changes, live streaming is handled in three different
ways. Update the code to deal with all of them. Also update the test URLs.
2024-03-20 08:00:13 +01:00
pukkandan
59cdcf7795
Update yt_dlp/extractor/ceskatelevize.py 2024-03-20 08:00:02 +01:00
Michal Kubecek
a03cd32b71
[ceskatelevize] update to March 2023 changes (#6539)
Note: we could even skip downloading the player.ceskatelevize.cz page
completely as we do not actually need it to get the information we used to
need before the recent website changes. However, we would not catch the
errors that are handled here and the resulting output could be quite
confusing if one of them does happen.
2024-03-20 07:59:57 +01:00
41 changed files with 385 additions and 689 deletions

View File

@ -1768,7 +1768,7 @@ The following extractors use this feature:
#### youtube
* `lang`: Prefer translated metadata (`title`, `description` etc) of this language code (case-sensitive). By default, the video primary language metadata is preferred, with a fallback to `en` translated. See [youtube.py](https://github.com/yt-dlp/yt-dlp/blob/c26f9b991a0681fd3ea548d535919cec1fbbd430/yt_dlp/extractor/youtube.py#L381-L390) for list of supported content language codes
* `skip`: One or more of `hls`, `dash` or `translated_subs` to skip extraction of the m3u8 manifests, dash manifests and [auto-translated subtitles](https://github.com/yt-dlp/yt-dlp/issues/4090#issuecomment-1158102032) respectively
* `player_client`: Clients to extract video data from. The main clients are `web`, `ios` and `android`, with variants `_music` and `_creator` (e.g. `ios_creator`); and `mweb`, `mediaconnect`, `android_testsuite`, `android_vr`, `web_safari`, `web_embedded`, `tv` and `tv_embedded` with no variants. By default, `ios,mweb` is used, and `web_creator,mediaconnect` is added as needed for age-gated videos when account age verification is required. Similarly, the `_music` variants are added for `music.youtube.com` URLs. Some clients, such as `web` and `android`, require a `po_token` for their formats to be downloadable. Some clients, such as the `_creator` variants, will only work with authentication. You can use `all` to use all the clients, and `default` for the default clients. You can prefix a client with `-` to exclude it, e.g. `youtube:player_client=all,-web`
* `player_client`: Clients to extract video data from. The main clients are `web`, `ios` and `android`, with variants `_music` and `_creator` (e.g. `ios_creator`); and `mweb`, `mediaconnect`, `android_vr`, `web_safari`, `web_embedded`, `tv` and `tv_embedded` with no variants. By default, `ios,mweb` is used, and `web_creator` is added as needed for age-gated videos when account age verification is required. Similarly, the `_music` variants are added for `music.youtube.com` URLs. Some clients, such as `web` and `android`, require a `po_token` for their formats to be downloadable. Some clients, such as the `_creator` variants, will only work with authentication. You can use `all` to use all the clients, and `default` for the default clients. You can prefix a client with `-` to exclude it, e.g. `youtube:player_client=all,-web`
* `player_skip`: Skip some network requests that are generally needed for robust extraction. One or more of `configs` (skip client configs), `webpage` (skip initial webpage), `js` (skip js player). While these options can help reduce the number of requests needed or avoid some rate-limiting, they could cause some issues. See [#860](https://github.com/yt-dlp/yt-dlp/pull/860) for more details
* `player_params`: YouTube player parameters to use for player requests. Will overwrite any default ones set by yt-dlp.
* `comment_sort`: `top` or `new` (default) - choose comment sorting mode (on YouTube's side)

View File

@ -11,13 +11,12 @@ import codecs
import subprocess
from yt_dlp.aes import aes_encrypt, key_expansion
from yt_dlp.utils import intlist_to_bytes
secret_msg = b'Secret message goes here'
def hex_str(int_list):
return codecs.encode(intlist_to_bytes(int_list), 'hex')
return codecs.encode(bytes(int_list), 'hex')
def openssl_encode(algo, key, iv):

View File

@ -313,6 +313,16 @@ banned-from = [
"yt_dlp.compat.compat_urllib_parse_urlparse".msg = "Use `urllib.parse.urlparse` instead."
"yt_dlp.compat.compat_shlex_quote".msg = "Use `yt_dlp.utils.shell_quote` instead."
"yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead."
"yt_dlp.utils.bytes_to_intlist".msg = "Use `list` instead."
"yt_dlp.utils.intlist_to_bytes".msg = "Use `bytes` instead."
"yt_dlp.utils.decodeArgument".msg = "Do not use"
"yt_dlp.utils.decodeFilename".msg = "Do not use"
"yt_dlp.utils.encodeFilename".msg = "Do not use"
"yt_dlp.compat.compat_os_name".msg = "Use `os.name` instead."
"yt_dlp.compat.compat_realpath".msg = "Use `os.path.realpath` instead."
"yt_dlp.compat.functools".msg = "Use `functools` instead."
"yt_dlp.utils.decodeOption".msg = "Do not use"
"yt_dlp.utils.compiled_regex_type".msg = "Use `re.Pattern` instead."
[tool.autopep8]
max_line_length = 120

View File

@ -9,7 +9,6 @@ import types
import yt_dlp.extractor
from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name
from yt_dlp.utils import preferredencoding, try_call, write_string, find_available_port
if 'pytest' in sys.modules:
@ -49,7 +48,7 @@ def report_warning(message, *args, **kwargs):
Print the message to stderr, it will be prefixed with 'WARNING:'
If stderr is a tty file the 'WARNING:' will be colored
"""
if sys.stderr.isatty() and compat_os_name != 'nt':
if sys.stderr.isatty() and os.name != 'nt':
_msg_header = '\033[0;33mWARNING:\033[0m'
else:
_msg_header = 'WARNING:'

View File

@ -15,7 +15,6 @@ import json
from test.helper import FakeYDL, assertRegexpMatches, try_rm
from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name
from yt_dlp.extractor import YoutubeIE
from yt_dlp.extractor.common import InfoExtractor
from yt_dlp.postprocessor.common import PostProcessor
@ -839,8 +838,8 @@ class TestYoutubeDL(unittest.TestCase):
test('%(filesize)#D', '1Ki')
test('%(height)5.2D', ' 1.08k')
test('%(title4)#S', 'foo_bar_test')
test('%(title4).10S', ('foo bar ', 'foo bar' + ('#' if compat_os_name == 'nt' else ' ')))
if compat_os_name == 'nt':
test('%(title4).10S', ('foo bar ', 'foo bar' + ('#' if os.name == 'nt' else ' ')))
if os.name == 'nt':
test('%(title4)q', ('"foo ""bar"" test"', None))
test('%(formats.:.id)#q', ('"id 1" "id 2" "id 3"', None))
test('%(formats.0.id)#q', ('"id 1"', None))
@ -903,9 +902,9 @@ class TestYoutubeDL(unittest.TestCase):
# Environment variable expansion for prepare_filename
os.environ['__yt_dlp_var'] = 'expanded'
envvar = '%__yt_dlp_var%' if compat_os_name == 'nt' else '$__yt_dlp_var'
envvar = '%__yt_dlp_var%' if os.name == 'nt' else '$__yt_dlp_var'
test(envvar, (envvar, 'expanded'))
if compat_os_name == 'nt':
if os.name == 'nt':
test('%s%', ('%s%', '%s%'))
os.environ['s'] = 'expanded'
test('%s%', ('%s%', 'expanded')) # %s% should be expanded before escaping %s

View File

@ -27,7 +27,6 @@ from yt_dlp.aes import (
pad_block,
)
from yt_dlp.dependencies import Cryptodome
from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
@ -40,33 +39,33 @@ class TestAES(unittest.TestCase):
def test_encrypt(self):
msg = b'message'
key = list(range(16))
encrypted = aes_encrypt(bytes_to_intlist(msg), key)
decrypted = intlist_to_bytes(aes_decrypt(encrypted, key))
encrypted = aes_encrypt(list(msg), key)
decrypted = bytes(aes_decrypt(encrypted, key))
self.assertEqual(decrypted, msg)
def test_cbc_decrypt(self):
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
decrypted = bytes(aes_cbc_decrypt(list(data), self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if Cryptodome.AES:
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
decrypted = aes_cbc_decrypt_bytes(data, bytes(self.key), bytes(self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_cbc_encrypt(self):
data = bytes_to_intlist(self.secret_msg)
encrypted = intlist_to_bytes(aes_cbc_encrypt(data, self.key, self.iv))
data = list(self.secret_msg)
encrypted = bytes(aes_cbc_encrypt(data, self.key, self.iv))
self.assertEqual(
encrypted,
b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd')
def test_ctr_decrypt(self):
data = bytes_to_intlist(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
decrypted = intlist_to_bytes(aes_ctr_decrypt(data, self.key, self.iv))
data = list(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
decrypted = bytes(aes_ctr_decrypt(data, self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_ctr_encrypt(self):
data = bytes_to_intlist(self.secret_msg)
encrypted = intlist_to_bytes(aes_ctr_encrypt(data, self.key, self.iv))
data = list(self.secret_msg)
encrypted = bytes(aes_ctr_encrypt(data, self.key, self.iv))
self.assertEqual(
encrypted,
b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
@ -75,19 +74,19 @@ class TestAES(unittest.TestCase):
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f.\x08\xb4T\xe4/\x17\xbd'
authentication_tag = b'\xe8&I\x80rI\x07\x9d}YWuU@:e'
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
decrypted = bytes(aes_gcm_decrypt_and_verify(
list(data), self.key, list(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if Cryptodome.AES:
decrypted = aes_gcm_decrypt_and_verify_bytes(
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
data, bytes(self.key), authentication_tag, bytes(self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_gcm_aligned_decrypt(self):
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f'
authentication_tag = b'\x08\xb1\x9d!&\x98\xd0\xeaRq\x90\xe6;\xb5]\xd8'
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
decrypted = bytes(aes_gcm_decrypt_and_verify(
list(data), self.key, list(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg[:16])
if Cryptodome.AES:
@ -96,38 +95,38 @@ class TestAES(unittest.TestCase):
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg[:16])
def test_decrypt_text(self):
password = intlist_to_bytes(self.key).decode()
password = bytes(self.key).decode()
encrypted = base64.b64encode(
intlist_to_bytes(self.iv[:8])
bytes(self.iv[:8])
+ b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae',
).decode()
decrypted = (aes_decrypt_text(encrypted, password, 16))
self.assertEqual(decrypted, self.secret_msg)
password = intlist_to_bytes(self.key).decode()
password = bytes(self.key).decode()
encrypted = base64.b64encode(
intlist_to_bytes(self.iv[:8])
bytes(self.iv[:8])
+ b'\x0b\xe6\xa4\xd9z\x0e\xb8\xb9\xd0\xd4i_\x85\x1d\x99\x98_\xe5\x80\xe7.\xbf\xa5\x83',
).decode()
decrypted = (aes_decrypt_text(encrypted, password, 32))
self.assertEqual(decrypted, self.secret_msg)
def test_ecb_encrypt(self):
data = bytes_to_intlist(self.secret_msg)
encrypted = intlist_to_bytes(aes_ecb_encrypt(data, self.key))
data = list(self.secret_msg)
encrypted = bytes(aes_ecb_encrypt(data, self.key))
self.assertEqual(
encrypted,
b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
def test_ecb_decrypt(self):
data = bytes_to_intlist(b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
decrypted = intlist_to_bytes(aes_ecb_decrypt(data, self.key, self.iv))
data = list(b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
decrypted = bytes(aes_ecb_decrypt(data, self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_key_expansion(self):
key = '4f6bdaa39e2f8cb07f5e722d9edef314'
self.assertEqual(key_expansion(bytes_to_intlist(bytearray.fromhex(key))), [
self.assertEqual(key_expansion(list(bytearray.fromhex(key))), [
0x4F, 0x6B, 0xDA, 0xA3, 0x9E, 0x2F, 0x8C, 0xB0, 0x7F, 0x5E, 0x72, 0x2D, 0x9E, 0xDE, 0xF3, 0x14,
0x53, 0x66, 0x20, 0xA8, 0xCD, 0x49, 0xAC, 0x18, 0xB2, 0x17, 0xDE, 0x35, 0x2C, 0xC9, 0x2D, 0x21,
0x8C, 0xBE, 0xDD, 0xD9, 0x41, 0xF7, 0x71, 0xC1, 0xF3, 0xE0, 0xAF, 0xF4, 0xDF, 0x29, 0x82, 0xD5,

View File

@ -12,12 +12,7 @@ import struct
from yt_dlp import compat
from yt_dlp.compat import urllib # isort: split
from yt_dlp.compat import (
compat_etree_fromstring,
compat_expanduser,
compat_urllib_parse_unquote, # noqa: TID251
compat_urllib_parse_urlencode, # noqa: TID251
)
from yt_dlp.compat import compat_etree_fromstring, compat_expanduser
from yt_dlp.compat.urllib.request import getproxies
@ -43,39 +38,6 @@ class TestCompat(unittest.TestCase):
finally:
os.environ['HOME'] = old_home or ''
def test_compat_urllib_parse_unquote(self):
self.assertEqual(compat_urllib_parse_unquote('abc%20def'), 'abc def')
self.assertEqual(compat_urllib_parse_unquote('%7e/abc+def'), '~/abc+def')
self.assertEqual(compat_urllib_parse_unquote(''), '')
self.assertEqual(compat_urllib_parse_unquote('%'), '%')
self.assertEqual(compat_urllib_parse_unquote('%%'), '%%')
self.assertEqual(compat_urllib_parse_unquote('%%%'), '%%%')
self.assertEqual(compat_urllib_parse_unquote('%2F'), '/')
self.assertEqual(compat_urllib_parse_unquote('%2f'), '/')
self.assertEqual(compat_urllib_parse_unquote('%E6%B4%A5%E6%B3%A2'), '津波')
self.assertEqual(
compat_urllib_parse_unquote('''<meta property="og:description" content="%E2%96%81%E2%96%82%E2%96%83%E2%96%84%25%E2%96%85%E2%96%86%E2%96%87%E2%96%88" />
%<a href="https://ar.wikipedia.org/wiki/%D8%AA%D8%B3%D9%88%D9%86%D8%A7%D9%85%D9%8A">%a'''),
'''<meta property="og:description" content="▁▂▃▄%▅▆▇█" />
%<a href="https://ar.wikipedia.org/wiki/تسونامي">%a''')
self.assertEqual(
compat_urllib_parse_unquote('''%28%5E%E2%97%A3_%E2%97%A2%5E%29%E3%81%A3%EF%B8%BB%E3%83%87%E2%95%90%E4%B8%80 %E2%87%80 %E2%87%80 %E2%87%80 %E2%87%80 %E2%87%80 %E2%86%B6%I%Break%25Things%'''),
'''(^◣_◢^)っ︻デ═一 ⇀ ⇀ ⇀ ⇀ ⇀ ↶%I%Break%Things%''')
def test_compat_urllib_parse_unquote_plus(self):
self.assertEqual(urllib.parse.unquote_plus('abc%20def'), 'abc def')
self.assertEqual(urllib.parse.unquote_plus('%7e/abc+def'), '~/abc def')
def test_compat_urllib_parse_urlencode(self):
self.assertEqual(compat_urllib_parse_urlencode({'abc': 'def'}), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode({'abc': b'def'}), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode({b'abc': 'def'}), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode({b'abc': b'def'}), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode([('abc', 'def')]), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode([('abc', b'def')]), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode([(b'abc', 'def')]), 'abc=def')
self.assertEqual(compat_urllib_parse_urlencode([(b'abc', b'def')]), 'abc=def')
def test_compat_etree_fromstring(self):
xml = '''
<root foo="bar" spam="中文">

View File

@ -15,7 +15,6 @@ import threading
from test.helper import http_server_port, try_rm
from yt_dlp import YoutubeDL
from yt_dlp.downloader.http import HttpFD
from yt_dlp.utils import encodeFilename
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@ -82,12 +81,12 @@ class TestHttpFD(unittest.TestCase):
ydl = YoutubeDL(params)
downloader = HttpFD(ydl, params)
filename = 'testfile.mp4'
try_rm(encodeFilename(filename))
try_rm(filename)
self.assertTrue(downloader.real_download(filename, {
'url': f'http://127.0.0.1:{self.port}/{ep}',
}), ep)
self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE, ep)
try_rm(encodeFilename(filename))
self.assertEqual(os.path.getsize(filename), TEST_SIZE, ep)
try_rm(filename)
def download_all(self, params):
for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):

View File

@ -21,7 +21,6 @@ import xml.etree.ElementTree
from yt_dlp.compat import (
compat_etree_fromstring,
compat_HTMLParseError,
compat_os_name,
)
from yt_dlp.utils import (
Config,
@ -49,7 +48,6 @@ from yt_dlp.utils import (
dfxp2srt,
encode_base_n,
encode_compat_str,
encodeFilename,
expand_path,
extract_attributes,
extract_basic_auth,
@ -69,7 +67,6 @@ from yt_dlp.utils import (
get_elements_html_by_class,
get_elements_text_and_html_by_attribute,
int_or_none,
intlist_to_bytes,
iri_to_uri,
is_html,
js_to_json,
@ -566,10 +563,10 @@ class TestUtil(unittest.TestCase):
self.assertEqual(res_data, {'a': 'b', 'c': 'd'})
def test_shell_quote(self):
args = ['ffmpeg', '-i', encodeFilename('ñ€ß\'.mp4')]
args = ['ffmpeg', '-i', 'ñ€ß\'.mp4']
self.assertEqual(
shell_quote(args),
"""ffmpeg -i 'ñ€ß'"'"'.mp4'""" if compat_os_name != 'nt' else '''ffmpeg -i "ñ€ß'.mp4"''')
"""ffmpeg -i 'ñ€ß'"'"'.mp4'""" if os.name != 'nt' else '''ffmpeg -i "ñ€ß'.mp4"''')
def test_float_or_none(self):
self.assertEqual(float_or_none('42.42'), 42.42)
@ -1309,15 +1306,10 @@ class TestUtil(unittest.TestCase):
self.assertEqual(clean_html('a:\n "b"'), 'a: "b"')
self.assertEqual(clean_html('a<br>\xa0b'), 'a\nb')
def test_intlist_to_bytes(self):
self.assertEqual(
intlist_to_bytes([0, 1, 127, 128, 255]),
b'\x00\x01\x7f\x80\xff')
def test_args_to_str(self):
self.assertEqual(
args_to_str(['foo', 'ba/r', '-baz', '2 be', '']),
'foo ba/r -baz \'2 be\' \'\'' if compat_os_name != 'nt' else 'foo ba/r -baz "2 be" ""',
'foo ba/r -baz \'2 be\' \'\'' if os.name != 'nt' else 'foo ba/r -baz "2 be" ""',
)
def test_parse_filesize(self):
@ -2117,7 +2109,7 @@ Line 1
assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=')
assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
@unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
@unittest.skipUnless(os.name == 'nt', 'Only relevant on Windows')
def test_windows_escaping(self):
tests = [
'test"&',

View File

@ -26,7 +26,7 @@ import unicodedata
from .cache import Cache
from .compat import urllib # isort: split
from .compat import compat_os_name, urllib_req_to_req
from .compat import urllib_req_to_req
from .cookies import CookieLoadError, LenientSimpleCookie, load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
@ -109,7 +109,6 @@ from .utils import (
determine_ext,
determine_protocol,
encode_compat_str,
encodeFilename,
escapeHTML,
expand_path,
extract_basic_auth,
@ -167,7 +166,7 @@ from .utils.networking import (
)
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
if compat_os_name == 'nt':
if os.name == 'nt':
import ctypes
@ -643,7 +642,7 @@ class YoutubeDL:
out=stdout,
error=sys.stderr,
screen=sys.stderr if self.params.get('quiet') else stdout,
console=None if compat_os_name == 'nt' else next(
console=None if os.name == 'nt' else next(
filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
)
@ -952,7 +951,7 @@ class YoutubeDL:
self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.error, only_once=only_once)
def _send_console_code(self, code):
if compat_os_name == 'nt' or not self._out_files.console:
if os.name == 'nt' or not self._out_files.console:
return
self._write_string(code, self._out_files.console)
@ -960,7 +959,7 @@ class YoutubeDL:
if not self.params.get('consoletitle', False):
return
message = remove_terminal_sequences(message)
if compat_os_name == 'nt':
if os.name == 'nt':
if ctypes.windll.kernel32.GetConsoleWindow():
# c_wchar_p() might not be necessary if `message` is
# already of type unicode()
@ -3255,9 +3254,9 @@ class YoutubeDL:
if full_filename is None:
return
if not self._ensure_dir_exists(encodeFilename(full_filename)):
if not self._ensure_dir_exists(full_filename):
return
if not self._ensure_dir_exists(encodeFilename(temp_filename)):
if not self._ensure_dir_exists(temp_filename):
return
if self._write_description('video', info_dict,
@ -3289,16 +3288,16 @@ class YoutubeDL:
if self.params.get('writeannotations', False):
annofn = self.prepare_filename(info_dict, 'annotation')
if annofn:
if not self._ensure_dir_exists(encodeFilename(annofn)):
if not self._ensure_dir_exists(annofn):
return
if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(annofn)):
if not self.params.get('overwrites', True) and os.path.exists(annofn):
self.to_screen('[info] Video annotations are already present')
elif not info_dict.get('annotations'):
self.report_warning('There are no annotations to write.')
else:
try:
self.to_screen('[info] Writing video annotations to: ' + annofn)
with open(encodeFilename(annofn), 'w', encoding='utf-8') as annofile:
with open(annofn, 'w', encoding='utf-8') as annofile:
annofile.write(info_dict['annotations'])
except (KeyError, TypeError):
self.report_warning('There are no annotations to write.')
@ -3314,14 +3313,14 @@ class YoutubeDL:
f'Cannot write internet shortcut file because the actual URL of "{info_dict["webpage_url"]}" is unknown')
return True
linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext'))
if not self._ensure_dir_exists(encodeFilename(linkfn)):
if not self._ensure_dir_exists(linkfn):
return False
if self.params.get('overwrites', True) and os.path.exists(encodeFilename(linkfn)):
if self.params.get('overwrites', True) and os.path.exists(linkfn):
self.to_screen(f'[info] Internet shortcut (.{link_type}) is already present')
return True
try:
self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
with open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
with open(to_high_limit_path(linkfn), 'w', encoding='utf-8',
newline='\r\n' if link_type == 'url' else '\n') as linkfile:
template_vars = {'url': url}
if link_type == 'desktop':
@ -3352,7 +3351,7 @@ class YoutubeDL:
if self.params.get('skip_download'):
info_dict['filepath'] = temp_filename
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(full_filename))
info_dict['__files_to_move'] = files_to_move
replace_info_dict(self.run_pp(MoveFilesAfterDownloadPP(self, False), info_dict))
info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
@ -3482,7 +3481,7 @@ class YoutubeDL:
self.report_file_already_downloaded(dl_filename)
dl_filename = dl_filename or temp_filename
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(full_filename))
except network_exceptions as err:
self.report_error(f'unable to download video data: {err}')
@ -4297,7 +4296,7 @@ class YoutubeDL:
else:
try:
self.to_screen(f'[info] Writing {label} description to: {descfn}')
with open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
with open(descfn, 'w', encoding='utf-8') as descfile:
descfile.write(ie_result['description'])
except OSError:
self.report_error(f'Cannot write {label} description file {descfn}')
@ -4399,7 +4398,7 @@ class YoutubeDL:
try:
uf = self.urlopen(Request(t['url'], headers=t.get('http_headers', {})))
self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
with open(thumb_filename, 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)
ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename

View File

@ -14,7 +14,6 @@ import os
import re
import traceback
from .compat import compat_os_name
from .cookies import SUPPORTED_BROWSERS, SUPPORTED_KEYRINGS, CookieLoadError
from .downloader.external import get_external_downloader
from .extractor import list_extractor_classes
@ -44,7 +43,6 @@ from .utils import (
GeoUtils,
PlaylistEntries,
SameFileError,
decodeOption,
download_range_func,
expand_path,
float_or_none,
@ -883,8 +881,8 @@ def parse_options(argv=None):
'listsubtitles': opts.listsubtitles,
'subtitlesformat': opts.subtitlesformat,
'subtitleslangs': opts.subtitleslangs,
'matchtitle': decodeOption(opts.matchtitle),
'rejecttitle': decodeOption(opts.rejecttitle),
'matchtitle': opts.matchtitle,
'rejecttitle': opts.rejecttitle,
'max_downloads': opts.max_downloads,
'prefer_free_formats': opts.prefer_free_formats,
'trim_file_name': opts.trim_file_name,
@ -1053,7 +1051,7 @@ def _real_main(argv=None):
ydl.warn_if_short_id(args)
# Show a useful error message and wait for keypress if not launched from shell on Windows
if not args and compat_os_name == 'nt' and getattr(sys, 'frozen', False):
if not args and os.name == 'nt' and getattr(sys, 'frozen', False):
import ctypes.wintypes
import msvcrt

View File

@ -3,7 +3,6 @@ from math import ceil
from .compat import compat_ord
from .dependencies import Cryptodome
from .utils import bytes_to_intlist, intlist_to_bytes
if Cryptodome.AES:
def aes_cbc_decrypt_bytes(data, key, iv):
@ -17,15 +16,15 @@ if Cryptodome.AES:
else:
def aes_cbc_decrypt_bytes(data, key, iv):
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv))))
return bytes(aes_cbc_decrypt(*map(list, (data, key, iv))))
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
""" Decrypt bytes with AES-GCM using native implementation since pycryptodome is unavailable """
return intlist_to_bytes(aes_gcm_decrypt_and_verify(*map(bytes_to_intlist, (data, key, tag, nonce))))
return bytes(aes_gcm_decrypt_and_verify(*map(list, (data, key, tag, nonce))))
def aes_cbc_encrypt_bytes(data, key, iv, **kwargs):
return intlist_to_bytes(aes_cbc_encrypt(*map(bytes_to_intlist, (data, key, iv)), **kwargs))
return bytes(aes_cbc_encrypt(*map(list, (data, key, iv)), **kwargs))
BLOCK_SIZE_BYTES = 16
@ -221,7 +220,7 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
j0 = [*nonce, 0, 0, 0, 1]
else:
fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8
ghash_in = nonce + [0] * fill + bytes_to_intlist((8 * len(nonce)).to_bytes(8, 'big'))
ghash_in = nonce + [0] * fill + list((8 * len(nonce)).to_bytes(8, 'big'))
j0 = ghash(hash_subkey, ghash_in)
# TODO: add nonce support to aes_ctr_decrypt
@ -234,9 +233,9 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
s_tag = ghash(
hash_subkey,
data
+ [0] * pad_len # pad
+ bytes_to_intlist((0 * 8).to_bytes(8, 'big') # length of associated data
+ ((len(data) * 8).to_bytes(8, 'big'))), # length of data
+ [0] * pad_len # pad
+ list((0 * 8).to_bytes(8, 'big') # length of associated data
+ ((len(data) * 8).to_bytes(8, 'big'))), # length of data
)
if tag != aes_ctr_encrypt(s_tag, key, j0):
@ -300,8 +299,8 @@ def aes_decrypt_text(data, password, key_size_bytes):
"""
NONCE_LENGTH_BYTES = 8
data = bytes_to_intlist(base64.b64decode(data))
password = bytes_to_intlist(password.encode())
data = list(base64.b64decode(data))
password = list(password.encode())
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))
key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES)
@ -310,7 +309,7 @@ def aes_decrypt_text(data, password, key_size_bytes):
cipher = data[NONCE_LENGTH_BYTES:]
decrypted_data = aes_ctr_decrypt(cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES))
return intlist_to_bytes(decrypted_data)
return bytes(decrypted_data)
RCON = (0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36)

View File

@ -1,5 +1,4 @@
import os
import sys
import xml.etree.ElementTree as etree
from .compat_utils import passthrough_module
@ -24,33 +23,14 @@ def compat_etree_fromstring(text):
return etree.XML(text, parser=etree.XMLParser(target=_TreeBuilder()))
compat_os_name = os._name if os.name == 'java' else os.name
def compat_shlex_quote(s):
from ..utils import shell_quote
return shell_quote(s)
def compat_ord(c):
return c if isinstance(c, int) else ord(c)
if compat_os_name == 'nt' and sys.version_info < (3, 8):
# os.path.realpath on Windows does not follow symbolic links
# prior to Python 3.8 (see https://bugs.python.org/issue9949)
def compat_realpath(path):
while os.path.islink(path):
path = os.path.abspath(os.readlink(path))
return os.path.realpath(path)
else:
compat_realpath = os.path.realpath
# Python 3.8+ does not honor %HOME% on windows, but this breaks compatibility with youtube-dl
# See https://github.com/yt-dlp/yt-dlp/issues/792
# https://docs.python.org/3/library/os.path.html#os.path.expanduser
if compat_os_name in ('nt', 'ce'):
if os.name in ('nt', 'ce'):
def compat_expanduser(path):
HOME = os.environ.get('HOME')
if not HOME:

View File

@ -8,16 +8,14 @@ passthrough_module(__name__, '.._legacy', callback=lambda attr: warnings.warn(
DeprecationWarning(f'{__name__}.{attr} is deprecated'), stacklevel=6))
del passthrough_module
import base64
import urllib.error
import urllib.parse
import functools # noqa: F401
import os
compat_str = str
compat_b64decode = base64.b64decode
compat_os_name = os.name
compat_realpath = os.path.realpath
compat_urlparse = urllib.parse
compat_parse_qs = urllib.parse.parse_qs
compat_urllib_parse_unquote = urllib.parse.unquote
compat_urllib_parse_urlencode = urllib.parse.urlencode
compat_urllib_parse_urlparse = urllib.parse.urlparse
def compat_shlex_quote(s):
from ..utils import shell_quote
return shell_quote(s)

View File

@ -30,7 +30,7 @@ from asyncio import run as compat_asyncio_run # noqa: F401
from re import Pattern as compat_Pattern # noqa: F401
from re import match as compat_Match # noqa: F401
from . import compat_expanduser, compat_HTMLParseError, compat_realpath
from . import compat_expanduser, compat_HTMLParseError
from .compat_utils import passthrough_module
from ..dependencies import brotli as compat_brotli # noqa: F401
from ..dependencies import websockets as compat_websockets # noqa: F401
@ -78,7 +78,7 @@ compat_kwargs = lambda kwargs: kwargs
compat_map = map
compat_numeric_types = (int, float, complex)
compat_os_path_expanduser = compat_expanduser
compat_os_path_realpath = compat_realpath
compat_os_path_realpath = os.path.realpath
compat_print = print
compat_shlex_split = shlex.split
compat_socket_create_connection = socket.create_connection
@ -104,5 +104,12 @@ compat_xml_parse_error = compat_xml_etree_ElementTree_ParseError = etree.ParseEr
compat_xpath = lambda xpath: xpath
compat_zip = zip
workaround_optparse_bug9161 = lambda: None
compat_str = str
compat_b64decode = base64.b64decode
compat_urlparse = urllib.parse
compat_parse_qs = urllib.parse.parse_qs
compat_urllib_parse_unquote = urllib.parse.unquote
compat_urllib_parse_urlencode = urllib.parse.urlencode
compat_urllib_parse_urlparse = urllib.parse.urlparse
legacy = []

View File

@ -1,7 +0,0 @@
# flake8: noqa: F405
from functools import * # noqa: F403
from .compat_utils import passthrough_module
passthrough_module(__name__, 'functools')
del passthrough_module

View File

@ -7,9 +7,9 @@ passthrough_module(__name__, 'urllib.request')
del passthrough_module
from .. import compat_os_name
import os
if compat_os_name == 'nt':
if os.name == 'nt':
# On older Python versions, proxies are extracted from Windows registry erroneously. [1]
# If the https proxy in the registry does not have a scheme, urllib will incorrectly add https:// to it. [2]
# It is unlikely that the user has actually set it to be https, so we should be fine to safely downgrade
@ -37,4 +37,4 @@ if compat_os_name == 'nt':
def getproxies():
return getproxies_environment() or getproxies_registry_patched()
del compat_os_name
del os

View File

@ -25,7 +25,6 @@ from .aes import (
aes_gcm_decrypt_and_verify_bytes,
unpad_pkcs7,
)
from .compat import compat_os_name
from .dependencies import (
_SECRETSTORAGE_UNAVAILABLE_REASON,
secretstorage,
@ -343,7 +342,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
logger.debug(f'cookie version breakdown: {counts}')
return jar
except PermissionError as error:
if compat_os_name == 'nt' and error.errno == 13:
if os.name == 'nt' and error.errno == 13:
message = 'Could not copy Chrome cookie database. See https://github.com/yt-dlp/yt-dlp/issues/7271 for more info'
logger.error(message)
raise DownloadError(message) # force exit

View File

@ -20,9 +20,7 @@ from ..utils import (
Namespace,
RetryManager,
classproperty,
decodeArgument,
deprecation_warning,
encodeFilename,
format_bytes,
join_nonempty,
parse_bytes,
@ -219,7 +217,7 @@ class FileDownloader:
def temp_name(self, filename):
"""Returns a temporary filename for the given filename."""
if self.params.get('nopart', False) or filename == '-' or \
(os.path.exists(encodeFilename(filename)) and not os.path.isfile(encodeFilename(filename))):
(os.path.exists(filename) and not os.path.isfile(filename)):
return filename
return filename + '.part'
@ -273,7 +271,7 @@ class FileDownloader:
"""Try to set the last-modified time of the given file."""
if last_modified_hdr is None:
return
if not os.path.isfile(encodeFilename(filename)):
if not os.path.isfile(filename):
return
timestr = last_modified_hdr
if timestr is None:
@ -432,13 +430,13 @@ class FileDownloader:
"""
nooverwrites_and_exists = (
not self.params.get('overwrites', True)
and os.path.exists(encodeFilename(filename))
and os.path.exists(filename)
)
if not hasattr(filename, 'write'):
continuedl_and_exists = (
self.params.get('continuedl', True)
and os.path.isfile(encodeFilename(filename))
and os.path.isfile(filename)
and not self.params.get('nopart', False)
)
@ -448,7 +446,7 @@ class FileDownloader:
self._hook_progress({
'filename': filename,
'status': 'finished',
'total_bytes': os.path.getsize(encodeFilename(filename)),
'total_bytes': os.path.getsize(filename),
}, info_dict)
self._finish_multiline_status()
return True, False
@ -489,9 +487,7 @@ class FileDownloader:
if not self.params.get('verbose', False):
return
str_args = [decodeArgument(a) for a in args]
if exe is None:
exe = os.path.basename(str_args[0])
exe = os.path.basename(args[0])
self.write_debug(f'{exe} command line: {shell_quote(str_args)}')
self.write_debug(f'{exe} command line: {shell_quote(args)}')

View File

@ -23,7 +23,6 @@ from ..utils import (
cli_valueless_option,
determine_ext,
encodeArgument,
encodeFilename,
find_available_port,
remove_end,
traverse_obj,
@ -67,7 +66,7 @@ class ExternalFD(FragmentFD):
'elapsed': time.time() - started,
}
if filename != '-':
fsize = os.path.getsize(encodeFilename(tmpfilename))
fsize = os.path.getsize(tmpfilename)
self.try_rename(tmpfilename, filename)
status.update({
'downloaded_bytes': fsize,
@ -184,9 +183,9 @@ class ExternalFD(FragmentFD):
dest.write(decrypt_fragment(fragment, src.read()))
src.close()
if not self.params.get('keep_fragments', False):
self.try_remove(encodeFilename(fragment_filename))
self.try_remove(fragment_filename)
dest.close()
self.try_remove(encodeFilename(f'{tmpfilename}.frag.urls'))
self.try_remove(f'{tmpfilename}.frag.urls')
return 0
def _call_process(self, cmd, info_dict):
@ -620,7 +619,7 @@ class FFmpegFD(ExternalFD):
args += self._configuration_args(('_o1', '_o', ''))
args = [encodeArgument(opt) for opt in args]
args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True))
args.append(ffpp._ffmpeg_filename_argument(tmpfilename))
self._debug_cmd(args)
piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats)

View File

@ -9,10 +9,9 @@ import time
from .common import FileDownloader
from .http import HttpFD
from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
from ..compat import compat_os_name
from ..networking import Request
from ..networking.exceptions import HTTPError, IncompleteRead
from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj
from ..utils import DownloadError, RetryManager, traverse_obj
from ..utils.networking import HTTPHeaderDict
from ..utils.progress import ProgressCalculator
@ -152,7 +151,7 @@ class FragmentFD(FileDownloader):
if self.__do_ytdl_file(ctx):
self._write_ytdl_file(ctx)
if not self.params.get('keep_fragments', False):
self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
self.try_remove(ctx['fragment_filename_sanitized'])
del ctx['fragment_filename_sanitized']
def _prepare_frag_download(self, ctx):
@ -188,7 +187,7 @@ class FragmentFD(FileDownloader):
})
if self.__do_ytdl_file(ctx):
ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename'])))
ytdl_file_exists = os.path.isfile(self.ytdl_filename(ctx['filename']))
continuedl = self.params.get('continuedl', True)
if continuedl and ytdl_file_exists:
self._read_ytdl_file(ctx)
@ -390,7 +389,7 @@ class FragmentFD(FileDownloader):
def __exit__(self, exc_type, exc_val, exc_tb):
pass
if compat_os_name == 'nt':
if os.name == 'nt':
def future_result(future):
while True:
try:

View File

@ -15,7 +15,6 @@ from ..utils import (
ThrottledDownload,
XAttrMetadataError,
XAttrUnavailableError,
encodeFilename,
int_or_none,
parse_http_range,
try_call,
@ -58,9 +57,8 @@ class HttpFD(FileDownloader):
if self.params.get('continuedl', True):
# Establish possible resume length
if os.path.isfile(encodeFilename(ctx.tmpfilename)):
ctx.resume_len = os.path.getsize(
encodeFilename(ctx.tmpfilename))
if os.path.isfile(ctx.tmpfilename):
ctx.resume_len = os.path.getsize(ctx.tmpfilename)
ctx.is_resume = ctx.resume_len > 0
@ -241,7 +239,7 @@ class HttpFD(FileDownloader):
ctx.resume_len = byte_counter
else:
try:
ctx.resume_len = os.path.getsize(encodeFilename(ctx.tmpfilename))
ctx.resume_len = os.path.getsize(ctx.tmpfilename)
except FileNotFoundError:
ctx.resume_len = 0
raise RetryDownload(e)

View File

@ -8,7 +8,6 @@ from ..utils import (
Popen,
check_executable,
encodeArgument,
encodeFilename,
get_exe_version,
)
@ -179,7 +178,7 @@ class RtmpFD(FileDownloader):
return False
while retval in (RD_INCOMPLETE, RD_FAILED) and not test and not live:
prevsize = os.path.getsize(encodeFilename(tmpfilename))
prevsize = os.path.getsize(tmpfilename)
self.to_screen(f'[rtmpdump] Downloaded {prevsize} bytes')
time.sleep(5.0) # This seems to be needed
args = [*basic_args, '--resume']
@ -187,7 +186,7 @@ class RtmpFD(FileDownloader):
args += ['--skip', '1']
args = [encodeArgument(a) for a in args]
retval = run_rtmpdump(args)
cursize = os.path.getsize(encodeFilename(tmpfilename))
cursize = os.path.getsize(tmpfilename)
if prevsize == cursize and retval == RD_FAILED:
break
# Some rtmp streams seem abort after ~ 99.8%. Don't complain for those
@ -196,7 +195,7 @@ class RtmpFD(FileDownloader):
retval = RD_SUCCESS
break
if retval == RD_SUCCESS or (test and retval == RD_INCOMPLETE):
fsize = os.path.getsize(encodeFilename(tmpfilename))
fsize = os.path.getsize(tmpfilename)
self.to_screen(f'[rtmpdump] Downloaded {fsize} bytes')
self.try_rename(tmpfilename, filename)
self._hook_progress({

View File

@ -2,7 +2,7 @@ import os
import subprocess
from .common import FileDownloader
from ..utils import check_executable, encodeFilename
from ..utils import check_executable
class RtspFD(FileDownloader):
@ -26,7 +26,7 @@ class RtspFD(FileDownloader):
retval = subprocess.call(args)
if retval == 0:
fsize = os.path.getsize(encodeFilename(tmpfilename))
fsize = os.path.getsize(tmpfilename)
self.to_screen(f'\r[{args[0]}] {fsize} bytes')
self.try_rename(tmpfilename, filename)
self._hook_progress({

View File

@ -6,7 +6,6 @@ import hmac
import io
import json
import re
import struct
import time
import urllib.parse
import uuid
@ -18,10 +17,8 @@ from ..networking.exceptions import TransportError
from ..utils import (
ExtractorError,
OnDemandPagedList,
bytes_to_intlist,
decode_base_n,
int_or_none,
intlist_to_bytes,
time_seconds,
traverse_obj,
update_url_query,
@ -72,15 +69,15 @@ class AbemaLicenseRH(RequestHandler):
})
res = decode_base_n(license_response['k'], table=self._STRTABLE)
encvideokey = bytes_to_intlist(struct.pack('>QQ', res >> 64, res & 0xffffffffffffffff))
encvideokey = list(res.to_bytes(16, 'big'))
h = hmac.new(
binascii.unhexlify(self._HKEY),
(license_response['cid'] + self.ie._DEVICE_ID).encode(),
digestmod=hashlib.sha256)
enckey = bytes_to_intlist(h.digest())
enckey = list(h.digest())
return intlist_to_bytes(aes_ecb_decrypt(encvideokey, enckey))
return bytes(aes_ecb_decrypt(encvideokey, enckey))
class AbemaTVBaseIE(InfoExtractor):

View File

@ -11,11 +11,9 @@ from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,
ass_subtitles_timecode,
bytes_to_intlist,
bytes_to_long,
float_or_none,
int_or_none,
intlist_to_bytes,
join_nonempty,
long_to_bytes,
parse_iso8601,
@ -198,16 +196,16 @@ Format: Marked,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text'''
links_url = try_get(options, lambda x: x['video']['url']) or (video_base_url + 'link')
self._K = ''.join(random.choices('0123456789abcdef', k=16))
message = bytes_to_intlist(json.dumps({
message = list(json.dumps({
'k': self._K,
't': token,
}))
}).encode())
# Sometimes authentication fails for no good reason, retry with
# a different random padding
links_data = None
for _ in range(3):
padded_message = intlist_to_bytes(pkcs1pad(message, 128))
padded_message = bytes(pkcs1pad(message, 128))
n, e = self._RSA_KEY
encrypted_message = long_to_bytes(pow(bytes_to_long(padded_message), e, n))
authorization = base64.b64encode(encrypted_message).decode()

View File

@ -8,10 +8,8 @@ import time
from .common import InfoExtractor
from ..aes import aes_encrypt
from ..utils import (
bytes_to_intlist,
determine_ext,
int_or_none,
intlist_to_bytes,
join_nonempty,
smuggle_url,
strip_jsonp,
@ -234,8 +232,8 @@ class AnvatoIE(InfoExtractor):
server_time = self._server_time(access_key, video_id)
input_data = f'{server_time}~{md5_text(video_data_url)}~{md5_text(server_time)}'
auth_secret = intlist_to_bytes(aes_encrypt(
bytes_to_intlist(input_data[:64]), bytes_to_intlist(self._AUTH_KEY)))
auth_secret = bytes(aes_encrypt(
list(input_data[:64].encode()), list(self._AUTH_KEY)))
query = {
'X-Anvato-Adst-Auth': base64.b64encode(auth_secret).decode('ascii'),
'rtyp': 'fp',

View File

@ -1,3 +1,4 @@
import json
import re
import urllib.parse
@ -8,6 +9,7 @@ from ..utils import (
float_or_none,
str_or_none,
traverse_obj,
unescapeHTML,
urlencode_postdata,
)
@ -19,7 +21,7 @@ USER_AGENTS = {
class CeskaTelevizeIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?ceskatelevize\.cz/(?:ivysilani|porady|zive)/(?:[^/?#&]+/)*(?P<id>[^/#?]+)'
_TESTS = [{
'url': 'http://www.ceskatelevize.cz/ivysilani/10441294653-hyde-park-civilizace/215411058090502/bonus/20641-bonus-01-en',
'url': 'https://www.ceskatelevize.cz/porady/10441294653-hyde-park-civilizace/bonus/20641/',
'info_dict': {
'id': '61924494877028507',
'ext': 'mp4',
@ -27,6 +29,7 @@ class CeskaTelevizeIE(InfoExtractor):
'description': 'English Subtittles',
'thumbnail': r're:^https?://.*\.jpg',
'duration': 81.3,
'live_status': 'not_live',
},
'params': {
# m3u8 download
@ -34,13 +37,16 @@ class CeskaTelevizeIE(InfoExtractor):
},
}, {
# live stream
'url': 'http://www.ceskatelevize.cz/zive/ct1/',
'url': 'https://www.ceskatelevize.cz/zive/ct1/',
'only_matching': True,
'info_dict': {
'id': '102',
'id': '61924494878124436',
'ext': 'mp4',
'title': r'ČT1 - živé vysílání online',
'title': r're:^ČT1 - živé vysílání online \d{4}-\d{2}-\d{2} \d{2}:\d{2}$',
'description': 'Sledujte živé vysílání kanálu ČT1 online. Vybírat si můžete i z dalších kanálů České televize na kterémkoli z vašich zařízení.',
'is_live': True,
'thumbnail': r're:^https?://.*\.jpg',
'duration': 5373.3,
'live_status': 'is_live',
},
'params': {
# m3u8 download
@ -48,18 +54,19 @@ class CeskaTelevizeIE(InfoExtractor):
},
}, {
# another
'url': 'http://www.ceskatelevize.cz/ivysilani/zive/ct4/',
'url': 'https://www.ceskatelevize.cz/zive/sport/',
'only_matching': True,
'info_dict': {
'id': '402',
'id': '422',
'ext': 'mp4',
'title': r're:^ČT Sport \d{4}-\d{2}-\d{2} \d{2}:\d{2}$',
'is_live': True,
'thumbnail': r're:^https?://.*\.jpg',
'live_status': 'is_live',
},
'params': {
# m3u8 download
'skip_download': True,
},
# 'skip': 'Georestricted to Czech Republic',
}, {
'url': 'http://www.ceskatelevize.cz/ivysilani/embed/iFramePlayer.php?hash=d6a3e1370d2e4fa76296b90bad4dfc19673b641e&IDEC=217 562 22150/0004&channelID=1&width=100%25',
'only_matching': True,
}, {
# video with 18+ caution trailer
'url': 'http://www.ceskatelevize.cz/porady/10520528904-queer/215562210900007-bogotart/',
@ -74,6 +81,7 @@ class CeskaTelevizeIE(InfoExtractor):
'ext': 'mp4',
'title': 'Bogotart - Queer (Varování 18+)',
'duration': 11.9,
'live_status': 'not_live',
},
}, {
'info_dict': {
@ -82,6 +90,7 @@ class CeskaTelevizeIE(InfoExtractor):
'title': 'Bogotart - Queer (Queer)',
'thumbnail': r're:^https?://.*\.jpg',
'duration': 1558.3,
'live_status': 'not_live',
},
}],
'params': {
@ -91,7 +100,19 @@ class CeskaTelevizeIE(InfoExtractor):
}, {
# iframe embed
'url': 'http://www.ceskatelevize.cz/porady/10614999031-neviditelni/21251212048/',
'only_matching': True,
'info_dict': {
'id': '61924494877628660',
'ext': 'mp4',
'title': 'Epizoda 1/13 - Neviditelní',
'description': 'Vypadají jako my, mluví jako my, ale mají něco navíc gen, který jim umožňuje dýchat vodu. Aniž to tušíme, žijí mezi námi.',
'thumbnail': r're:^https?://.*\.jpg',
'duration': 3576.8,
'live_status': 'not_live',
},
'params': {
# m3u8 download
'skip_download': True,
},
}]
def _real_extract(self, url):
@ -106,26 +127,83 @@ class CeskaTelevizeIE(InfoExtractor):
if playlist_description:
playlist_description = playlist_description.replace('\xa0', ' ')
type_ = 'IDEC'
type_ = 'episode'
is_live = False
if re.search(r'(^/porady|/zive)/', parsed_url.path):
next_data = self._search_nextjs_data(webpage, playlist_id)
if '/zive/' in parsed_url.path:
idec = traverse_obj(next_data, ('props', 'pageProps', 'data', 'liveBroadcast', 'current', 'idec'), get_all=False)
sidp = traverse_obj(next_data, ('props', 'pageProps', 'data', 'liveBroadcast', 'current', 'showId'), get_all=False)
is_live = True
else:
idec = traverse_obj(next_data, ('props', 'pageProps', 'data', ('show', 'mediaMeta'), 'idec'), get_all=False)
if not idec:
idec = traverse_obj(next_data, ('props', 'pageProps', 'data', 'videobonusDetail', 'bonusId'), get_all=False)
if idec:
type_ = 'bonus'
sidp = self._search_regex(r'https?://(?:www\.)?ceskatelevize\.cz/(?:ivysilani|porady|zive)/([0-9]+)-', url, playlist_id, default=playlist_id)
if not idec:
raise ExtractorError('Failed to find IDEC id')
iframe_hash = self._download_webpage(
'https://www.ceskatelevize.cz/v-api/iframe-hash/',
playlist_id, note='Getting IFRAME hash')
query = {'hash': iframe_hash, 'origin': 'iVysilani', 'autoStart': 'true', type_: idec}
sidp = sidp.rsplit('-')[0]
query = {'origin': 'iVysilani', 'autoStart': 'true', 'sidp': sidp, type_: idec}
webpage = self._download_webpage(
'https://www.ceskatelevize.cz/ivysilani/embed/iFramePlayer.php',
'https://player.ceskatelevize.cz/',
playlist_id, note='Downloading player', query=query)
playlistpage_url = 'https://www.ceskatelevize.cz/ivysilani/ajax/get-client-playlist/'
data = {
'playlist[0][type]': type_,
'playlist[0][id]': idec,
'requestUrl': parsed_url.path,
'requestSource': 'iVysilani',
}
elif parsed_url.path == '/' and parsed_url.fragment == 'live':
if self._search_regex(r'(?s)<section[^>]+id=[\'"]live[\'"][^>]+data-ctcomp-data=\'([^\']+)\'[^>]*>', webpage, 'live video player', default=None):
# CT4
is_live = True
ctcomp_data = self._parse_json(
self._search_regex(
r'(?s)<section[^>]+id=[\'"]live[\'"][^>]+data-ctcomp-data=\'([^\']+)\'[^>]*>',
webpage, 'ctcomp data', fatal=True),
playlist_id, transform_source=unescapeHTML)
current_item = traverse_obj(ctcomp_data, ('items', ctcomp_data.get('currentItem'), 'items', 0, 'video', 'data', 'source', 'playlist', 0))
playlistpage_url = 'https://playlist.ceskatelevize.cz/'
data = {
'contentType': 'live',
'items': [{
'id': current_item.get('id'),
'key': current_item.get('key'),
'assetId': current_item.get('assetId'),
'playerType': 'dash',
'date': current_item.get('date'),
'requestSource': current_item.get('requestSource'),
'drm': current_item.get('drm'),
'quality': current_item.get('quality'),
}],
}
data = {'data': json.dumps(data).encode('utf-8')}
else:
# CT24
is_live = True
lvp_url = self._search_regex(
r'(?s)<div[^>]+id=[\'"]live-video-player[\'"][^>]+data-url=[\'"]([^\'"]+)[\'"][^>]*>',
webpage, 'live video player', fatal=True)
lvp_hash = self._search_regex(
r'(?s)media_ivysilani: *{ *hash *: *[\'"]([0-9a-f]+)[\'"] *}',
webpage, 'live video hash', fatal=True)
lvp_url += '&hash=' + lvp_hash
webpage = self._download_webpage(unescapeHTML(lvp_url), playlist_id)
playlistpage = self._search_regex(
r'(?s)getPlaylistUrl\((\[[^\]]+\])[,\)]',
webpage, 'playlist params', fatal=True)
playlistpage_params = self._parse_json(playlistpage, playlist_id)[0]
playlistpage_url = 'https://www.ceskatelevize.cz/ivysilani/ajax/get-client-playlist/'
idec = playlistpage_params.get('id')
data = {
'playlist[0][type]': playlistpage_params.get('type'),
'playlist[0][id]': idec,
'requestUrl': '/ivysilani/embed/iFramePlayer.php',
'requestSource': 'iVysilani',
}
NOT_AVAILABLE_STRING = 'This content is not available at your territory due to limited copyright.'
if f'{NOT_AVAILABLE_STRING}</p>' in webpage:
@ -133,40 +211,10 @@ class CeskaTelevizeIE(InfoExtractor):
if any(not_found in webpage for not_found in ('Neplatný parametr pro videopřehrávač', 'IDEC nebyl nalezen')):
raise ExtractorError('no video with IDEC available', video_id=idec, expected=True)
type_ = None
episode_id = None
playlist = self._parse_json(
self._search_regex(
r'getPlaylistUrl\(\[({.+?})\]', webpage, 'playlist',
default='{}'), playlist_id)
if playlist:
type_ = playlist.get('type')
episode_id = playlist.get('id')
if not type_:
type_ = self._html_search_regex(
r'getPlaylistUrl\(\[\{"type":"(.+?)","id":".+?"\}\],',
webpage, 'type')
if not episode_id:
episode_id = self._html_search_regex(
r'getPlaylistUrl\(\[\{"type":".+?","id":"(.+?)"\}\],',
webpage, 'episode_id')
data = {
'playlist[0][type]': type_,
'playlist[0][id]': episode_id,
'requestUrl': parsed_url.path,
'requestSource': 'iVysilani',
}
entries = []
for user_agent in (None, USER_AGENTS['Safari']):
req = Request(
'https://www.ceskatelevize.cz/ivysilani/ajax/get-client-playlist/',
data=urlencode_postdata(data))
req = Request(playlistpage_url, data=urlencode_postdata(data))
req.headers['Content-type'] = 'application/x-www-form-urlencoded'
req.headers['x-addr'] = '127.0.0.1'
req.headers['X-Requested-With'] = 'XMLHttpRequest'
@ -179,25 +227,25 @@ class CeskaTelevizeIE(InfoExtractor):
if not playlistpage:
continue
playlist_url = playlistpage['url']
if playlist_url == 'error_region':
raise ExtractorError(NOT_AVAILABLE_STRING, expected=True)
playlist_url = playlistpage.get('url')
if playlist_url:
if playlist_url == 'error_region':
raise ExtractorError(NOT_AVAILABLE_STRING, expected=True)
req = Request(urllib.parse.unquote(playlist_url))
req.headers['Referer'] = url
playlist = self._download_json(req, playlist_id, fatal=False)
if not playlist:
continue
playlist = playlist.get('playlist')
else:
playlist = traverse_obj(playlistpage, ('RESULT', 'playlist'))
req = Request(urllib.parse.unquote(playlist_url))
req.headers['Referer'] = url
playlist = self._download_json(req, playlist_id, fatal=False)
if not playlist:
continue
playlist = playlist.get('playlist')
if not isinstance(playlist, list):
continue
playlist_len = len(playlist)
for num, item in enumerate(playlist):
is_live = item.get('type') == 'LIVE'
formats = []
for format_id, stream_url in item.get('streamUrls', {}).items():
if 'playerType=flash' in stream_url:
@ -222,7 +270,7 @@ class CeskaTelevizeIE(InfoExtractor):
continue
item_id = str_or_none(item.get('id') or item['assetId'])
title = item['title']
title = item.get('title') or 'live'
duration = float_or_none(item.get('duration'))
thumbnail = item.get('previewImageUrl')
@ -231,7 +279,7 @@ class CeskaTelevizeIE(InfoExtractor):
if item.get('type') == 'VOD':
subs = item.get('subtitles')
if subs:
subtitles = self.extract_subtitles(episode_id, subs)
subtitles = self.extract_subtitles(idec, subs)
if playlist_len == 1:
final_title = playlist_title or title
@ -246,7 +294,7 @@ class CeskaTelevizeIE(InfoExtractor):
'duration': duration,
'formats': formats,
'subtitles': subtitles,
'is_live': is_live,
'live_status': 'is_live' if is_live else 'not_live',
})
if len(entries) == 1:

View File

@ -25,7 +25,6 @@ import xml.etree.ElementTree
from ..compat import (
compat_etree_fromstring,
compat_expanduser,
compat_os_name,
urllib_req_to_req,
)
from ..cookies import LenientSimpleCookie
@ -1029,7 +1028,7 @@ class InfoExtractor:
filename = sanitize_filename(f'{basen}.dump', restricted=True)
# Working around MAX_PATH limitation on Windows (see
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx)
if compat_os_name == 'nt':
if os.name == 'nt':
absfilepath = os.path.abspath(filename)
if len(absfilepath) > 259:
filename = fR'\\?\{absfilepath}'

View File

@ -1,11 +1,9 @@
import base64
from .common import InfoExtractor
from ..aes import aes_cbc_decrypt, unpad_pkcs7
from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
from ..utils import (
ExtractorError,
bytes_to_intlist,
intlist_to_bytes,
unified_strdate,
)
@ -68,10 +66,10 @@ class ShemarooMeIE(InfoExtractor):
data_json = self._download_json('https://www.shemaroome.com/users/user_all_lists', video_id, data=data.encode())
if not data_json.get('status'):
raise ExtractorError('Premium videos cannot be downloaded yet.', expected=True)
url_data = bytes_to_intlist(base64.b64decode(data_json['new_play_url']))
key = bytes_to_intlist(base64.b64decode(data_json['key']))
iv = [0] * 16
m3u8_url = unpad_pkcs7(intlist_to_bytes(aes_cbc_decrypt(url_data, key, iv))).decode('ascii')
url_data = base64.b64decode(data_json['new_play_url'])
key = base64.b64decode(data_json['key'])
iv = bytes(16)
m3u8_url = unpad_pkcs7(aes_cbc_decrypt_bytes(url_data, key, iv)).decode('ascii')
headers = {'stream_key': data_json['stream_key']}
formats, m3u8_subs = self._extract_m3u8_formats_and_subtitles(m3u8_url, video_id, fatal=False, headers=headers)
for fmt in formats:

View File

@ -22,7 +22,7 @@ import urllib.parse
from .common import InfoExtractor, SearchInfoExtractor
from .openload import PhantomJSwrapper
from ..jsinterp import JSInterpreter
from ..networking.exceptions import HTTPError, TransportError, network_exceptions
from ..networking.exceptions import HTTPError, network_exceptions
from ..utils import (
NO_DEFAULT,
ExtractorError,
@ -50,12 +50,12 @@ from ..utils import (
parse_iso8601,
parse_qs,
qualities,
remove_end,
remove_start,
smuggle_url,
str_or_none,
str_to_int,
strftime_or_none,
time_seconds,
traverse_obj,
try_call,
try_get,
@ -124,14 +124,15 @@ INNERTUBE_CLIENTS = {
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 62,
'REQUIRE_AUTH': True,
},
'android': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID',
'clientVersion': '19.29.37',
'clientVersion': '19.44.38',
'androidSdkVersion': 30,
'userAgent': 'com.google.android.youtube/19.29.37 (Linux; U; Android 11) gzip',
'userAgent': 'com.google.android.youtube/19.44.38 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
@ -140,13 +141,14 @@ INNERTUBE_CLIENTS = {
'REQUIRE_JS_PLAYER': False,
'REQUIRE_PO_TOKEN': True,
},
# This client now requires sign-in for every video
'android_music': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_MUSIC',
'clientVersion': '7.11.50',
'clientVersion': '7.27.52',
'androidSdkVersion': 30,
'userAgent': 'com.google.android.apps.youtube.music/7.11.50 (Linux; U; Android 11) gzip',
'userAgent': 'com.google.android.apps.youtube.music/7.27.52 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
@ -154,15 +156,16 @@ INNERTUBE_CLIENTS = {
'INNERTUBE_CONTEXT_CLIENT_NAME': 21,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_PO_TOKEN': True,
'REQUIRE_AUTH': True,
},
# This client now requires sign-in for every video
'android_creator': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_CREATOR',
'clientVersion': '24.30.100',
'clientVersion': '24.45.100',
'androidSdkVersion': 30,
'userAgent': 'com.google.android.apps.youtube.creator/24.30.100 (Linux; U; Android 11) gzip',
'userAgent': 'com.google.android.apps.youtube.creator/24.45.100 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
@ -170,17 +173,18 @@ INNERTUBE_CLIENTS = {
'INNERTUBE_CONTEXT_CLIENT_NAME': 14,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_PO_TOKEN': True,
'REQUIRE_AUTH': True,
},
# YouTube Kids videos aren't returned on this client for some reason
'android_vr': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_VR',
'clientVersion': '1.57.29',
'clientVersion': '1.60.19',
'deviceMake': 'Oculus',
'deviceModel': 'Quest 3',
'androidSdkVersion': 32,
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.57.29 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'osName': 'Android',
'osVersion': '12L',
},
@ -188,68 +192,56 @@ INNERTUBE_CLIENTS = {
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
'REQUIRE_JS_PLAYER': False,
},
'android_testsuite': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_TESTSUITE',
'clientVersion': '1.9',
'androidSdkVersion': 30,
'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 30,
'REQUIRE_JS_PLAYER': False,
'PLAYER_PARAMS': '2AMB',
},
# iOS clients have HLS live streams. Setting device model to get 60fps formats.
# See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/680#issuecomment-1002724558
'ios': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS',
'clientVersion': '19.29.1',
'clientVersion': '19.45.4',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'userAgent': 'com.google.ios.youtube/19.29.1 (iPhone16,2; U; CPU iOS 17_5_1 like Mac OS X;)',
'userAgent': 'com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)',
'osName': 'iPhone',
'osVersion': '17.5.1.21F90',
'osVersion': '18.1.0.22B83',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False,
},
# This client now requires sign-in for every video
'ios_music': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS_MUSIC',
'clientVersion': '7.08.2',
'clientVersion': '7.27.0',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'userAgent': 'com.google.ios.youtubemusic/7.08.2 (iPhone16,2; U; CPU iOS 17_5_1 like Mac OS X;)',
'userAgent': 'com.google.ios.youtubemusic/7.27.0 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)',
'osName': 'iPhone',
'osVersion': '17.5.1.21F90',
'osVersion': '18.1.0.22B83',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 26,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_AUTH': True,
},
# This client now requires sign-in for every video
'ios_creator': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS_CREATOR',
'clientVersion': '24.30.100',
'clientVersion': '24.45.100',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'userAgent': 'com.google.ios.ytcreator/24.30.100 (iPhone16,2; U; CPU iOS 17_5_1 like Mac OS X;)',
'userAgent': 'com.google.ios.ytcreator/24.45.100 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)',
'osName': 'iPhone',
'osVersion': '17.5.1.21F90',
'osVersion': '18.1.0.22B83',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 15,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_AUTH': True,
},
# mweb has 'ultralow' formats
# See: https://github.com/yt-dlp/yt-dlp/pull/557
@ -282,8 +274,10 @@ INNERTUBE_CLIENTS = {
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
'REQUIRE_AUTH': True,
},
# This client has pre-merged video+audio 720p/1080p streams
# This client now requires sign-in for every video
# It may be able to receive pre-merged video+audio 720p/1080p streams
'mediaconnect': {
'INNERTUBE_CONTEXT': {
'client': {
@ -293,6 +287,7 @@ INNERTUBE_CLIENTS = {
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 95,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_AUTH': True,
},
}
@ -321,6 +316,7 @@ def build_innertube_clients():
ytcfg.setdefault('INNERTUBE_HOST', 'www.youtube.com')
ytcfg.setdefault('REQUIRE_JS_PLAYER', True)
ytcfg.setdefault('REQUIRE_PO_TOKEN', False)
ytcfg.setdefault('REQUIRE_AUTH', False)
ytcfg.setdefault('PLAYER_PARAMS', None)
ytcfg['INNERTUBE_CONTEXT']['client'].setdefault('hl', 'en')
@ -577,208 +573,18 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
self._check_login_required()
def _perform_login(self, username, password):
auth_type, _, user = (username or '').partition('+')
if auth_type != 'oauth':
raise ExtractorError(self._youtube_login_hint, expected=True)
self._initialize_oauth(user, password)
'''
OAuth 2.0 Device Authorization Grant flow, used by the YouTube TV client (youtube.com/tv).
For more information regarding OAuth 2.0 and the Device Authorization Grant flow in general, see:
- https://developers.google.com/identity/protocols/oauth2/limited-input-device
- https://accounts.google.com/.well-known/openid-configuration
- https://www.rfc-editor.org/rfc/rfc8628
- https://www.rfc-editor.org/rfc/rfc6749
Note: The official client appears to use a proxied version of the oauth2 endpoints on youtube.com/o/oauth2,
which applies some modifications to the response (such as returning errors as 200 OK).
Since the client works with the standard API, we will use that as it is well-documented.
'''
_OAUTH_PROFILE = None
_OAUTH_ACCESS_TOKEN_CACHE = {}
_OAUTH_DISPLAY_ID = 'oauth'
# YouTube TV (TVHTML5) client. You can find these at youtube.com/tv
_OAUTH_CLIENT_ID = '861556708454-d6dlm3lh05idd8npek18k6be8ba3oc68.apps.googleusercontent.com'
_OAUTH_CLIENT_SECRET = 'SboVhoG9s0rNafixCSGGKXAT'
_OAUTH_SCOPE = 'http://gdata.youtube.com https://www.googleapis.com/auth/youtube-paid-content'
# From https://accounts.google.com/.well-known/openid-configuration
# Technically, these should be fetched dynamically and not hard-coded.
# However, as these endpoints rarely change, we can risk saving an extra request for every invocation.
_OAUTH_DEVICE_AUTHORIZATION_ENDPOINT = 'https://oauth2.googleapis.com/device/code'
_OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
@property
def _oauth_cache_key(self):
return f'oauth_refresh_token_{self._OAUTH_PROFILE}'
def _read_oauth_error_response(self, response):
return traverse_obj(
self._webpage_read_content(response, self._OAUTH_TOKEN_ENDPOINT, self._OAUTH_DISPLAY_ID, fatal=False),
({json.loads}, 'error', {str}))
def _set_oauth_info(self, token_response):
YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE.setdefault(self._OAUTH_PROFILE, {}).update({
'access_token': token_response['access_token'],
'token_type': token_response['token_type'],
'expiry': time_seconds(
seconds=traverse_obj(token_response, ('expires_in', {float_or_none}), default=300) - 10),
})
refresh_token = traverse_obj(token_response, ('refresh_token', {str}))
if refresh_token:
self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, refresh_token)
YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE[self._OAUTH_PROFILE]['refresh_token'] = refresh_token
def _initialize_oauth(self, user, refresh_token):
self._OAUTH_PROFILE = user or 'default'
if self._OAUTH_PROFILE in YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE:
self.write_debug(f'{self._OAUTH_DISPLAY_ID}: Using cached access token for profile "{self._OAUTH_PROFILE}"')
return
YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE[self._OAUTH_PROFILE] = {}
if refresh_token:
msg = f'{self._OAUTH_DISPLAY_ID}: Using password input as refresh token'
if self.get_param('cachedir') is not False:
msg += ' and caching token to disk; you should supply an empty password next time'
self.to_screen(msg)
self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, refresh_token)
else:
refresh_token = self.cache.load(self._NETRC_MACHINE, self._oauth_cache_key)
if refresh_token:
YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE[self._OAUTH_PROFILE]['refresh_token'] = refresh_token
try:
token_response = self._refresh_token(refresh_token)
except ExtractorError as e:
error_msg = str(e.orig_msg).replace('Failed to refresh access token: ', '')
self.report_warning(f'{self._OAUTH_DISPLAY_ID}: Failed to refresh access token: {error_msg}')
token_response = self._oauth_authorize
else:
token_response = self._oauth_authorize
self._set_oauth_info(token_response)
self.write_debug(f'{self._OAUTH_DISPLAY_ID}: Logged in using profile "{self._OAUTH_PROFILE}"')
def _refresh_token(self, refresh_token):
try:
token_response = self._download_json(
self._OAUTH_TOKEN_ENDPOINT,
video_id=self._OAUTH_DISPLAY_ID,
note='Refreshing access token',
data=json.dumps({
'client_id': self._OAUTH_CLIENT_ID,
'client_secret': self._OAUTH_CLIENT_SECRET,
'refresh_token': refresh_token,
'grant_type': 'refresh_token',
}).encode(),
headers={'Content-Type': 'application/json'})
except ExtractorError as e:
if isinstance(e.cause, HTTPError):
error = self._read_oauth_error_response(e.cause.response)
if error == 'invalid_grant':
# RFC6749 § 5.2
raise ExtractorError(
'Failed to refresh access token: Refresh token is invalid, revoked, or expired (invalid_grant)',
expected=True, video_id=self._OAUTH_DISPLAY_ID)
raise ExtractorError(
f'Failed to refresh access token: Authorization server returned error {error}',
video_id=self._OAUTH_DISPLAY_ID)
raise
return token_response
@property
def _oauth_authorize(self):
code_response = self._download_json(
self._OAUTH_DEVICE_AUTHORIZATION_ENDPOINT,
video_id=self._OAUTH_DISPLAY_ID,
note='Initializing authorization flow',
data=json.dumps({
'client_id': self._OAUTH_CLIENT_ID,
'scope': self._OAUTH_SCOPE,
}).encode(),
headers={'Content-Type': 'application/json'})
verification_url = traverse_obj(code_response, ('verification_url', {str}))
user_code = traverse_obj(code_response, ('user_code', {str}))
if not verification_url or not user_code:
if username.startswith('oauth'):
raise ExtractorError(
'Authorization server did not provide verification_url or user_code', video_id=self._OAUTH_DISPLAY_ID)
f'Login with OAuth is no longer supported. {self._youtube_login_hint}', expected=True)
# note: The whitespace is intentional
self.to_screen(
f'{self._OAUTH_DISPLAY_ID}: To give yt-dlp access to your account, '
f'go to {verification_url} and enter code {user_code}')
# RFC8628 § 3.5: default poll interval is 5 seconds if not provided
poll_interval = traverse_obj(code_response, ('interval', {int}), default=5)
for retry in self.RetryManager():
while True:
try:
token_response = self._download_json(
self._OAUTH_TOKEN_ENDPOINT,
video_id=self._OAUTH_DISPLAY_ID,
note=False,
errnote='Failed to request access token',
data=json.dumps({
'client_id': self._OAUTH_CLIENT_ID,
'client_secret': self._OAUTH_CLIENT_SECRET,
'device_code': code_response['device_code'],
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
}).encode(),
headers={'Content-Type': 'application/json'})
except ExtractorError as e:
if isinstance(e.cause, TransportError):
retry.error = e
break
elif isinstance(e.cause, HTTPError):
error = self._read_oauth_error_response(e.cause.response)
if not error:
retry.error = e
break
if error == 'authorization_pending':
time.sleep(poll_interval)
continue
elif error == 'expired_token':
raise ExtractorError(
'Authorization timed out', expected=True, video_id=self._OAUTH_DISPLAY_ID)
elif error == 'access_denied':
raise ExtractorError(
'You denied access to an account', expected=True, video_id=self._OAUTH_DISPLAY_ID)
elif error == 'slow_down':
# RFC8628 § 3.5: add 5 seconds to the poll interval
poll_interval += 5
time.sleep(poll_interval)
continue
else:
raise ExtractorError(
f'Authorization server returned an error when fetching access token: {error}',
video_id=self._OAUTH_DISPLAY_ID)
raise
return token_response
def _update_oauth(self):
token = YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE.get(self._OAUTH_PROFILE)
if token is None or token['expiry'] > time.time():
return
self._set_oauth_info(self._refresh_token(token['refresh_token']))
self.report_warning(
f'Login with password is not supported for YouTube. {self._youtube_login_hint}')
@property
def _youtube_login_hint(self):
return ('Use --username=oauth[+PROFILE] --password="" to log in using oauth, '
f'or else u{self._login_hint(method="cookies")[1:]}. '
'See https://github.com/yt-dlp/yt-dlp/wiki/Extractors#logging-in-with-oauth for more on how to use oauth. '
'See https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies for help with cookies')
return (f'{self._login_hint(method="cookies")}. Also see '
'https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies '
'for tips on effectively exporting YouTube cookies')
def _check_login_required(self):
if self._LOGIN_REQUIRED and not self.is_authenticated:
@ -928,7 +734,7 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
@functools.cached_property
def is_authenticated(self):
return self._OAUTH_PROFILE or bool(self._generate_sapisidhash_header())
return bool(self._generate_sapisidhash_header())
def extract_ytcfg(self, video_id, webpage):
if not webpage:
@ -938,16 +744,6 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg',
default='{}'), video_id, fatal=False) or {}
def _generate_oauth_headers(self):
self._update_oauth()
oauth_token = YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE.get(self._OAUTH_PROFILE)
if not oauth_token:
return {}
return {
'Authorization': f'{oauth_token["token_type"]} {oauth_token["access_token"]}',
}
def _generate_cookie_auth_headers(self, *, ytcfg=None, account_syncid=None, session_index=None, origin=None, **kwargs):
headers = {}
account_syncid = account_syncid or self._extract_account_syncid(ytcfg)
@ -977,14 +773,10 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
'Origin': origin,
'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg),
'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client),
**self._generate_oauth_headers(),
**self._generate_cookie_auth_headers(ytcfg=ytcfg, account_syncid=account_syncid, session_index=session_index, origin=origin),
}
return filter_dict(headers)
def _generate_webpage_headers(self):
return self._generate_oauth_headers()
def _download_ytcfg(self, client, video_id):
url = {
'web': 'https://www.youtube.com',
@ -994,8 +786,7 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
if not url:
return {}
webpage = self._download_webpage(
url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config',
headers=self._generate_webpage_headers())
url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config')
return self.extract_ytcfg(video_id, webpage) or {}
@staticmethod
@ -3260,8 +3051,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
code = self._download_webpage(
player_url, video_id, fatal=fatal,
note='Downloading player ' + player_id,
errnote=f'Download of {player_url} failed',
headers=self._generate_webpage_headers())
errnote=f'Download of {player_url} failed')
if code:
self._code_cache[player_id] = code
return self._code_cache.get(player_id)
@ -3544,8 +3334,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self._download_webpage(
url, video_id, f'Marking {label}watched',
'Unable to mark watched', fatal=False,
headers=self._generate_webpage_headers())
'Unable to mark watched', fatal=False)
@classmethod
def _extract_from_webpage(cls, url, webpage):
@ -4059,9 +3848,10 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if smuggled_data.get('is_music_url') or self.is_music_url(url):
for requested_client in requested_clients:
_, base_client, variant = _split_innertube_client(requested_client)
music_client = f'{base_client}_music'
music_client = f'{base_client}_music' if base_client != 'mweb' else 'web_music'
if variant != 'music' and music_client in INNERTUBE_CLIENTS:
requested_clients.append(music_client)
if not INNERTUBE_CLIENTS[music_client]['REQUIRE_AUTH'] or self.is_authenticated:
requested_clients.append(music_client)
return orderedSet(requested_clients)
@ -4174,10 +3964,10 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self.to_screen(
f'{video_id}: This video is age-restricted and YouTube is requiring '
'account age-verification; some formats may be missing', only_once=True)
# web_creator and mediaconnect can work around the age-verification requirement
# _testsuite & _vr variants can also work around age-verification
# web_creator can work around the age-verification requirement
# android_vr and mediaconnect may also be able to work around age-verification
# tv_embedded may(?) still work around age-verification if the video is embeddable
append_client('web_creator', 'mediaconnect')
append_client('web_creator')
prs.extend(deprioritized_prs)
@ -4526,7 +4316,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if pp:
query['pp'] = pp
webpage = self._download_webpage(
webpage_url, video_id, fatal=False, query=query, headers=self._generate_webpage_headers())
webpage_url, video_id, fatal=False, query=query)
master_ytcfg = self.extract_ytcfg(video_id, webpage) or self._get_default_ytcfg()
@ -4669,6 +4459,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self.raise_geo_restricted(subreason, countries, metadata_available=True)
reason += f'. {subreason}'
if reason:
if 'sign in' in reason.lower():
reason = remove_end(reason, 'This helps protect our community. Learn more')
reason = f'{remove_end(reason.strip(), ".")}. {self._youtube_login_hint}'
self.raise_no_formats(reason, expected=True)
keywords = get_first(video_details, 'keywords', expected_type=list) or []
@ -5814,7 +5607,7 @@ class YoutubeTabBaseInfoExtractor(YoutubeBaseInfoExtractor):
webpage, data = None, None
for retry in self.RetryManager(fatal=fatal):
try:
webpage = self._download_webpage(url, item_id, note='Downloading webpage', headers=self._generate_webpage_headers())
webpage = self._download_webpage(url, item_id, note='Downloading webpage')
data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {}
except ExtractorError as e:
if isinstance(e.cause, network_exceptions):

View File

@ -9,7 +9,6 @@ from ..utils import (
RetryManager,
_configuration_args,
deprecation_warning,
encodeFilename,
)
@ -151,7 +150,7 @@ class PostProcessor(metaclass=PostProcessorMetaClass):
def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
try:
os.utime(encodeFilename(path), (atime, mtime))
os.utime(path, (atime, mtime))
except Exception:
self.report_warning(errnote)

View File

@ -12,7 +12,6 @@ from ..utils import (
PostProcessingError,
check_executable,
encodeArgument,
encodeFilename,
prepend_extension,
shell_quote,
)
@ -68,7 +67,7 @@ class EmbedThumbnailPP(FFmpegPostProcessor):
self.to_screen('There are no thumbnails on disk')
return [], info
thumbnail_filename = info['thumbnails'][idx]['filepath']
if not os.path.exists(encodeFilename(thumbnail_filename)):
if not os.path.exists(thumbnail_filename):
self.report_warning('Skipping embedding the thumbnail because the file is missing.')
return [], info
@ -85,7 +84,7 @@ class EmbedThumbnailPP(FFmpegPostProcessor):
thumbnail_filename = convertor.convert_thumbnail(thumbnail_filename, 'png')
thumbnail_ext = 'png'
mtime = os.stat(encodeFilename(filename)).st_mtime
mtime = os.stat(filename).st_mtime
success = True
if info['ext'] == 'mp3':
@ -154,12 +153,12 @@ class EmbedThumbnailPP(FFmpegPostProcessor):
else:
if not prefer_atomicparsley:
self.to_screen('mutagen was not found. Falling back to AtomicParsley')
cmd = [encodeFilename(atomicparsley, True),
encodeFilename(filename, True),
cmd = [atomicparsley,
filename,
encodeArgument('--artwork'),
encodeFilename(thumbnail_filename, True),
thumbnail_filename,
encodeArgument('-o'),
encodeFilename(temp_filename, True)]
temp_filename]
cmd += [encodeArgument(o) for o in self._configuration_args('AtomicParsley')]
self._report_run('atomicparsley', filename)

View File

@ -21,7 +21,6 @@ from ..utils import (
determine_ext,
dfxp2srt,
encodeArgument,
encodeFilename,
filter_dict,
float_or_none,
is_outdated_version,
@ -243,13 +242,13 @@ class FFmpegPostProcessor(PostProcessor):
try:
if self.probe_available:
cmd = [
encodeFilename(self.probe_executable, True),
self.probe_executable,
encodeArgument('-show_streams')]
else:
cmd = [
encodeFilename(self.executable, True),
self.executable,
encodeArgument('-i')]
cmd.append(encodeFilename(self._ffmpeg_filename_argument(path), True))
cmd.append(self._ffmpeg_filename_argument(path))
self.write_debug(f'{self.basename} command line: {shell_quote(cmd)}')
stdout, stderr, returncode = Popen.run(
cmd, text=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -282,7 +281,7 @@ class FFmpegPostProcessor(PostProcessor):
self.check_version()
cmd = [
encodeFilename(self.probe_executable, True),
self.probe_executable,
encodeArgument('-hide_banner'),
encodeArgument('-show_format'),
encodeArgument('-show_streams'),
@ -335,9 +334,9 @@ class FFmpegPostProcessor(PostProcessor):
self.check_version()
oldest_mtime = min(
os.stat(encodeFilename(path)).st_mtime for path, _ in input_path_opts if path)
os.stat(path).st_mtime for path, _ in input_path_opts if path)
cmd = [encodeFilename(self.executable, True), encodeArgument('-y')]
cmd = [self.executable, encodeArgument('-y')]
# avconv does not have repeat option
if self.basename == 'ffmpeg':
cmd += [encodeArgument('-loglevel'), encodeArgument('repeat+info')]
@ -353,7 +352,7 @@ class FFmpegPostProcessor(PostProcessor):
args.append('-i')
return (
[encodeArgument(arg) for arg in args]
+ [encodeFilename(self._ffmpeg_filename_argument(file), True)])
+ [self._ffmpeg_filename_argument(file)])
for arg_type, path_opts in (('i', input_path_opts), ('o', output_path_opts)):
cmd += itertools.chain.from_iterable(
@ -522,8 +521,8 @@ class FFmpegExtractAudioPP(FFmpegPostProcessor):
return [], information
orig_path = prepend_extension(path, 'orig')
temp_path = prepend_extension(path, 'temp')
if (self._nopostoverwrites and os.path.exists(encodeFilename(new_path))
and os.path.exists(encodeFilename(orig_path))):
if (self._nopostoverwrites and os.path.exists(new_path)
and os.path.exists(orig_path)):
self.to_screen(f'Post-process file {new_path} exists, skipping')
return [], information
@ -838,7 +837,7 @@ class FFmpegMergerPP(FFmpegPostProcessor):
args.extend(['-map', f'{i}:v:0'])
self.to_screen(f'Merging formats into "{filename}"')
self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args)
os.rename(encodeFilename(temp_filename), encodeFilename(filename))
os.rename(temp_filename, filename)
return info['__files_to_merge'], info
def can_merge(self):
@ -1039,7 +1038,7 @@ class FFmpegSplitChaptersPP(FFmpegPostProcessor):
def _ffmpeg_args_for_chapter(self, number, chapter, info):
destination = self._prepare_filename(number, chapter, info)
if not self._downloader._ensure_dir_exists(encodeFilename(destination)):
if not self._downloader._ensure_dir_exists(destination):
return
chapter['filepath'] = destination

View File

@ -4,8 +4,6 @@ from .common import PostProcessor
from ..compat import shutil
from ..utils import (
PostProcessingError,
decodeFilename,
encodeFilename,
make_dir,
)
@ -21,25 +19,25 @@ class MoveFilesAfterDownloadPP(PostProcessor):
return 'MoveFiles'
def run(self, info):
dl_path, dl_name = os.path.split(encodeFilename(info['filepath']))
dl_path, dl_name = os.path.split(info['filepath'])
finaldir = info.get('__finaldir', dl_path)
finalpath = os.path.join(finaldir, dl_name)
if self._downloaded:
info['__files_to_move'][info['filepath']] = decodeFilename(finalpath)
info['__files_to_move'][info['filepath']] = finalpath
make_newfilename = lambda old: decodeFilename(os.path.join(finaldir, os.path.basename(encodeFilename(old))))
make_newfilename = lambda old: os.path.join(finaldir, os.path.basename(old))
for oldfile, newfile in info['__files_to_move'].items():
if not newfile:
newfile = make_newfilename(oldfile)
if os.path.abspath(encodeFilename(oldfile)) == os.path.abspath(encodeFilename(newfile)):
if os.path.abspath(oldfile) == os.path.abspath(newfile):
continue
if not os.path.exists(encodeFilename(oldfile)):
if not os.path.exists(oldfile):
self.report_warning(f'File "{oldfile}" cannot be found')
continue
if os.path.exists(encodeFilename(newfile)):
if os.path.exists(newfile):
if self.get_param('overwrites', True):
self.report_warning(f'Replacing existing file "{newfile}"')
os.remove(encodeFilename(newfile))
os.remove(newfile)
else:
self.report_warning(
f'Cannot move file "{oldfile}" out of temporary directory since "{newfile}" already exists. ')

View File

@ -9,7 +9,6 @@ from ..utils import (
check_executable,
cli_option,
encodeArgument,
encodeFilename,
prepend_extension,
shell_quote,
str_or_none,
@ -52,7 +51,7 @@ class SponSkrubPP(PostProcessor):
return [], information
filename = information['filepath']
if not os.path.exists(encodeFilename(filename)): # no download
if not os.path.exists(filename): # no download
return [], information
if information['extractor_key'].lower() != 'youtube':
@ -71,8 +70,8 @@ class SponSkrubPP(PostProcessor):
self.report_warning('If sponskrub is run multiple times, unintended parts of the video could be cut out.')
temp_filename = prepend_extension(filename, self._temp_ext)
if os.path.exists(encodeFilename(temp_filename)):
os.remove(encodeFilename(temp_filename))
if os.path.exists(temp_filename):
os.remove(temp_filename)
cmd = [self.path]
if not self.cutout:

View File

@ -1,7 +1,6 @@
import os
from .common import PostProcessor
from ..compat import compat_os_name
from ..utils import (
PostProcessingError,
XAttrMetadataError,
@ -57,7 +56,7 @@ class XAttrMetadataPP(PostProcessor):
elif e.reason == 'VALUE_TOO_LONG':
self.report_warning(f'Unable to write extended attribute "{xattrname}" due to too long values.')
else:
tip = ('You need to use NTFS' if compat_os_name == 'nt'
tip = ('You need to use NTFS' if os.name == 'nt'
else 'You may have to enable them in your "/etc/fstab"')
raise PostProcessingError(f'This filesystem doesn\'t support extended attributes. {tip}')

View File

@ -13,7 +13,6 @@ import sys
from dataclasses import dataclass
from zipimport import zipimporter
from .compat import compat_realpath
from .networking import Request
from .networking.exceptions import HTTPError, network_exceptions
from .utils import (
@ -201,8 +200,6 @@ class UpdateInfo:
binary_name: str | None = _get_binary_name() # noqa: RUF009: Always returns the same value
checksum: str | None = None
_has_update = True
class Updater:
# XXX: use class variables to simplify testing
@ -523,7 +520,7 @@ class Updater:
@functools.cached_property
def filename(self):
"""Filename of the executable"""
return compat_realpath(_get_variant_and_executable_path()[1])
return os.path.realpath(_get_variant_and_executable_path()[1])
@functools.cached_property
def cmd(self):
@ -562,62 +559,14 @@ class Updater:
f'Unable to {action}{delim} visit '
f'https://github.com/{self.requested_repo}/releases/{path}', True)
# XXX: Everything below this line in this class is deprecated / for compat only
@property
def _target_tag(self):
"""Deprecated; requested tag with 'tags/' prepended when necessary for API calls"""
return f'tags/{self.requested_tag}' if self.requested_tag != 'latest' else self.requested_tag
def _check_update(self):
"""Deprecated; report whether there is an update available"""
return bool(self.query_update(_output=True))
def __getattr__(self, attribute: str):
"""Compat getter function for deprecated attributes"""
deprecated_props_map = {
'check_update': '_check_update',
'target_tag': '_target_tag',
'target_channel': 'requested_channel',
}
update_info_props_map = {
'has_update': '_has_update',
'new_version': 'version',
'latest_version': 'requested_version',
'release_name': 'binary_name',
'release_hash': 'checksum',
}
if attribute not in deprecated_props_map and attribute not in update_info_props_map:
raise AttributeError(f'{type(self).__name__!r} object has no attribute {attribute!r}')
msg = f'{type(self).__name__}.{attribute} is deprecated and will be removed in a future version'
if attribute in deprecated_props_map:
source_name = deprecated_props_map[attribute]
if not source_name.startswith('_'):
msg += f'. Please use {source_name!r} instead'
source = self
mapping = deprecated_props_map
else: # attribute in update_info_props_map
msg += '. Please call query_update() instead'
source = self.query_update()
if source is None:
source = UpdateInfo('', None, None, None)
source._has_update = False
mapping = update_info_props_map
deprecation_warning(msg)
for target_name, source_name in mapping.items():
value = getattr(source, source_name)
setattr(self, target_name, value)
return getattr(self, attribute)
def run_update(ydl):
"""Update the program file with the latest version from the repository
@returns Whether there was a successful update (No update = False)
"""
deprecation_warning(
'"yt_dlp.update.run_update(ydl)" is deprecated and may be removed in a future version. '
'Use "yt_dlp.update.Updater(ydl).update()" instead')
return Updater(ydl).update()

View File

@ -9,31 +9,23 @@ passthrough_module(__name__, '.._legacy', callback=lambda attr: warnings.warn(
del passthrough_module
from ._utils import preferredencoding
import re
import struct
def encodeFilename(s, for_subprocess=False):
assert isinstance(s, str)
return s
def bytes_to_intlist(bs):
if not bs:
return []
if isinstance(bs[0], int): # Python 3
return list(bs)
else:
return [ord(c) for c in bs]
def decodeFilename(b, for_subprocess=False):
return b
def intlist_to_bytes(xs):
if not xs:
return b''
return struct.pack('%dB' % len(xs), *xs)
def decodeArgument(b):
return b
def decodeOption(optval):
if optval is None:
return optval
if isinstance(optval, bytes):
optval = optval.decode(preferredencoding())
assert isinstance(optval, str)
return optval
def error_to_compat_str(err):
return str(err)
compiled_regex_type = type(re.compile(''))

View File

@ -313,3 +313,30 @@ def make_HTTPS_handler(params, **kwargs):
def process_communicate_or_kill(p, *args, **kwargs):
return Popen.communicate_or_kill(p, *args, **kwargs)
def encodeFilename(s, for_subprocess=False):
assert isinstance(s, str)
return s
def decodeFilename(b, for_subprocess=False):
return b
def decodeArgument(b):
return b
def decodeOption(optval):
if optval is None:
return optval
if isinstance(optval, bytes):
optval = optval.decode(preferredencoding())
assert isinstance(optval, str)
return optval
def error_to_compat_str(err):
return str(err)

View File

@ -49,15 +49,11 @@ from ..compat import (
compat_etree_fromstring,
compat_expanduser,
compat_HTMLParseError,
compat_os_name,
)
from ..dependencies import xattr
__name__ = __name__.rsplit('.', 1)[0] # noqa: A001: Pretend to be the parent module
# This is not clearly defined otherwise
compiled_regex_type = type(re.compile(''))
class NO_DEFAULT:
pass
@ -874,7 +870,7 @@ class Popen(subprocess.Popen):
kwargs.setdefault('encoding', 'utf-8')
kwargs.setdefault('errors', 'replace')
if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
if shell and os.name == 'nt' and kwargs.get('executable') is None:
if not isinstance(args, str):
args = shell_quote(args, shell=True)
shell = False
@ -1457,7 +1453,7 @@ def system_identifier():
@functools.cache
def get_windows_version():
""" Get Windows version. returns () if it's not running on Windows """
if compat_os_name == 'nt':
if os.name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
return ()
@ -1470,7 +1466,7 @@ def write_string(s, out=None, encoding=None):
if not out:
return
if compat_os_name == 'nt' and supports_terminal_sequences(out):
if os.name == 'nt' and supports_terminal_sequences(out):
s = re.sub(r'([\r\n]+)', r' \1', s)
enc, buffer = None, out
@ -1503,21 +1499,6 @@ def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
deprecation_warning._cache = set()
def bytes_to_intlist(bs):
if not bs:
return []
if isinstance(bs[0], int): # Python 3
return list(bs)
else:
return [ord(c) for c in bs]
def intlist_to_bytes(xs):
if not xs:
return b''
return struct.pack('%dB' % len(xs), *xs)
class LockingUnsupportedError(OSError):
msg = 'File locking is not supported'
@ -1701,7 +1682,7 @@ _CMD_QUOTE_TRANS = str.maketrans({
def shell_quote(args, *, shell=False):
args = list(variadic(args))
if compat_os_name != 'nt':
if os.name != 'nt':
return shlex.join(args)
trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
@ -4516,7 +4497,7 @@ def urshift(val, n):
def write_xattr(path, key, value):
# Windows: Write xattrs to NTFS Alternate Data Streams:
# http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
if compat_os_name == 'nt':
if os.name == 'nt':
assert ':' not in key
assert os.path.exists(path)
@ -4778,12 +4759,12 @@ def jwt_decode_hs256(jwt):
return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
WINDOWS_VT_MODE = False if os.name == 'nt' else None
@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
if os.name == 'nt':
if not WINDOWS_VT_MODE:
return False
elif not os.getenv('TERM'):
@ -4877,7 +4858,7 @@ def parse_http_range(range):
def read_stdin(what):
if what:
eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
eof = 'Ctrl+Z' if os.name == 'nt' else 'Ctrl+D'
write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
return sys.stdin