Compare commits

..

3 Commits

Author SHA1 Message Date
bashonly
9496357407
replace ` with "
Authored by: bashonly
2024-11-17 22:57:26 -06:00
bashonly
e028d3f077
Allow --username cache
Authored by: bashonly
2024-11-17 22:56:36 -06:00
bashonly
a01cba2803
simplify
Authored by: bashonly
2024-11-17 22:20:49 -06:00

View File

@ -70,8 +70,8 @@ class DigitalConcertHallIE(InfoExtractor):
'playlist_count': 1, 'playlist_count': 1,
}] }]
_LOGIN_HINT = ('Use --username token --password ACCESS_TOKEN where ACCESS_TOKEN ' _LOGIN_HINT = ('Use --username token --password ACCESS_TOKEN where ACCESS_TOKEN '
'is the `access_token_production` from your browser local storage') 'is the "access_token_production" from your browser local storage')
_REFRESH_HINT = 'or else use a `refresh_token` with --username refresh --password REFRESH_TOKEN' _REFRESH_HINT = 'or else use a "refresh_token" with --username refresh --password REFRESH_TOKEN'
_OAUTH_URL = 'https://api.digitalconcerthall.com/v2/oauth2/token' _OAUTH_URL = 'https://api.digitalconcerthall.com/v2/oauth2/token'
_CLIENT_ID = 'dch.webapp' _CLIENT_ID = 'dch.webapp'
_CLIENT_SECRET = '2ySLN+2Fwb' _CLIENT_SECRET = '2ySLN+2Fwb'
@ -83,7 +83,7 @@ class DigitalConcertHallIE(InfoExtractor):
'Referer': 'https://www.digitalconcerthall.com/', 'Referer': 'https://www.digitalconcerthall.com/',
'User-Agent': _USER_AGENT, 'User-Agent': _USER_AGENT,
} }
_real_access_token = None _access_token = None
_access_token_expiry = 0 _access_token_expiry = 0
_refresh_token = None _refresh_token = None
@ -91,50 +91,48 @@ class DigitalConcertHallIE(InfoExtractor):
def _access_token_is_expired(self): def _access_token_is_expired(self):
return self._access_token_expiry - 30 <= int(time.time()) return self._access_token_expiry - 30 <= int(time.time())
@property def _set_access_token(self, value):
def _access_token(self): self._access_token = value
if not self._real_access_token:
# return an initial access token that can be used to auth with password or refresh_token
return self._download_json(
self._OAUTH_URL, None, 'Obtaining initial token', 'Unable to obtain initial token',
data=urlencode_postdata({
'affiliate': 'none',
'grant_type': 'device',
'device_vendor': 'unknown',
# device_model 'Safari' gets split streams of 4K/HEVC video and lossless/FLAC audio,
# but this is no longer effective since actual login is not possible anymore
'device_model': 'unknown',
'app_id': self._CLIENT_ID,
'app_distributor': 'berlinphil',
'app_version': '1.95.0',
'client_secret': self._CLIENT_SECRET,
}), headers=self._OAUTH_HEADERS)['access_token']
if self._access_token_is_expired:
if not self._refresh_token:
raise ExtractorError(
'Access token has expired. Get a new access_token from your browser '
f'and try again, {self._REFRESH_HINT}', expected=True)
self._fetch_new_tokens()
return self._real_access_token
@_access_token.setter
def _access_token(self, value):
self._real_access_token = value
self._access_token_expiry = traverse_obj(value, ({jwt_decode_hs256}, 'exp', {int})) or 0 self._access_token_expiry = traverse_obj(value, ({jwt_decode_hs256}, 'exp', {int})) or 0
def _set_tokens(self, auth_data): def _cache_tokens(self, /):
self._access_token = auth_data['access_token']
if refresh_token := traverse_obj(auth_data, ('refresh_token', {str})):
self.write_debug('New refresh token granted')
self._refresh_token = refresh_token
self.cache.store(self._NETRC_MACHINE, 'tokens', { self.cache.store(self._NETRC_MACHINE, 'tokens', {
'access_token': self._real_access_token, 'access_token': self._access_token,
'refresh_token': self._refresh_token, 'refresh_token': self._refresh_token,
}) })
def _fetch_new_tokens(self): def _fetch_new_tokens(self, invalidate=False):
if invalidate:
self.report_warning('Access token has been invalidated')
self._set_access_token(None)
if not self._access_token_is_expired:
return
if not self._refresh_token:
self._set_access_token(None)
self._cache_tokens()
raise ExtractorError(
'Access token has expired or been invalidated. '
'Get a new "access_token_production" value from your browser '
f'and try again, {self._REFRESH_HINT}', expected=True)
# If we only have a refresh token, we need a temporary "initial token" for the refresh flow
bearer_token = self._access_token or self._download_json(
self._OAUTH_URL, None, 'Obtaining initial token', 'Unable to obtain initial token',
data=urlencode_postdata({
'affiliate': 'none',
'grant_type': 'device',
'device_vendor': 'unknown',
# device_model 'Safari' gets split streams of 4K/HEVC video and lossless/FLAC audio,
# but this is no longer effective since actual login is not possible anymore
'device_model': 'unknown',
'app_id': self._CLIENT_ID,
'app_distributor': 'berlinphil',
'app_version': '1.95.0',
'client_secret': self._CLIENT_SECRET,
}), headers=self._OAUTH_HEADERS)['access_token']
try: try:
response = self._download_json( response = self._download_json(
self._OAUTH_URL, None, 'Refreshing token', 'Unable to refresh token', self._OAUTH_URL, None, 'Refreshing token', 'Unable to refresh token',
@ -145,55 +143,60 @@ class DigitalConcertHallIE(InfoExtractor):
'client_secret': self._CLIENT_SECRET, 'client_secret': self._CLIENT_SECRET,
}), headers={ }), headers={
**self._OAUTH_HEADERS, **self._OAUTH_HEADERS,
'Authorization': f'Bearer {self._real_access_token or self._access_token}', 'Authorization': f'Bearer {bearer_token}',
}) })
except ExtractorError as e: except ExtractorError as e:
if isinstance(e.cause, HTTPError) and e.cause.status == 401: if isinstance(e.cause, HTTPError) and e.cause.status == 401:
self._set_access_token(None)
self._refresh_token = None self._refresh_token = None
self._set_tokens({'access_token': None}) self._cache_tokens()
raise ExtractorError('Your tokens have been invalidated', expected=True) raise ExtractorError('Your tokens have been invalidated', expected=True)
raise raise
self._set_tokens(response)
self._set_access_token(response['access_token'])
if refresh_token := traverse_obj(response, ('refresh_token', {str})):
self.write_debug('New refresh token granted')
self._refresh_token = refresh_token
self._cache_tokens()
def _perform_login(self, username, password): def _perform_login(self, username, password):
if username not in ('refresh', 'token'):
raise ExtractorError(
'Login with username and password is no longer supported '
f'for this site. {self._LOGIN_HINT}, {self._REFRESH_HINT}', expected=True)
self.report_login() self.report_login()
# Try first with passed refresh_token since user may want to override the cache with it
if username == 'refresh': if username == 'refresh':
self._refresh_token = password self._refresh_token = password
self._fetch_new_tokens() self._fetch_new_tokens()
if username == 'token':
if not traverse_obj(password, {jwt_decode_hs256}):
raise ExtractorError(
f'The access token passed to yt-dlp is not valid. {self._LOGIN_HINT}', expected=True)
self._set_access_token(password)
self._cache_tokens()
if username in ('refresh', 'token'):
if self.get_param('cachedir') is not False:
token_type = 'access' if username == 'token' else 'refresh'
self.to_screen(f'Your {token_type} token has been cached to disk. To use the cached '
'token next time, pass --username cache along with any password')
return return
if username != 'cache':
raise ExtractorError(
'Login with username and password is no longer supported '
f'for this site. {self._LOGIN_HINT}, {self._REFRESH_HINT}', expected=True)
# Try cached access_token # Try cached access_token
cached_tokens = self.cache.load(self._NETRC_MACHINE, 'tokens', default={}) cached_tokens = self.cache.load(self._NETRC_MACHINE, 'tokens', default={})
self._access_token = cached_tokens.get('access_token') self._set_access_token(cached_tokens.get('access_token'))
self._refresh_token = cached_tokens.get('refresh_token') self._refresh_token = cached_tokens.get('refresh_token')
if not self._access_token_is_expired: if not self._access_token_is_expired:
return return
if self._real_access_token:
self.write_debug('Cached access token has expired, invalidating')
self._access_token = None
# Try cached refresh_token
if self._refresh_token:
try:
return self._fetch_new_tokens()
except ExtractorError:
# Do not raise for cached tokens; invalidate and continue
self._access_token = None
# username is 'token' # Try cached refresh_token
if not traverse_obj(password, {jwt_decode_hs256}): self._fetch_new_tokens(invalidate=True)
raise ExtractorError(
f'The access token passed to yt-dlp is not valid. {self._LOGIN_HINT}', expected=True)
self._access_token = password
self._set_tokens({'access_token': self._access_token})
def _real_initialize(self): def _real_initialize(self):
if not self._real_access_token: if not self._access_token:
self.raise_login_required( self.raise_login_required(
'All content on this site is only available for registered users. ' 'All content on this site is only available for registered users. '
f'{self._LOGIN_HINT}, {self._REFRESH_HINT}', method=None) f'{self._LOGIN_HINT}, {self._REFRESH_HINT}', method=None)
@ -203,6 +206,7 @@ class DigitalConcertHallIE(InfoExtractor):
video_id = item['id'] video_id = item['id']
for should_retry in (True, False): for should_retry in (True, False):
self._fetch_new_tokens(invalidate=not should_retry)
try: try:
stream_info = self._download_json( stream_info = self._download_json(
self._proto_relative_url(item['_links']['streams']['href']), video_id, headers={ self._proto_relative_url(item['_links']['streams']['href']), video_id, headers={
@ -214,8 +218,6 @@ class DigitalConcertHallIE(InfoExtractor):
break break
except ExtractorError as error: except ExtractorError as error:
if should_retry and isinstance(error.cause, HTTPError) and error.cause.status == 401: if should_retry and isinstance(error.cause, HTTPError) and error.cause.status == 401:
self.report_warning('Access token has been invalidated')
self._fetch_new_tokens()
continue continue
raise raise
@ -253,7 +255,6 @@ class DigitalConcertHallIE(InfoExtractor):
'Accept': 'application/json', 'Accept': 'application/json',
'Accept-Language': language, 'Accept-Language': language,
'User-Agent': self._USER_AGENT, 'User-Agent': self._USER_AGENT,
'Authorization': f'Bearer {self._access_token}',
}) })
videos = [vid_info] if type_ == 'film' else traverse_obj(vid_info, ('_embedded', ..., ...)) videos = [vid_info] if type_ == 'film' else traverse_obj(vid_info, ('_embedded', ..., ...))