Skip to content

Commit

Permalink
vendor pytube
Browse files Browse the repository at this point in the history
upstream is dead, bundle code to simplify patching
  • Loading branch information
JarbasAl committed Sep 4, 2024
1 parent 22b0948 commit 0828aad
Show file tree
Hide file tree
Showing 24 changed files with 6,071 additions and 339 deletions.
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
requests
bs4
pytube
ytmusicapi
339 changes: 2 additions & 337 deletions tutubo/models.py
Original file line number Diff line number Diff line change
@@ -1,102 +1,4 @@
import json
import re
from typing import List, Tuple, Optional, Iterable

# Patches since upstream is abandoned (TODO time to fork ?)
import pytube.cipher
import pytube.innertube
from pytube import YouTube as _Yt, Channel as _Ch, Playlist as _Pl
from pytube import extract
from pytube.exceptions import VideoUnavailable
from pytube.helpers import uniqueify, cache, DeferredGeneratorList


def get_throttling_function_name(js: str) -> str:
    """Extract the name of the function that computes the throttling parameter.

    Each known pattern is tried in order. A pattern with a single capture
    group yields the function name directly; a pattern with two groups yields
    an array name plus an ``[index]`` that is resolved against the matching
    ``var <name> = [...];`` declaration found in ``js``.

    :param str js:
        The contents of the base.js asset file.
    :rtype: str
    :returns:
        The name of the function used to compute the throttling parameter.
    :raises RegexMatchError:
        If no pattern yields a usable function name.
    """
    function_patterns = [
        # https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-865985377
        # https://github.com/yt-dlp/yt-dlp/commit/48416bc4a8f1d5ff07d5977659cb8ece7640dcd8
        # var Bpa = [iha];
        # ...
        # a.C && (b = a.get("n")) && (b = Bpa[0](b), a.set("n", b),
        # Bpa.length || iha("")) }};
        # In the above case, `iha` is the relevant function name
        r'a\.[a-zA-Z]\s*&&\s*\([a-z]\s*=\s*a\.get\("n"\)\)\s*&&.*?\|\|\s*([a-z]+)',
        r'\([a-z]\s*=\s*([a-zA-Z0-9$]+)(\[\d+\])\([a-z]\)',
    ]
    for pattern in function_patterns:
        function_match = re.search(pattern, js)
        if not function_match:
            continue

        # Single-group pattern: the group is the function name itself.
        if len(function_match.groups()) == 1:
            return function_match.group(1)

        idx = function_match.group(2)
        if not idx:
            continue

        # Two-group pattern: group 1 names an array, group 2 is "[N]".
        array_match = re.search(
            r'var {nfunc}\s*=\s*(\[.+?\]);'.format(
                nfunc=re.escape(function_match.group(1))),
            js
        )
        if not array_match:
            continue

        names = [
            entry.strip()
            for entry in array_match.group(1).strip("[]").split(",")
        ]
        position = int(idx.strip("[]"))
        # Guard the lookup so a short array falls through to the documented
        # RegexMatchError instead of leaking an IndexError.
        if position < len(names):
            return names[position]

    raise extract.RegexMatchError(
        caller="get_throttling_function_name", pattern="multiple"
    )


def extract_channel_name(url: str) -> str:
    """Extract the ``channel_name`` or ``channel_id`` from a YouTube url.

    This function supports the following patterns:

    - :samp:`https://youtube.com/{channel_name}/*`
    - :samp:`https://youtube.com/@{channel_name}/*`
    - :samp:`https://youtube.com/c/{channel_name}/*`
    - :samp:`https://youtube.com/channel/{channel_id}/*`
    - :samp:`https://youtube.com/c/@{channel_name}/*`
    - :samp:`https://youtube.com/channel/@{channel_id}/*`
    - :samp:`https://youtube.com/u/{channel_name}/*`
    - :samp:`https://youtube.com/user/{channel_id}/*`

    :param str url:
        A YouTube url containing a channel name.
    :rtype: str
    :returns:
        A path of the form ``/{style}/{identifier}``, where ``style`` is the
        prefix found in the url (``user``, ``channel``, ``c`` or ``u``) and
        defaults to ``c`` when the url carries no prefix.
    :raises RegexMatchError:
        If the url does not look like a YouTube channel url.
    """
    # ``u`` must be part of the alternation: without it ``/u/<name>`` is read
    # as a bare channel called "u". ``user`` is listed first so the longer
    # prefix wins; either way the prefix only counts when followed by "/".
    pattern = r"(?:https?:\/\/)?(?:www\.)?youtube\.com\/(?:(user|channel|c|u)(?:\/))?\@?([%\d\w_\-]+)"
    function_match = re.search(pattern, url)
    if function_match:
        # A missing prefix (plain name or @handle) is treated as "/c/".
        uri_style = function_match.group(1) or "c"
        uri_identifier = function_match.group(2)
        return f'/{uri_style}/{uri_identifier}'

    raise extract.RegexMatchError(
        caller="channel_name", pattern="patterns"
    )


# Swap pytube's helpers for the patched versions defined in this module.
extract.channel_name = extract_channel_name
pytube.cipher.get_throttling_function_name = get_throttling_function_name

# Override the clientVersion reported by pytube's innertube client profiles.
_clients = pytube.innertube._default_clients
for _client_name in ("ANDROID", "IOS", "ANDROID_EMBED", "IOS_EMBED"):
    _clients[_client_name]["context"]["client"]["clientVersion"] = "19.08.35"
_clients["IOS_MUSIC"]["context"]["client"]["clientVersion"] = "6.41"
# ANDROID_MUSIC is registered as an alias of the ANDROID_CREATOR entry
# (the same dict object, not a copy).
_clients["ANDROID_MUSIC"] = _clients["ANDROID_CREATOR"]
# Do not leave the loop helpers behind in the module namespace.
del _clients, _client_name
from tutubo.pytube import YouTube as Video, Channel, Playlist


class YoutubePreview:
Expand Down Expand Up @@ -353,242 +255,5 @@ def as_dict(self):
'image': self.thumbnail_url}


class Playlist(_Pl):
    """Playlist wrapper adding best-effort metadata access and a dict view.

    Every lookup into the page data is best-effort: a failure yields a
    fallback value (``None``, ``""`` or the next source) instead of raising.
    """

    def __init__(self, url, *args, **kwargs):
        super().__init__(url, *args, **kwargs)
        # Lazily populated by the ``metadata`` / ``microformat`` properties.
        self._metadata = None
        self._microformat = None

    @property
    def metadata(self):
        """Return ``initial_data['metadata']['channelMetadataRenderer']``.

        The value is stored on first access and reused while it is truthy.
        """
        if not self._metadata:
            self._metadata = self.initial_data['metadata'][
                'channelMetadataRenderer']
        return self._metadata

    @property
    def microformat(self):
        """Return ``initial_data['metadata']['microformatDataRenderer']``.

        The value is stored on first access and reused while it is truthy.
        """
        if not self._microformat:
            self._microformat = self.initial_data['metadata'][
                'microformatDataRenderer']
        return self._microformat

    @property
    def title(self):
        """Extract playlist title

        Sources are tried in order: sidebar info, microformat, metadata.

        :return: playlist title (name), or ``None`` if no source has one
        :rtype: Optional[str]
        """
        # ``except Exception`` keeps the lookups best-effort while letting
        # KeyboardInterrupt / SystemExit propagate (a bare except would not).
        try:
            return self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
                'title']['runs'][0]['text']
        except Exception:  # sidebar not available
            pass
        try:
            return self.microformat['title']
        except Exception:
            pass
        try:
            return self.metadata['title']
        except Exception:
            pass
        return None

    @property
    def description(self) -> str:
        """Return the stripped sidebar description, or ``""`` if unavailable."""
        try:
            return self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
                'description']['simpleText'].strip()
        except Exception:  # sometimes description is an empty dict
            return ""

    @property
    def featured_videos(self):
        """Return summary dicts for the first videos of the playlist.

        Videos raising ``VideoUnavailable`` are skipped; collection stops
        once six entries have been gathered.

        :rtype: List[dict]
        :returns: dicts with ``videoId``, ``url``, ``image`` and ``title``
        """
        videos = []
        for vid in self.videos:
            if len(videos) > 5:
                break
            try:
                videos.append({
                    "videoId": vid.video_id,
                    "url": vid.watch_url,
                    "image": vid.thumbnail_url,
                    "title": vid.title
                })
            except VideoUnavailable:
                continue
        return videos

    @property
    def thumbnail_url(self):
        """Return the image of the first featured video, or ``None``."""
        try:
            return self.featured_videos[0]["image"]
        except Exception:
            return None

    @property
    def as_dict(self):
        """Return a plain-dict summary of this playlist."""
        return {'playlistId': self.playlist_id,
                'title': self.title,
                'url': self.playlist_url,
                "image": self.thumbnail_url,
                'featured_videos': self.featured_videos}


class Channel(Playlist, _Ch):
    """Channel wrapper adding playlist discovery and a dict view."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    # see https://github.com/pytube/pytube/pull/1019
    @property
    def playlists(self) -> Iterable[Playlist]:
        """Yields Playlist objects of playlists in this channel

        :rtype: List[Playlist]
        :returns: List of Playlist
        """
        return DeferredGeneratorList(self.playlist_generator())

    @staticmethod
    def _extract_playlists(raw_json: str) -> Tuple[List[str], Optional[str]]:
        """Extracts playlists from a raw json page

        :param str raw_json: Input json extracted from the page or the last
            server response
        :rtype: Tuple[List[str], Optional[str]]
        :returns: Tuple containing a de-duplicated list of
            ``/playlist?list=<id>`` paths and a continuation token, if more
            playlists are available
        """
        initial_data = json.loads(raw_json)

        # this is the json tree structure, if the json was extracted from
        # html
        playlists = []
        try:
            tabs = initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
            for t in tabs:
                if "content" not in t["tabRenderer"]:
                    continue
                data = t["tabRenderer"]["content"]
                if "sectionListRenderer" not in data:
                    continue
                for c in data["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"]:
                    if 'shelfRenderer' in c:
                        playlists = c['shelfRenderer']["content"]['horizontalListRenderer']["items"]
                        break
                    elif 'gridRenderer' in c:
                        playlists = c['gridRenderer']["items"]
                        break
                # The first tab that produced items wins.
                if playlists:
                    break
        except (KeyError, IndexError, TypeError):
            pass

        try:
            continuation = playlists[-1]['continuationItemRenderer']['continuationEndpoint']['continuationCommand'][
                'token']
            playlists = playlists[:-1]
        except (KeyError, IndexError):
            # if there is an error, no continuation is available
            continuation = None

        playlist_ids = []
        for p in playlists:
            if 'gridPlaylistRenderer' in p:
                playlist_id = p['gridPlaylistRenderer'].get('playlistId')
            elif 'lockupViewModel' in p:
                playlist_id = p['lockupViewModel'].get('contentId')
            else:
                playlist_id = p.get('playlistId')
            # Items of any other renderer type carry no playlist id; skip
            # them instead of failing the whole extraction with a KeyError.
            if playlist_id:
                playlist_ids.append(playlist_id)

        # remove duplicates
        return (
            uniqueify([f"/playlist?list={pid}" for pid in playlist_ids]),
            continuation,
        )

    def playlist_generator(self):
        """Yield a ``Playlist`` for every url in ``playlist_urls``."""
        for url in self.playlist_urls:
            yield Playlist(url)

    def playlist_url_generator(self):
        """Generator that yields playlist URLs.

        :Yields: Playlist URLs
        """
        for page in self._paginate_playlists():
            for playlist in page:
                yield self._playlist_url(playlist)

    def _paginate_playlists(
            self, until_watch_id: Optional[str] = None
    ) -> Iterable[List[str]]:
        """Parse the playlist links from the page source, yields the
        ``/playlist?list=`` paths found on it

        Only the first page is yielded: the continuation token returned by
        ``_extract_playlists`` is not followed.

        :param until_watch_id Optional[str]: currently unused.
        :rtype: Iterable[List[str]]
        :returns: Iterable of lists of YouTube playlist paths
        """
        playlist_urls, continuation = self._extract_playlists(
            json.dumps(extract.initial_data(self.playlists_html))
        )
        yield playlist_urls

    @staticmethod
    def _playlist_url(playlist_path: str):
        """Turn a ``/playlist?list=...`` path into an absolute YouTube URL."""
        return f"https://www.youtube.com{playlist_path}"

    @property  # type: ignore
    @cache
    def playlist_urls(self) -> DeferredGeneratorList:
        """Complete links of all the playlists in channel

        :rtype: List[str]
        :returns: List of playlist URLs
        """
        return DeferredGeneratorList(self.playlist_url_generator())

    @property
    def as_dict(self):
        """Return a plain-dict summary of this channel."""
        return {'channelId': self.channel_id,
                'title': self.title,
                'image': self.thumbnail_url,
                'url': self.channel_url}


def video_description_info(watch_html: str):
    """Scrape the ``simpleText`` description out of a watch page.

    :param str watch_html: HTML source of the watch page.
    :returns: the full regex match (group 0), or ``None`` when the pattern
        is not found.
    """
    # NOTE(review): the pattern opens with a literal '"' and the capture runs
    # up to the next '}', so group 0 appears to include the surrounding
    # double quotes - confirm whether callers expect that.
    description_pattern = r'"(?<=description":{"simpleText":")([^}]+)'
    try:
        return extract.regex_search(description_pattern, watch_html, group=0)
    except extract.RegexMatchError:
        return None


class Video(_Yt):
    """Video wrapper adding a description fallback and a dict view."""

    @property
    def description(self) -> str:
        """Get the video description.

        Prefers ``videoDetails.shortDescription`` from ``vid_info``; when
        that is missing or empty, falls back to scraping the watch page.

        :returns: the description, or ``""`` when neither source has one.
        """
        short = self.vid_info.get("videoDetails", {}).get("shortDescription")
        if short:
            return short
        scraped = video_description_info(self.watch_html)
        if scraped is None:
            # The scrape found nothing; calling ``.replace`` on None would
            # raise AttributeError, so honour the ``str`` contract instead.
            return ""
        # The scraped text carries escaped newlines (backslash + "n").
        return scraped.replace('\\n', '\n')

    @property
    def as_dict(self):
        """Return a plain-dict summary of this video."""
        return {'length': self.length,
                'keywords': self.keywords,
                'image': self.thumbnail_url,
                'title': self.title,
                "author": self.author,
                'url': self.watch_url,
                'videoId': self.video_id}


class RelatedVideo(Video):
    """A ``Video`` that defines its own ``as_dict`` summary."""

    @property
    def as_dict(self):
        """Return a plain-dict summary of this related video."""
        summary = {}
        summary['length'] = self.length
        summary['keywords'] = self.keywords
        summary['image'] = self.thumbnail_url
        summary['title'] = self.title
        summary["author"] = self.author
        summary['url'] = self.watch_url
        summary['videoId'] = self.video_id
        return summary
""""""
19 changes: 19 additions & 0 deletions tutubo/pytube/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# flake8: noqa: F401
# noreorder
"""
Pytube: a very serious Python library for downloading YouTube Videos.
"""
__title__ = "pytube"
__author__ = "Ronnie Ghose, Taylor Fox Dahlin, Nick Ficano"
__license__ = "The Unlicense (Unlicense)"
__js__ = None
__js_url__ = None

from tutubo.pytube.version import __version__
from tutubo.pytube.streams import Stream
from tutubo.pytube.captions import Caption
from tutubo.pytube.query import CaptionQuery, StreamQuery
from tutubo.pytube.__main__ import YouTube
from tutubo.pytube.contrib.playlist import Playlist
from tutubo.pytube.contrib.channel import Channel
from tutubo.pytube.contrib.search import Search
Loading

0 comments on commit 0828aad

Please sign in to comment.