Compare commits

..

No commits in common. "0b6f829b1dfda15d3c1d7d1fbe4ea6102c26dd24" and "04a5e06350e3ef7c03f94f2f3f90dd96c6411152" have entirely different histories.

4 changed files with 43 additions and 63 deletions

View File

@ -2317,6 +2317,23 @@ Line 1
self.assertEqual(traverse_obj({}, (0, slice(1)), traverse_string=True), [], self.assertEqual(traverse_obj({}, (0, slice(1)), traverse_string=True), [],
msg='branching should result in list if `traverse_string`') msg='branching should result in list if `traverse_string`')
# Test is_user_input behavior
_IS_USER_INPUT_DATA = {'range8': list(range(8))}
self.assertEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', '3'),
is_user_input=True), 3,
msg='allow for string indexing if `is_user_input`')
self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', '3:'),
is_user_input=True), tuple(range(8))[3:],
msg='allow for string slice if `is_user_input`')
self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':4:2'),
is_user_input=True), tuple(range(8))[:4:2],
msg='allow step in string slice if `is_user_input`')
self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':'),
is_user_input=True), range(8),
msg='`:` should be treated as `...` if `is_user_input`')
with self.assertRaises(TypeError, msg='too many params should result in error'):
traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':::'), is_user_input=True)
# Test re.Match as input obj # Test re.Match as input obj
mobj = re.fullmatch(r'0(12)(?P<group>3)(4)?', '0123') mobj = re.fullmatch(r'0(12)(?P<group>3)(4)?', '0123')
self.assertEqual(traverse_obj(mobj, ...), [x for x in mobj.groups() if x is not None], self.assertEqual(traverse_obj(mobj, ...), [x for x in mobj.groups() if x is not None],

View File

@ -1201,15 +1201,6 @@ class YoutubeDL:
(?:\|(?P<default>.*?))? (?:\|(?P<default>.*?))?
)$''') )$''')
def _from_user_input(field):
if field == ':':
return ...
elif ':' in field:
return slice(*map(int_or_none, field.split(':')))
elif int_or_none(field) is not None:
return int(field)
return field
def _traverse_infodict(fields): def _traverse_infodict(fields):
fields = [f for x in re.split(r'\.({.+?})\.?', fields) fields = [f for x in re.split(r'\.({.+?})\.?', fields)
for f in ([x] if x.startswith('{') else x.split('.'))] for f in ([x] if x.startswith('{') else x.split('.'))]
@ -1219,12 +1210,11 @@ class YoutubeDL:
for i, f in enumerate(fields): for i, f in enumerate(fields):
if not f.startswith('{'): if not f.startswith('{'):
fields[i] = _from_user_input(f)
continue continue
assert f.endswith('}'), f'No closing brace for {f} in {fields}' assert f.endswith('}'), f'No closing brace for {f} in {fields}'
fields[i] = {k: list(map(_from_user_input, k.split('.'))) for k in f[1:-1].split(',')} fields[i] = {k: k.split('.') for k in f[1:-1].split(',')}
return traverse_obj(info_dict, fields, traverse_string=True) return traverse_obj(info_dict, fields, is_user_input=True, traverse_string=True)
def get_value(mdict): def get_value(mdict):
# Object traversal # Object traversal

View File

@ -4,14 +4,7 @@ from urllib.parse import unquote
from .common import InfoExtractor from .common import InfoExtractor
from ..compat import functools from ..compat import functools
from ..utils import ( from ..utils import ExtractorError, make_archive_id, urljoin
ExtractorError,
float_or_none,
int_or_none,
make_archive_id,
mimetype2ext,
urljoin,
)
from ..utils.traversal import traverse_obj from ..utils.traversal import traverse_obj
@ -33,7 +26,6 @@ class Pr0grammIE(InfoExtractor):
'dislike_count': int, 'dislike_count': int,
'age_limit': 0, 'age_limit': 0,
'thumbnail': r're:^https://thumb\.pr0gramm\.com/.*\.jpg', 'thumbnail': r're:^https://thumb\.pr0gramm\.com/.*\.jpg',
'_old_archive_ids': ['pr0grammstatic 5466437'],
}, },
}, { }, {
# Tags require account # Tags require account
@ -51,7 +43,6 @@ class Pr0grammIE(InfoExtractor):
'dislike_count': int, 'dislike_count': int,
'age_limit': 0, 'age_limit': 0,
'thumbnail': r're:^https://thumb\.pr0gramm\.com/.*\.jpg', 'thumbnail': r're:^https://thumb\.pr0gramm\.com/.*\.jpg',
'_old_archive_ids': ['pr0grammstatic 3052805'],
}, },
}, { }, {
# Requires verified account # Requires verified account
@ -69,7 +60,6 @@ class Pr0grammIE(InfoExtractor):
'dislike_count': int, 'dislike_count': int,
'age_limit': 18, 'age_limit': 18,
'thumbnail': r're:^https://thumb\.pr0gramm\.com/.*\.jpg', 'thumbnail': r're:^https://thumb\.pr0gramm\.com/.*\.jpg',
'_old_archive_ids': ['pr0grammstatic 5848332'],
}, },
}, { }, {
'url': 'https://pr0gramm.com/static/5466437', 'url': 'https://pr0gramm.com/static/5466437',
@ -120,61 +110,37 @@ class Pr0grammIE(InfoExtractor):
return data return data
@staticmethod
def _create_source_url(path):
return urljoin('https://img.pr0gramm.com', path)
def _real_extract(self, url): def _real_extract(self, url):
video_id = self._match_id(url) video_id = self._match_id(url)
video_info = traverse_obj( video_info = traverse_obj(
self._call_api('get', video_id, {'id': video_id, 'flags': self._maximum_flags}), self._call_api('get', video_id, {'id': video_id, 'flags': self._maximum_flags}),
('items', 0, {dict})) ('items', 0, {dict}))
source = video_info.get('image') source = urljoin('https://img.pr0gramm.com', video_info.get('image'))
if not source or not source.endswith('mp4'): if not source or not source.endswith('mp4'):
self.raise_no_formats('Could not extract a video', expected=bool(source), video_id=video_id) self.raise_no_formats('Could not extract a video', expected=bool(source), video_id=video_id)
tags = None tags = None
if self._is_logged_in: if self._is_logged_in:
metadata = self._call_api('info', video_id, {'itemId': video_id}, note='Downloading tags') metadata = self._call_api('info', video_id, {'itemId': video_id})
tags = traverse_obj(metadata, ('tags', ..., 'tag', {str})) tags = traverse_obj(metadata, ('tags', ..., 'tag', {str}))
# Sorted by "confidence", higher confidence = earlier in list # Sorted by "confidence", higher confidence = earlier in list
confidences = traverse_obj(metadata, ('tags', ..., 'confidence', ({int}, {float}))) confidences = traverse_obj(metadata, ('tags', ..., 'confidence', ({int}, {float})))
if confidences: if confidences:
tags = [tag for _, tag in sorted(zip(confidences, tags), reverse=True)] tags = [tag for _, tag in sorted(zip(confidences, tags), reverse=True)]
formats = traverse_obj(video_info, ('variants', ..., {
'format_id': ('name', {str}),
'url': ('path', {self._create_source_url}),
'ext': ('mimeType', {mimetype2ext}),
'vcodec': ('codec', {str}),
'width': ('width', {int_or_none}),
'height': ('height', {int_or_none}),
'bitrate': ('bitRate', {float_or_none}),
'filesize': ('fileSize', {int_or_none}),
})) if video_info.get('variants') else [{
'ext': 'mp4',
'format_id': 'source',
**traverse_obj(video_info, {
'url': ('image', {self._create_source_url}),
'width': ('width', {int_or_none}),
'height': ('height', {int_or_none}),
}),
}]
subtitles = {}
for subtitle in traverse_obj(video_info, ('subtitles', lambda _, v: v['language'])):
subtitles.setdefault(subtitle['language'], []).append(traverse_obj(subtitle, {
'url': ('path', {self._create_source_url}),
'note': ('label', {str}),
}))
return { return {
'id': video_id, 'id': video_id,
'title': f'pr0gramm-{video_id} by {video_info.get("user")}', 'title': f'pr0gramm-{video_id} by {video_info.get("user")}',
'formats': [{
'url': source,
'ext': 'mp4',
**traverse_obj(video_info, {
'width': ('width', {int}),
'height': ('height', {int}),
}),
}],
'tags': tags, 'tags': tags,
'formats': formats,
'subtitles': subtitles,
'age_limit': 18 if traverse_obj(video_info, ('flags', {0b110.__and__})) else 0, 'age_limit': 18 if traverse_obj(video_info, ('flags', {0b110.__and__})) else 0,
'_old_archive_ids': [make_archive_id('Pr0grammStatic', video_id)], '_old_archive_ids': [make_archive_id('Pr0grammStatic', video_id)],
**traverse_obj(video_info, { **traverse_obj(video_info, {

View File

@ -8,7 +8,7 @@ from ._utils import (
IDENTITY, IDENTITY,
NO_DEFAULT, NO_DEFAULT,
LazyList, LazyList,
deprecation_warning, int_or_none,
is_iterable_like, is_iterable_like,
try_call, try_call,
variadic, variadic,
@ -17,7 +17,7 @@ from ._utils import (
def traverse_obj( def traverse_obj(
obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True, obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
casesense=True, is_user_input=NO_DEFAULT, traverse_string=False): casesense=True, is_user_input=False, traverse_string=False):
""" """
Safely traverse nested `dict`s and `Iterable`s Safely traverse nested `dict`s and `Iterable`s
@ -63,8 +63,10 @@ def traverse_obj(
@param get_all If `False`, return the first matching result, otherwise all matching ones. @param get_all If `False`, return the first matching result, otherwise all matching ones.
@param casesense If `False`, consider string dictionary keys as case insensitive. @param casesense If `False`, consider string dictionary keys as case insensitive.
`traverse_string` is only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API The following are only meant to be used by YoutubeDL.prepare_outtmpl and are not part of the API
@param is_user_input Whether the keys are generated from user input.
If `True` strings get converted to `int`/`slice` if needed.
@param traverse_string Whether to traverse into objects as strings. @param traverse_string Whether to traverse into objects as strings.
If `True`, any non-compatible object will first be If `True`, any non-compatible object will first be
converted into a string and then traversed into. converted into a string and then traversed into.
@ -78,9 +80,6 @@ def traverse_obj(
If no `default` is given and the last path branches, a `list` of results If no `default` is given and the last path branches, a `list` of results
is always returned. If a path ends on a `dict` that result will always be a `dict`. is always returned. If a path ends on a `dict` that result will always be a `dict`.
""" """
if is_user_input is not NO_DEFAULT:
deprecation_warning('The is_user_input parameter is deprecated and no longer works')
casefold = lambda k: k.casefold() if isinstance(k, str) else k casefold = lambda k: k.casefold() if isinstance(k, str) else k
if isinstance(expected_type, type): if isinstance(expected_type, type):
@ -196,6 +195,14 @@ def traverse_obj(
key = None key = None
for last, key in lazy_last(variadic(path, (str, bytes, dict, set))): for last, key in lazy_last(variadic(path, (str, bytes, dict, set))):
if is_user_input and isinstance(key, str):
if key == ':':
key = ...
elif ':' in key:
key = slice(*map(int_or_none, key.split(':')))
elif int_or_none(key) is not None:
key = int(key)
if not casesense and isinstance(key, str): if not casesense and isinstance(key, str):
key = key.casefold() key = key.casefold()