Compare commits


No commits in common. "dc512e3a8a26a8e3fc7f1f67e5ee5e7699db8659" and "be008e657d79832642e2158557c899249c9e31cd" have entirely different histories.

5 changed files with 211 additions and 397 deletions

View File

@@ -23,7 +23,6 @@ from youtube_dl.compat import (
     compat_urllib_parse_unquote,
     compat_urllib_parse_unquote_plus,
     compat_urllib_parse_urlencode,
-    compat_urllib_request,
 )
@@ -136,19 +135,6 @@ class TestCompat(unittest.TestCase):
         self.assertEqual(compat_casefold('\u03a3'), '\u03c3')
         self.assertEqual(compat_casefold('A\u0345\u03a3'), 'a\u03b9\u03c3')
 
-    def test_compat_urllib_request_Request(self):
-        self.assertEqual(
-            compat_urllib_request.Request('http://127.0.0.1', method='PUT').get_method(),
-            'PUT')
-
-        class PUTrequest(compat_urllib_request.Request):
-            def get_method(self):
-                return 'PUT'
-
-        self.assertEqual(
-            PUTrequest('http://127.0.0.1').get_method(),
-            'PUT')
 
 if __name__ == '__main__':
     unittest.main()

View File

@@ -295,7 +295,6 @@ class TestNRKSubtitles(BaseTestSubtitles):
     def test_allsubtitles(self):
         self.DL.params['writesubtitles'] = True
         self.DL.params['allsubtitles'] = True
-        self.DL.params['format'] = 'best/bestvideo'
         subtitles = self.getSubtitles()
         self.assertEqual(set(subtitles.keys()), set(['nb-ttv']))
         self.assertEqual(md5(subtitles['nb-ttv']), '67e06ff02d0deaf975e68f6cb8f6a149')

View File

@@ -58,26 +58,19 @@ except ImportError: # Python 2
 # Also fix up lack of method arg in old Pythons
 try:
-    type(compat_urllib_request.Request('http://127.0.0.1', method='GET'))
+    _req = compat_urllib_request.Request
+    _req('http://127.0.0.1', method='GET')
 except TypeError:
-    def _add_init_method_arg(cls):
-
-        init = cls.__init__
-
-        def wrapped_init(self, *args, **kwargs):
-            method = kwargs.pop('method', 'GET')
-            init(self, *args, **kwargs)
-            if any(callable(x.__dict__.get('get_method')) for x in (self.__class__, self) if x != cls):
-                # allow instance or its subclass to override get_method()
-                return
-            if self.has_data() and method == 'GET':
-                method = 'POST'
-            self.get_method = types.MethodType(lambda _: method, self)
-
-        cls.__init__ = wrapped_init
-
-    _add_init_method_arg(compat_urllib_request.Request)
-    del _add_init_method_arg
+    class _request(object):
+        def __new__(cls, url, *args, **kwargs):
+            method = kwargs.pop('method', None)
+            r = _req(url, *args, **kwargs)
+            if method:
+                r.get_method = types.MethodType(lambda _: method, r)
+            return r
+
+    compat_urllib_request.Request = _request
 
 try:
     import urllib.error as compat_urllib_error

View File

@@ -596,14 +596,6 @@ class InfoExtractor(object):
         """Sets the downloader for this IE."""
         self._downloader = downloader
 
-    @property
-    def cache(self):
-        return self._downloader.cache
-
-    @property
-    def cookiejar(self):
-        return self._downloader.cookiejar
-
     def _real_initialize(self):
         """Real initialization process. Redefine in subclasses."""
         pass
@@ -950,47 +942,14 @@ class InfoExtractor(object):
             else:
                 self.report_warning(errmsg + str(ve))
 
-    def __ie_msg(self, *msg):
-        return '[{0}] {1}'.format(self.IE_NAME, ''.join(msg))
-
-    # msg, video_id=None, *args, only_once=False, **kwargs
-    def report_warning(self, msg, *args, **kwargs):
-        if len(args) > 0:
-            video_id = args[0]
-            args = args[1:]
-        else:
-            video_id = kwargs.pop('video_id', None)
+    def report_warning(self, msg, video_id=None):
         idstr = '' if video_id is None else '%s: ' % video_id
         self._downloader.report_warning(
-            self.__ie_msg(idstr, msg), *args, **kwargs)
+            '[%s] %s%s' % (self.IE_NAME, idstr, msg))
 
     def to_screen(self, msg):
         """Print msg to screen, prefixing it with '[ie_name]'"""
-        self._downloader.to_screen(self.__ie_msg(msg))
-
-    def write_debug(self, msg, only_once=False, _cache=[]):
-        '''Log debug message or Print message to stderr'''
-        if not self.get_param('verbose', False):
-            return
-        message = '[debug] ' + self.__ie_msg(msg)
-        logger = self.get_param('logger')
-        if logger:
-            logger.debug(message)
-        else:
-            if only_once and hash(message) in _cache:
-                return
-            self._downloader.to_stderr(message)
-            _cache.append(hash(message))
-
-    # name, default=None, *args, **kwargs
-    def get_param(self, name, *args, **kwargs):
-        default, args = (args[0], args[1:]) if len(args) > 0 else (kwargs.pop('default', None), args)
-        if self._downloader:
-            return self._downloader.params.get(name, default, *args, **kwargs)
-        return default
-
-    def report_drm(self, video_id):
-        self.raise_no_formats('This video is DRM protected', expected=True, video_id=video_id)
+        self._downloader.to_screen('[%s] %s' % (self.IE_NAME, msg))
 
     def report_extraction(self, id_or_name):
         """Report information extraction."""
@@ -1018,15 +977,6 @@ class InfoExtractor(object):
     def raise_geo_restricted(msg='This video is not available from your location due to geo restriction', countries=None):
         raise GeoRestrictedError(msg, countries=countries)
 
-    def raise_no_formats(self, msg, expected=False, video_id=None):
-        if expected and (
-                self.get_param('ignore_no_formats_error') or self.get_param('wait_for_video')):
-            self.report_warning(msg, video_id)
-        elif isinstance(msg, ExtractorError):
-            raise msg
-        else:
-            raise ExtractorError(msg, expected=expected, video_id=video_id)
-
     # Methods for following #608
     @staticmethod
     def url_result(url, ie=None, video_id=None, video_title=None):

View File

@@ -2,7 +2,6 @@
 from __future__ import unicode_literals
 
-import collections
 import itertools
 import json
 import os.path
@@ -24,10 +23,10 @@ from ..compat import (
 )
 from ..jsinterp import JSInterpreter
 from ..utils import (
-    ExtractorError,
     clean_html,
     dict_get,
     error_to_compat_str,
+    ExtractorError,
     float_or_none,
     extract_attributes,
@@ -37,9 +36,7 @@ from ..utils import (
     LazyList,
     merge_dicts,
     mimetype2ext,
-    NO_DEFAULT,
     parse_codecs,
-    parse_count,
     parse_duration,
     parse_qs,
     qualities,
@@ -47,9 +44,7 @@ from ..utils import (
     smuggle_url,
     str_or_none,
     str_to_int,
-    T,
     traverse_obj,
-    try_call,
     try_get,
     txt_or_none,
     unescapeHTML,
@@ -1252,7 +1247,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             'title': 'IMG 3456',
             'description': '',
             'upload_date': '20170613',
-            'uploader': "l'Or Vert asbl",
+            'uploader': 'ElevageOrVert',
             'uploader_id': '@ElevageOrVert',
         },
         'params': {
@@ -1465,30 +1460,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         self._code_cache = {}
         self._player_cache = {}
 
-    # *ytcfgs, webpage=None
-    def _extract_player_url(self, *ytcfgs, **kw_webpage):
-        if ytcfgs and not isinstance(ytcfgs[0], dict):
-            webpage = kw_webpage.get('webpage') or ytcfgs[0]
-            if webpage:
-                player_url = self._search_regex(
-                    r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
-                    webpage or '', 'player URL', fatal=False)
-                if player_url:
-                    ytcfgs = ytcfgs + ({'PLAYER_JS_URL': player_url},)
-
-        return traverse_obj(
-            ytcfgs, (Ellipsis, 'PLAYER_JS_URL'), (Ellipsis, 'WEB_PLAYER_CONTEXT_CONFIGS', Ellipsis, 'jsUrl'),
-            get_all=False, expected_type=lambda u: urljoin('https://www.youtube.com', u))
-
-    def _download_player_url(self, video_id, fatal=False):
-        res = self._download_webpage(
-            'https://www.youtube.com/iframe_api',
-            note='Downloading iframe API JS', video_id=video_id, fatal=fatal)
-        player_version = self._search_regex(
-            r'player\\?/([0-9a-fA-F]{8})\\?/', res or '', 'player version', fatal=fatal,
-            default=NO_DEFAULT if res else None)
-        if player_version:
-            return 'https://www.youtube.com/s/player/{0}/player_ias.vflset/en_US/base.js'.format(player_version)
-
     def _signature_cache_id(self, example_sig):
         """ Return a string representation of a signature """
         return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
@@ -1503,49 +1474,46 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             raise ExtractorError('Cannot identify player %r' % player_url)
         return id_m.group('id')
 
-    def _load_player(self, video_id, player_url, fatal=True, player_id=None):
+    def _get_player_code(self, video_id, player_url, player_id=None):
         if not player_id:
             player_id = self._extract_player_info(player_url)
         if player_id not in self._code_cache:
-            code = self._download_webpage(
-                player_url, video_id, fatal=fatal,
+            self._code_cache[player_id] = self._download_webpage(
+                player_url, video_id,
                 note='Downloading player ' + player_id,
                 errnote='Download of %s failed' % player_url)
-            if code:
-                self._code_cache[player_id] = code
-        return self._code_cache[player_id] if fatal else self._code_cache.get(player_id)
+        return self._code_cache[player_id]
 
     def _extract_signature_function(self, video_id, player_url, example_sig):
         player_id = self._extract_player_info(player_url)
 
         # Read from filesystem cache
-        func_id = 'js_{0}_{1}'.format(
+        func_id = 'js_%s_%s' % (
             player_id, self._signature_cache_id(example_sig))
         assert os.path.basename(func_id) == func_id
 
-        self.write_debug('Extracting signature function {0}'.format(func_id))
-        cache_spec, code = self.cache.load('youtube-sigfuncs', func_id), None
-
-        if not cache_spec:
-            code = self._load_player(video_id, player_url, player_id)
-        if code:
-            res = self._parse_sig_js(code)
-            test_string = ''.join(map(compat_chr, range(len(example_sig))))
-            cache_spec = [ord(c) for c in res(test_string)]
-            self.cache.store('youtube-sigfuncs', func_id, cache_spec)
-
-        return lambda s: ''.join(s[i] for i in cache_spec)
+        cache_spec = self._downloader.cache.load('youtube-sigfuncs', func_id)
+        if cache_spec is not None:
+            return lambda s: ''.join(s[i] for i in cache_spec)
+
+        code = self._get_player_code(video_id, player_url, player_id)
+        res = self._parse_sig_js(code)
+
+        test_string = ''.join(map(compat_chr, range(len(example_sig))))
+        cache_res = res(test_string)
+        cache_spec = [ord(c) for c in cache_res]
+
+        self._downloader.cache.store('youtube-sigfuncs', func_id, cache_spec)
+        return res
 
     def _print_sig_code(self, func, example_sig):
-        if not self.get_param('youtube_print_sig_code'):
-            return
-
         def gen_sig_code(idxs):
             def _genslice(start, end, step):
                 starts = '' if start == 0 else str(start)
                 ends = (':%d' % (end + step)) if end + step >= 0 else ':'
                 steps = '' if step == 1 else (':%d' % step)
-                return 's[{0}{1}{2}]'.format(starts, ends, steps)
+                return 's[%s%s%s]' % (starts, ends, steps)
 
             step = None
             # Quelch pyflakes warnings - start will be set when step is set
@@ -1596,137 +1564,143 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             jscode, 'Initial JS player signature function name', group='sig')
 
         jsi = JSInterpreter(jscode)
         initial_function = jsi.extract_function(funcname)
         return lambda s: initial_function([s])
 
-    def _cached(self, func, *cache_id):
-        def inner(*args, **kwargs):
-            if cache_id not in self._player_cache:
-                try:
-                    self._player_cache[cache_id] = func(*args, **kwargs)
-                except ExtractorError as e:
-                    self._player_cache[cache_id] = e
-                except Exception as e:
-                    self._player_cache[cache_id] = ExtractorError(traceback.format_exc(), cause=e)
-            ret = self._player_cache[cache_id]
-            if isinstance(ret, Exception):
-                raise ret
-            return ret
-        return inner
-
     def _decrypt_signature(self, s, video_id, player_url):
         """Turn the encrypted s field into a working signature"""
-        extract_sig = self._cached(
-            self._extract_signature_function, 'sig', player_url, self._signature_cache_id(s))
-        func = extract_sig(video_id, player_url, s)
-        self._print_sig_code(func, s)
-        return func(s)
+        if player_url is None:
+            raise ExtractorError('Cannot decrypt signature without player_url')
+
+        try:
+            player_id = (player_url, self._signature_cache_id(s))
+            if player_id not in self._player_cache:
+                func = self._extract_signature_function(
+                    video_id, player_url, s
+                )
+                self._player_cache[player_id] = func
+            func = self._player_cache[player_id]
+            if self._downloader.params.get('youtube_print_sig_code'):
+                self._print_sig_code(func, s)
+            return func(s)
+        except Exception as e:
+            tb = traceback.format_exc()
+            raise ExtractorError(
+                'Signature extraction failed: ' + tb, cause=e)
+
+    def _extract_player_url(self, webpage):
+        player_url = self._search_regex(
+            r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
+            webpage or '', 'player URL', fatal=False)
+        if not player_url:
+            return
+        if player_url.startswith('//'):
+            player_url = 'https:' + player_url
+        elif not re.match(r'https?://', player_url):
+            player_url = compat_urllib_parse.urljoin(
+                'https://www.youtube.com', player_url)
+        return player_url
 
     # from yt-dlp
     # See also:
     # 1. https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
     # 2. https://code.videolan.org/videolan/vlc/-/blob/4fb284e5af69aa9ac2100ccbdd3b88debec9987f/share/lua/playlist/youtube.lua#L116
     # 3. https://github.com/ytdl-org/youtube-dl/issues/30097#issuecomment-950157377
-    def _decrypt_nsig(self, n, video_id, player_url):
-        """Turn the encrypted n field into a working signature"""
-        if player_url is None:
-            raise ExtractorError('Cannot decrypt nsig without player_url')
-
-        try:
-            jsi, player_id, func_code = self._extract_n_function_code(video_id, player_url)
-        except ExtractorError as e:
-            raise ExtractorError('Unable to extract nsig function code', cause=e)
-        if self.get_param('youtube_print_sig_code'):
-            self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(
-                player_id, func_code[1]))
-
-        try:
-            extract_nsig = self._cached(self._extract_n_function_from_code, 'nsig func', player_url)
-            ret = extract_nsig(jsi, func_code)(n)
-        except JSInterpreter.Exception as e:
-            self.report_warning(
-                '%s (%s %s)' % (
-                    self.__ie_msg(
-                        'Unable to decode n-parameter: download likely to be throttled'),
-                    error_to_compat_str(e),
-                    traceback.format_exc()))
-            return
-
-        self.write_debug('Decrypted nsig {0} => {1}'.format(n, ret))
-        return ret
-
-    def _extract_n_function_name(self, jscode):
-        func_name, idx = self._search_regex(
-            r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z_$][\w$]*)(?:\[(?P<idx>\d+)\])?\([\w$]+\)',
-            jscode, 'Initial JS player n function name', group=('nfunc', 'idx'))
-        if not idx:
-            return func_name
-
-        return self._parse_json(self._search_regex(
-            r'var {0}\s*=\s*(\[.+?\])\s*[,;]'.format(re.escape(func_name)), jscode,
-            'Initial JS player n function list ({0}.{1})'.format(func_name, idx)),
-            func_name, transform_source=js_to_json)[int(idx)]
-
-    def _extract_n_function_code(self, video_id, player_url):
-        player_id = self._extract_player_info(player_url)
-        func_code = self.cache.load('youtube-nsig', player_id)
-        jscode = func_code or self._load_player(video_id, player_url)
-        jsi = JSInterpreter(jscode)
-
-        if func_code:
-            return jsi, player_id, func_code
-
-        func_name = self._extract_n_function_name(jscode)
-
-        # For redundancy
-        func_code = self._search_regex(
-            r'''(?xs)%s\s*=\s*function\s*\((?P<var>[\w$]+)\)\s*
-                # NB: The end of the regex is intentionally kept strict
-                {(?P<code>.+?}\s*return\ [\w$]+.join\(""\))};''' % func_name,
-            jscode, 'nsig function', group=('var', 'code'), default=None)
-        if func_code:
-            func_code = ([func_code[0]], func_code[1])
-        else:
-            self.write_debug('Extracting nsig function with jsinterp')
-            func_code = jsi.extract_function_code(func_name)
-
-        self.cache.store('youtube-nsig', player_id, func_code)
-        return jsi, player_id, func_code
-
-    def _extract_n_function_from_code(self, jsi, func_code):
-        func = jsi.extract_function_from_code(*func_code)
-
-        def extract_nsig(s):
-            try:
-                ret = func([s])
-            except JSInterpreter.Exception:
-                raise
-            except Exception as e:
-                raise JSInterpreter.Exception(traceback.format_exc(), cause=e)
-
-            if ret.startswith('enhanced_except_'):
-                raise JSInterpreter.Exception('Signature function returned an exception')
-            return ret
-
-        return extract_nsig
-
-    def _unthrottle_format_urls(self, video_id, player_url, *formats):
-
-        def decrypt_nsig(n):
-            return self._cached(self._decrypt_nsig, 'nsig', n, player_url)
-
+    def _extract_n_function_name(self, jscode):
+        target = r'(?P<nfunc>[a-zA-Z_$][\w$]*)(?:\[(?P<idx>\d+)\])?'
+        nfunc_and_idx = self._search_regex(
+            r'\.get\("n"\)\)&&\(b=(%s)\([\w$]+\)' % (target, ),
+            jscode, 'Initial JS player n function name')
+        nfunc, idx = re.match(target, nfunc_and_idx).group('nfunc', 'idx')
+        if not idx:
+            return nfunc
+
+        VAR_RE_TMPL = r'var\s+%s\s*=\s*(?P<name>\[(?P<alias>%s)\])[;,]'
+        note = 'Initial JS player n function {0} (%s[%s])' % (nfunc, idx)
+
+        def search_function_code(needle, group):
+            return self._search_regex(
+                VAR_RE_TMPL % (re.escape(nfunc), needle), jscode,
+                note.format(group), group=group)
+
+        if int_or_none(idx) == 0:
+            real_nfunc = search_function_code(r'[a-zA-Z_$][\w$]*', group='alias')
+            if real_nfunc:
+                return real_nfunc
+        return self._parse_json(
+            search_function_code('.+?', group='name'),
+            nfunc, transform_source=js_to_json)[int(idx)]
+
+    def _extract_n_function(self, video_id, player_url):
+        player_id = self._extract_player_info(player_url)
+        func_code = self._downloader.cache.load('youtube-nsig', player_id)
+
+        if func_code:
+            jsi = JSInterpreter(func_code)
+        else:
+            jscode = self._get_player_code(video_id, player_url, player_id)
+            funcname = self._extract_n_function_name(jscode)
+            jsi = JSInterpreter(jscode)
+            func_code = jsi.extract_function_code(funcname)
+            self._downloader.cache.store('youtube-nsig', player_id, func_code)
+
+        if self._downloader.params.get('youtube_print_sig_code'):
+            self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(player_id, func_code[1]))
+
+        return lambda s: jsi.extract_function_from_code(*func_code)([s])
+
+    def _n_descramble(self, n_param, player_url, video_id):
+        """Compute the response to YT's "n" parameter challenge,
+           or None
+
+        Args:
+        n_param -- challenge string that is the value of the
+            URL's "n" query parameter
+        player_url -- URL of YT player JS
+        video_id
+        """
+        sig_id = ('nsig_value', n_param)
+        if sig_id in self._player_cache:
+            return self._player_cache[sig_id]
+
+        try:
+            player_id = ('nsig', player_url)
+            if player_id not in self._player_cache:
+                self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
+            func = self._player_cache[player_id]
+            ret = func(n_param)
+            if ret.startswith('enhanced_except_'):
+                raise ExtractorError('Unhandled exception in decode')
+            self._player_cache[sig_id] = ret
+            if self._downloader.params.get('verbose', False):
+                self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
+            return self._player_cache[sig_id]
+        except Exception as e:
+            self._downloader.report_warning(
+                '[%s] %s (%s %s)' % (
+                    self.IE_NAME,
+                    'Unable to decode n-parameter: download likely to be throttled',
+                    error_to_compat_str(e),
+                    traceback.format_exc()))
+
+    def _unthrottle_format_urls(self, video_id, player_url, formats):
         for fmt in formats:
             parsed_fmt_url = compat_urllib_parse.urlparse(fmt['url'])
             n_param = compat_parse_qs(parsed_fmt_url.query).get('n')
             if not n_param:
                 continue
             n_param = n_param[-1]
-            n_response = decrypt_nsig(n_param)(n_param, video_id, player_url)
+            n_response = self._n_descramble(n_param, player_url, video_id)
             if n_response is None:
                 # give up if descrambling failed
                 break
-            fmt['url'] = update_url_query(fmt['url'], {'n': n_response})
+            for fmt_dct in traverse_obj(fmt, (None, (None, ('fragments', Ellipsis))), expected_type=dict):
+                fmt_dct['url'] = update_url(
+                    fmt_dct['url'], query_update={'n': [n_response]})
 
     # from yt-dlp, with tweaks
     def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False):
@@ -1734,16 +1708,16 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         Extract signatureTimestamp (sts)
         Required to tell API what sig/player version is in use.
         """
-        sts = traverse_obj(ytcfg, 'STS', expected_type=int)
+        sts = int_or_none(ytcfg.get('STS')) if isinstance(ytcfg, dict) else None
         if not sts:
             # Attempt to extract from player
             if player_url is None:
                 error_msg = 'Cannot extract signature timestamp without player_url.'
                 if fatal:
                     raise ExtractorError(error_msg)
-                self.report_warning(error_msg)
+                self._downloader.report_warning(error_msg)
                 return
-            code = self._load_player(video_id, player_url, fatal=fatal)
+            code = self._get_player_code(video_id, player_url)
             sts = int_or_none(self._search_regex(
                 r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code or '',
                 'JS player signature timestamp', group='sts', fatal=fatal))
@@ -1759,18 +1733,12 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         # cpn generation algorithm is reverse engineered from base.js.
         # In fact it works even with dummy cpn.
         CPN_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_'
-        cpn = ''.join(CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16))
-
-        # more consistent results setting it to right before the end
-        qs = parse_qs(playback_url)
-        video_length = '{0}'.format(float((qs.get('len') or ['1.5'])[0]) - 1)
-
-        playback_url = update_url_query(
-            playback_url, {
-                'ver': '2',
-                'cpn': cpn,
-                'cmt': video_length,
-                'el': 'detailpage',  # otherwise defaults to "shorts"
+        cpn = ''.join((CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16)))
+
+        playback_url = update_url(
+            playback_url, query_update={
+                'ver': ['2'],
+                'cpn': [cpn],
             })
 
         self._download_webpage(
@@ -2018,11 +1986,8 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         else:
             self.to_screen('Downloading just video %s because of --no-playlist' % video_id)
 
-        if not player_url:
-            player_url = self._extract_player_url(webpage)
-
         formats = []
-        itags = collections.defaultdict(set)
+        itags = []
         itag_qualities = {}
         q = qualities(['tiny', 'small', 'medium', 'large', 'hd720', 'hd1080', 'hd1440', 'hd2160', 'hd2880', 'highres'])
         CHUNK_SIZE = 10 << 20
@@ -2038,92 +2003,58 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
                 })
             } for range_start in range(0, f['filesize'], CHUNK_SIZE))
 
-        lower = lambda s: s.lower()
-
         for fmt in streaming_formats:
-            if fmt.get('targetDurationSec'):
+            if fmt.get('targetDurationSec') or fmt.get('drmFamilies'):
                 continue
 
             itag = str_or_none(fmt.get('itag'))
-            audio_track = traverse_obj(fmt, ('audioTrack', T(dict))) or {}
-
-            quality = traverse_obj(fmt, ((
-                # The 3gp format (17) in android client has a quality of "small",
-                # but is actually worse than other formats
-                T(lambda _: 'tiny' if itag == 17 else None),
-                ('quality', T(lambda q: q if q and q != 'tiny' else None)),
-                ('audioQuality', T(lower)),
-                'quality'), T(txt_or_none)), get_all=False)
-            if quality and itag:
+            quality = fmt.get('quality')
+            if itag and quality:
                 itag_qualities[itag] = quality
             # FORMAT_STREAM_TYPE_OTF(otf=1) requires downloading the init fragment
             # (adding `&sq=0` to the URL) and parsing emsg box to determine the
-            # number of fragments that would subsequently be requested with (`&sq=N`)
+            # number of fragment that would subsequently requested with (`&sq=N`)
             if fmt.get('type') == 'FORMAT_STREAM_TYPE_OTF':
                 continue
 
             fmt_url = fmt.get('url')
             if not fmt_url:
                 sc = compat_parse_qs(fmt.get('signatureCipher'))
-                fmt_url = traverse_obj(sc, ('url', -1, T(url_or_none)))
-                encrypted_sig = traverse_obj(sc, ('s', -1))
-                if not (fmt_url and encrypted_sig):
+                fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0]))
+                encrypted_sig = try_get(sc, lambda x: x['s'][0])
+                if not (sc and fmt_url and encrypted_sig):
                     continue
-                player_url = player_url or self._extract_player_url(webpage)
+                if not player_url:
+                    player_url = self._extract_player_url(webpage)
                 if not player_url:
                     continue
-                try:
-                    fmt_url = update_url_query(fmt_url, {
-                        traverse_obj(sc, ('sp', -1)) or 'signature':
-                            [self._decrypt_signature(encrypted_sig, video_id, player_url)],
-                    })
-                except ExtractorError as e:
-                    self.report_warning('Signature extraction failed: Some formats may be missing',
-                                        video_id=video_id, only_once=True)
-                    self.write_debug(error_to_compat_str(e), only_once=True)
-                    continue
-
-            language_preference = (
-                10 if audio_track.get('audioIsDefault')
-                else -10 if 'descriptive' in (traverse_obj(audio_track, ('displayName', T(lower))) or '')
-                else -1)
-
-            name = (
-                traverse_obj(fmt, ('qualityLabel', T(txt_or_none)))
-                or quality.replace('audio_quality_', ''))
-            dct = {
-                'format_id': join_nonempty(itag, fmt.get('isDrc') and 'drc'),
-                'url': fmt_url,
-                # Format 22 is likely to be damaged: see https://github.com/yt-dlp/yt-dlp/issues/3372
-                'source_preference': ((-5 if itag == '22' else -1)
-                                      + (100 if 'Premium' in name else 0)),
-                'quality': q(quality),
-                'language': join_nonempty(audio_track.get('id', '').split('.')[0],
-                                          'desc' if language_preference < -1 else '') or None,
-                'language_preference': language_preference,
-                # Strictly de-prioritize 3gp formats
-                'preference': -2 if itag == '17' else None,
-            }
-            if itag:
-                itags[itag].add(('https', dct.get('language')))
-                self._unthrottle_format_urls(video_id, player_url, dct)
-            dct.update(traverse_obj(fmt, {
-                'asr': ('audioSampleRate', T(int_or_none)),
-                'filesize': ('contentLength', T(int_or_none)),
-                'format_note': ('qualityLabel', T(lambda x: x or quality)),
-                # for some formats, fps is wrongly returned as 1
-                'fps': ('fps', T(int_or_none), T(lambda f: f if f > 1 else None)),
-                'audio_channels': ('audioChannels', T(int_or_none)),
-                'height': ('height', T(int_or_none)),
-                'has_drm': ('drmFamilies', T(bool)),
-                'tbr': (('averageBitrate', 'bitrate'), T(lambda t: float_or_none(t, 1000))),
-                'width': ('width', T(int_or_none)),
-                '_duration_ms': ('approxDurationMs', T(int_or_none)),
-            }, get_all=False))
-            mime_mobj = re.match(
-                r'((?:[^/]+)/(?:[^;]+))(?:;\s*codecs="([^"]+)")?', fmt.get('mimeType') or '')
-            if mime_mobj:
-                dct['ext'] = mimetype2ext(mime_mobj.group(1))
-                dct.update(parse_codecs(mime_mobj.group(2)))
+                signature = self._decrypt_signature(sc['s'][0], video_id, player_url)
+                sp = try_get(sc, lambda x: x['sp'][0]) or 'signature'
+                fmt_url += '&' + sp + '=' + signature
+
+            if itag:
+                itags.append(itag)
+            tbr = float_or_none(
+                fmt.get('averageBitrate') or fmt.get('bitrate'), 1000)
+            dct = {
+                'asr': int_or_none(fmt.get('audioSampleRate')),
+                'filesize': int_or_none(fmt.get('contentLength')),
+                'format_id': itag,
+                'format_note': fmt.get('qualityLabel') or quality,
+                'fps': int_or_none(fmt.get('fps')),
+                'height': int_or_none(fmt.get('height')),
+                'quality': q(quality),
+                'tbr': tbr,
+                'url': fmt_url,
+                'width': fmt.get('width'),
+            }
+            mimetype = fmt.get('mimeType')
+            if mimetype:
+                mobj = re.match(
+                    r'((?:[^/]+)/(?:[^;]+))(?:;\s*codecs="([^"]+)")?', mimetype)
+                if mobj:
+                    dct['ext'] = mimetype2ext(mobj.group(1))
+                    dct.update(parse_codecs(mobj.group(2)))
 
             single_stream = 'none' in (dct.get(c) for c in ('acodec', 'vcodec'))
             if single_stream and dct.get('ext'):
                 dct['container'] = dct['ext'] + '_dash'
@@ -2138,42 +2069,14 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             formats.append(dct)
 
-        def process_manifest_format(f, proto, client_name, itag, all_formats=False):
-            key = (proto, f.get('language'))
-            if not all_formats and key in itags[itag]:
-                return False
-            itags[itag].add(key)
-
-            if itag:
-                f['format_id'] = (
-                    '{0}-{1}'.format(itag, proto)
-                    if all_formats or any(p != proto for p, _ in itags[itag])
-                    else itag)
-
-            if f.get('source_preference') is None:
-                f['source_preference'] = -1
-
-            if itag in ('616', '235'):
-                f['format_note'] = join_nonempty(f.get('format_note'), 'Premium', delim=' ')
-                f['source_preference'] += 100
-
-            f['quality'] = q(traverse_obj(f, (
-                'format_id', T(lambda s: itag_qualities[s.split('-')[0]])), default=-1))
-            if try_call(lambda: f['fps'] <= 1):
-                del f['fps']
-
-            if proto == 'hls' and f.get('has_drm'):
-                f['has_drm'] = 'maybe'
-                f['source_preference'] -= 5
-            return True
-
         hls_manifest_url = streaming_data.get('hlsManifestUrl')
         if hls_manifest_url:
             for f in self._extract_m3u8_formats(
                     hls_manifest_url, video_id, 'mp4', fatal=False):
-                if process_manifest_format(
-                        f, 'hls', None, self._search_regex(
-                            r'/itag/(\d+)', f['url'], 'itag', default=None)):
-                    formats.append(f)
+                itag = self._search_regex(
+                    r'/itag/(\d+)', f['url'], 'itag', default=None)
+                if itag:
+                    f['format_id'] = itag
+                formats.append(f)
 
         if self._downloader.params.get('youtube_include_dash_manifest', True):
@@ -2181,20 +2084,18 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             if dash_manifest_url:
                 for f in self._extract_mpd_formats(
                         dash_manifest_url, video_id, fatal=False):
-                    if process_manifest_format(
-                            f, 'dash', None, f['format_id']):
-                        f['filesize'] = traverse_obj(f, (
-                            ('fragment_base_url', 'url'), T(lambda u: self._search_regex(
-                                r'/clen/(\d+)', u, 'file size', default=None)),
-                            T(int_or_none)), get_all=False)
-                        formats.append(f)
-
-        playable_formats = [f for f in formats if not f.get('has_drm')]
-        if formats and not playable_formats:
-            # If there are no formats that definitely don't have DRM, all have DRM
-            self.report_drm(video_id)
-        formats[:] = playable_formats
+                    itag = f['format_id']
+                    if itag in itags:
+                        continue
+                    if itag in itag_qualities:
+                        f['quality'] = q(itag_qualities[itag])
+                    filesize = int_or_none(self._search_regex(
+                        r'/clen/(\d+)', f.get('fragment_base_url')
+                        or f['url'], 'file size', default=None))
+                    if filesize:
+                        f['filesize'] = filesize
+                    formats.append(f)
 
         if not formats:
             if streaming_data.get('licenseInfos'):
                 raise ExtractorError(
@@ -2265,17 +2166,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
                 video_details.get('lengthSeconds')
                 or microformat.get('lengthSeconds')) \
             or parse_duration(search_meta('duration'))
-
-        for f in formats:
-            # Some formats may have much smaller duration than others (possibly damaged during encoding)
-            # but avoid false positives with small duration differences.
-            # Ref: https://github.com/yt-dlp/yt-dlp/issues/2823
-            if try_call(lambda x: float(x.pop('_duration_ms')) / duration < 500, args=(f,)):
-                self.report_warning(
-                    '{0}: Some possibly damaged formats will be deprioritized'.format(video_id), only_once=True)
-                # Strictly de-prioritize damaged formats
-                f['preference'] = -10
-
         is_live = video_details.get('isLive')
 
         owner_profile_url = self._yt_urljoin(self._extract_author_var(
@@ -2284,6 +2174,10 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         uploader = self._extract_author_var(
             webpage, 'name', videodetails=video_details, metadata=microformat)
 
+        if not player_url:
+            player_url = self._extract_player_url(webpage)
+        self._unthrottle_format_urls(video_id, player_url, formats)
+
         info = {
             'id': video_id,
             'title': self._live_title(video_title) if is_live else video_title,
@@ -2476,14 +2370,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
                         'like_count': str_to_int(like_count),
                         'dislike_count': str_to_int(dislike_count),
                     })
-                else:
-                    info['like_count'] = traverse_obj(vpir, (
-                        'videoActions', 'menuRenderer', 'topLevelButtons', Ellipsis,
-                        'segmentedLikeDislikeButtonViewModel', 'likeButtonViewModel', 'likeButtonViewModel',
-                        'toggleButtonViewModel', 'toggleButtonViewModel', 'defaultButtonViewModel',
-                        'buttonViewModel', (('title', ('accessibilityText', T(lambda s: s.split()), Ellipsis))), T(parse_count)),
-                        get_all=False)
-
             vsir = content.get('videoSecondaryInfoRenderer')
             if vsir:
                 rows = try_get(
@@ -2598,7 +2484,7 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
         'playlist_mincount': 94,
         'info_dict': {
             'id': 'UCqj7Cz7revf5maW9g5pgNcg',
-            'title': r're:Igor Kleiner(?: Ph\.D\.)? - Playlists',
+            'title': 'Igor Kleiner - Playlists',
             'description': 'md5:be97ee0f14ee314f1f002cf187166ee2',
             'uploader': 'Igor Kleiner',
             'uploader_id': '@IgorDataScience',
@@ -2609,7 +2495,7 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
         'playlist_mincount': 94,
         'info_dict': {
             'id': 'UCqj7Cz7revf5maW9g5pgNcg',
-            'title': r're:Igor Kleiner(?: Ph\.D\.)? - Playlists',
+            'title': 'Igor Kleiner - Playlists',
            'description': 'md5:be97ee0f14ee314f1f002cf187166ee2',
             'uploader': 'Igor Kleiner',
             'uploader_id': '@IgorDataScience',
@@ -2721,7 +2607,7 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
        'url': 'https://www.youtube.com/channel/UCKfVa3S1e4PHvxWcwyMMg8w/channels',
        'info_dict': {
            'id': 'UCKfVa3S1e4PHvxWcwyMMg8w',
-           'title': r're:lex will - (?:Home|Channels)',
+           'title': 'lex will - Channels',
            'description': 'md5:2163c5d0ff54ed5f598d6a7e6211e488',
            'uploader': 'lex will',
            'uploader_id': '@lexwill718',