Compare commits

...

3 Commits

Author SHA1 Message Date
Benjamin Ryan b96c90c0ad
Merge 773a8764c8 into 89f535e265 2024-04-24 02:49:43 +01:00
bashonly 89f535e265
[ci] Fix `curl-cffi` installation (Bugfix for 02483bea1c)
Authored by: bashonly
2024-04-22 20:36:01 +00:00
redraskal 773a8764c8 [extractor/tiktok] Fix TikTokUserIE extractor 2023-02-05 13:31:47 -06:00
2 changed files with 196 additions and 64 deletions

View File

@ -53,7 +53,7 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install test requirements
run: python3 ./devscripts/install_deps.py --include dev --include curl_cffi
run: python3 ./devscripts/install_deps.py --include dev --include curl-cffi
- name: Run tests
continue-on-error: False
run: |

View File

@ -6,6 +6,14 @@
import time
import uuid
from playwright.sync_api import sync_playwright
from base64 import b64encode
from urllib.parse import urlencode
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad
from .common import InfoExtractor
from ..compat import compat_urllib_parse_urlparse
from ..networking import HEADRequest
@ -791,104 +799,228 @@ def _real_extract(self, url):
raise ExtractorError(f'Video not available, status code {status}', video_id=video_id)
class TikTokUserIE(TikTokBaseIE):
class TikTokUserIE(TikTokIE):
IE_NAME = 'tiktok:user'
_VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])'
_WORKING = False
_WORKING = True
_TESTS = [{
'url': 'https://tiktok.com/@corgibobaa?lang=en',
'playlist_mincount': 45,
'url': 'https://tiktok.com/@therock?lang=en',
'playlist_mincount': 25,
'info_dict': {
'id': '6935371178089399301',
'title': 'corgibobaa',
'thumbnail': r're:https://.+_1080x1080\.webp'
'id': '6745191554350760966',
'title': 'therock',
'thumbnail': r're:https://.+_100x100\.jpeg',
'signature': str,
'follower_count': int,
'verified': True,
'private': bool,
'following_count': int,
'nickname': str,
'like_count': int
},
'expected_warnings': ['Retrying']
}, {
'url': 'https://www.tiktok.com/@6820838815978423302',
'url': 'https://www.tiktok.com/@pokemonlife22',
'playlist_mincount': 5,
'info_dict': {
'id': '6820838815978423302',
'title': '6820838815978423302',
'thumbnail': r're:https://.+_1080x1080\.webp'
'title': 'pokemonlife22',
'thumbnail': r're:https://.+_100x100\.jpeg',
'signature': str,
'follower_count': int,
'verified': bool,
'private': bool,
'following_count': int,
'nickname': str,
'like_count': int
},
'expected_warnings': ['Retrying']
}, {
'url': 'https://www.tiktok.com/@meme',
'playlist_mincount': 593,
'playlist_mincount': 25,
'info_dict': {
'id': '79005827461758976',
'title': 'meme',
'thumbnail': r're:https://.+_1080x1080\.webp'
'thumbnail': r're:https://.+_100x100\.jpeg',
'signature': str,
'follower_count': int,
'verified': True,
'private': bool,
'following_count': int,
'nickname': str,
'like_count': int
},
'expected_warnings': ['Retrying']
}]
r''' # TODO: Fix by adding _signature to api_url
def _entries(self, webpage, user_id, username):
secuid = self._search_regex(r'\"secUid\":\"(?P<secUid>[^\"]+)', webpage, username)
verifyfp_cookie = self._get_cookies('https://www.tiktok.com').get('s_v_web_id')
if not verifyfp_cookie:
raise ExtractorError('Improper cookies (missing s_v_web_id).', expected=True)
api_url = f'https://m.tiktok.com/api/post/item_list/?aid=1988&cookie_enabled=true&count=30&verifyFp={verifyfp_cookie.value}&secUid={secuid}&cursor='
cursor = '0'
for page in itertools.count():
data_json = self._download_json(api_url + cursor, username, note='Downloading Page %d' % page)
for video in data_json.get('itemList', []):
video_id = video['id']
video_url = f'https://www.tiktok.com/@{user_id}/video/{video_id}'
yield self._url_result(video_url, 'TikTok', video_id, str_or_none(video.get('desc')))
if not data_json.get('hasMore'):
break
cursor = data_json['cursor']
'''
def _video_entries_api(self, webpage, user_id, username):
query = {
'user_id': user_id,
'count': 21,
'max_cursor': 0,
'min_cursor': 0,
'retry_type': 'no_retry',
'device_id': ''.join(random.choices(string.digits, k=19)), # Some endpoints don't like randomized device_id, so it isn't directly set in _call_api.
def _generate_x_tt_params(self, secUid, device_id, cursor):
payload = {
'aid': '1988',
'app_name': 'tiktok_web',
'channel': 'tiktok_web',
'device_platform': 'web_pc',
'device_id': device_id,
'region': 'US',
'priority_region': '',
'os': 'windows',
'referer': '',
'root_referer': 'undefined',
'cookie_enabled': 'true',
'screen_width': '1920',
'screen_height': '1080',
'browser_language': 'en-US',
'browser_platform': 'Win32',
'browser_name': 'Mozilla',
'browser_version': '5.0 (Windows)',
'browser_online': 'true',
'verifyFp': 'undefined',
'app_language': 'en',
'webcast_language': 'en',
'tz_name': 'America/Chicago',
'is_page_visible': 'true',
'focus_state': 'false',
'is_fullscreen': 'false',
'history_len': '2',
'from_page': 'user',
'secUid': secUid,
'count': '30',
'cursor': cursor,
'language': 'en',
'userId': 'undefined',
'is_encryption': '1'
}
# https://github.com/davidteather/TikTok-Api/issues/899#issuecomment-1175439842
s = urlencode(payload, doseq=True, quote_via=lambda s, *_: s)
key = "webapp1.0+202106".encode("utf-8")
cipher = AES.new(key, AES.MODE_CBC, key)
ct_bytes = cipher.encrypt(pad(s.encode("utf-8"), AES.block_size))
return b64encode(ct_bytes).decode("utf-8")
for page in itertools.count(1):
for retry in self.RetryManager():
try:
post_list = self._call_api(
'aweme/post', query, username, note=f'Downloading user video list page {page}',
errnote='Unable to download user video list')
except ExtractorError as e:
if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
retry.error = e
continue
raise
yield from post_list.get('aweme_list', [])
if not post_list.get('has_more'):
def _video_entries_api(self, user_name, secUid):
cursor = '0'
videos = []
author = []
max = self._downloader.params.get('playlistend') or -1
device_id = ''.join(random.choices(string.digits, k=19))
self.write_debug('Launching headless browser')
with sync_playwright() as p:
browser = p.firefox.launch(args=['--mute-audio'])
page = browser.new_page()
page.goto('https://tiktok.com', wait_until='load')
time.sleep(2) # it just works ok
for i in itertools.count(1):
x_tt_params = self._generate_x_tt_params(secUid, device_id, cursor)
self.to_screen(f'Downloading page {i}')
self.write_debug(f'x-tt-params: {x_tt_params}')
data_json = page.evaluate('([x, d]) => fetch(`https://us.tiktok.com/api/post/item_list/?aid=1988&app_language=en&app_name=tiktok_web&browser_language=en-US&browser_name=Mozilla&browser_online=true&browser_platform=Win32&browser_version=5.0%20%28Windows%29&channel=tiktok_web&cookie_enabled=true&device_id=${d}&device_platform=web_pc&focus_state=true&from_page=user&history_len=2&is_fullscreen=false&is_page_visible=true&os=windows&priority_region=&referer=&region=US&screen_height=1080&screen_width=1920`, { headers: { "x-tt-params": x } }).then(res => res.json())', [x_tt_params, device_id])
for video in data_json.get('itemList', []):
video_id = video.get('id', '')
if len(videos) == 0:
author = video.get('author', [])
video_url = f'https://www.tiktok.com/@{user_name}/video/{video_id}'
videos.append(self.url_result(video_url, 'TikTok', video_id, str_or_none(video.get('desc'))))
if max > -1 and len(videos) >= max:
break
else:
if not data_json.get('hasMore'):
break
cursor = data_json['cursor']
continue
break
query['max_cursor'] = post_list['max_cursor']
browser.close()
return author, videos
def _entries_api(self, user_id, videos):
def _entries_api(self, videos):
for video in videos:
yield {
**self._parse_aweme_video_app(video),
**self._try_extract(video['url'], video['id']),
'extractor_key': TikTokIE.ie_key(),
'extractor': 'TikTok',
'webpage_url': f'https://tiktok.com/@{user_id}/video/{video["aweme_id"]}',
'webpage_url': video['url'],
}
def _try_extract(self, url, video_id):
try:
return self._extract_video(url, video_id)
except ExtractorError as e:
self.report_warning(e)
return {}
def _extract_video(self, url, video_id):
try:
return self._extract_aweme_app(video_id)
except ExtractorError as e:
self.report_warning(f'{e}; trying with webpage')
webpage = self._download_webpage(url, video_id, headers={'User-Agent': 'User-Agent:Mozilla/5.0'})
next_data = self._search_nextjs_data(webpage, video_id, default='{}')
if next_data:
status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode'), expected_type=int) or 0
video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct'), expected_type=dict)
else:
sigi_data = self._get_sigi_state(webpage, video_id)
status = traverse_obj(sigi_data, ('VideoPage', 'statusCode'), expected_type=int) or 0
video_data = traverse_obj(sigi_data, ('ItemModule', video_id), expected_type=dict)
if status == 0:
return self._parse_aweme_video_web(video_data, url)
elif status == 10216:
raise ExtractorError('This video is private', expected=True)
raise ExtractorError('Video not available', video_id=video_id)
def _get_frontity_state(self, webpage, user_name):
return traverse_obj(
self._parse_json(self._search_regex(
r'(?s)<script[^>]+id=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>([^<]+)</script>',
webpage, 'frontity data'), 'frontity data'),
('source', 'data', f'/embed/@{user_name}'))
def _extract_secUid(self, aweme_id):
feed_list = self._call_api('feed', {'aweme_id': aweme_id}, aweme_id,
note='Downloading video feed', errnote='Unable to download video feed').get('aweme_list') or []
aweme_detail = next((aweme for aweme in feed_list if str(aweme.get('aweme_id')) == aweme_id), None)
if not aweme_detail:
raise ExtractorError('Unable to find video in feed', video_id=aweme_id)
return traverse_obj(aweme_detail, ('author', 'sec_uid'))
def _real_extract(self, url):
user_name = self._match_id(url)
webpage = self._download_webpage(url, user_name, headers={
'User-Agent': 'facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)'
})
user_id = self._html_search_regex(r'snssdk\d*://user/profile/(\d+)', webpage, 'user ID', default=None) or user_name
user_info = []
secUid = ''
videos = LazyList(self._video_entries_api(webpage, user_id, user_name))
thumbnail = traverse_obj(videos, (0, 'author', 'avatar_larger', 'url_list', 0))
try:
webpage = self._download_webpage(f'https://www.tiktok.com/embed/@{user_name}', user_name, note='Downloading user embed')
state = self._get_frontity_state(webpage, user_name)
user_info = state.get('userInfo')
latest_video = next((video for video in state.get('videoList') if len(video.get('playAddr')) > 0), None)
if latest_video:
latest_video_id = latest_video.get('id')
secUid = self._extract_secUid(latest_video_id)
except ExtractorError as e:
secUid = self._configuration_arg('secuid', [''], ie_key=TikTokIE, casesense=True)[0]
if len(secUid) == 0:
raise e
self.report_warning(f'{e}; secUid supplied, trying anyway')
return self.playlist_result(self._entries_api(user_id, videos), user_id, user_name, thumbnail=thumbnail)
author, response = self._video_entries_api(user_name, secUid)
if author.get('uniqueId', '') == user_name:
user_info = author
user_info['avatarThumbUrl'] = user_info['avatarLarger']
videos = LazyList(response)
return self.playlist_result(
self._entries_api(videos),
user_info.get('id'), user_name,
nickname=user_info.get('nickname', user_name),
thumbnail=user_info.get('avatarThumbUrl', ''),
verified=user_info.get('verified', False),
follower_count=user_info.get('followerCount', 0),
following_count=user_info.get('followingCount', 0),
like_count=user_info.get('heartCount', 0),
signature=user_info.get('signature', ''),
private=user_info.get('privateAccount', False)
)
class TikTokBaseListIE(TikTokBaseIE): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor