Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new methods and modifing some old ones #544

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Features:

Fast and effective Instagram Private API wrapper (public+private requests and challenge resolver) without selenium. Use the most recent version of the API from Instagram, which was obtained using [reverse-engineering with Charles Proxy](https://adw0rd.com/2020/03/26/sniffing-instagram-charles-proxy/en/) and [Proxyman](https://proxyman.io/).

*Instagram API valid for **3 January 2022** (last reverse-engineering check)*
*Instagram API valid for **20 January 2022** (last reverse-engineering check)*

Support **Python >= 3.6**, recommend 3.8+

Expand Down
12 changes: 7 additions & 5 deletions docs/usage-guide/user.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ Low level methods:
| Method | Return | Description
| ------------------------------------------------- | --------------- | ---------------------------------------------------------
| user_followers_gql_chunk(user_id: int, max_amount: int = 0, end_cursor: str = None) | Tuple[List[UserShort], str] | Get user's followers information by Public Graphql API and end_cursor
| user_followers_gql(user_id: int, amount: int = 0) | List[UserShort] | Get user's followers information by Public Graphql API
| user_followers_v1_chunk(user_id: int, max_amount: int = 0, max_id: str = "") | Tuple[List[UserShort], str] | Get user's followers information by Private Mobile API and max_id (cursor)
| user_followers_v1(user_id: int, amount: int = 0) | List[UserShort] | Get user's followers information by Private Mobile API
| user_following_v1(user_id: int, amount: int = 0) | List[UserShort] | Get user's following users information by Private Mobile API
| user_following_gql(user_id: int, amount: int = 0) | List[UserShort] | Get user's following information by Public Graphql API
| user_followers_gql(user_id: int, amount: int = 0) | List[Dict] Full JSON Response| Get user's followers information by Public Graphql API
| user_followers_v1_chunk(user_id: int, max_amount: int = 0, max_id: str = "") | Full JSON Response, str] | Get user's followers information by Private Mobile API and max_id (cursor)
| user_followers_v1(user_id: int, amount: int = 0) | List[Dict], Full JSON Response| Get user's followers information by Private Mobile API
| user_following_v1(user_id: int, amount: int = 0) | List[Dict], Full JSON Response| Get user's following users information by Private Mobile API
| user_following_v1_chunk(user_id: int, max_amount: int = 0, max_id: str = "") | List[Dict], Full JSON Response] | Get user's followings information by Private Mobile API and max_id (cursor)
| user_following_gql(user_id: int, amount: int = 0) | List[Dict] Full JSON Response| Get user's following information by Public Graphql API
| search_followers_v1(user_id: int, query: str) | List[UserShort] | Search by followers by Private Mobile API
| search_following_v1(user_id: int, query: str) | List[UserShort] | Search by following by Private Mobile API
| user_following_hashtags_gql(user_id: int, amount: int = 0) | List[Dict] Full JSON Response | Get user's following hashtags by Public Graphql API

Example:

Expand Down
121 changes: 23 additions & 98 deletions instagrapi/extractors.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
DirectShortThread,
DirectThread,
Hashtag,
Highlight,
Location,
Media,
MediaOembed,
Resource,
Story,
StoryLink,
StoryMedia,
StoryMention,
Track,
User,
UserShort,
Usertag,
Expand Down Expand Up @@ -62,13 +59,14 @@ def extract_media_v1(data):
)
media["like_count"] = media.get("like_count", 0)
media["has_liked"] = media.get("has_liked", False)
return Media(
"""return Media(
caption_text=(media.get("caption") or {}).get("text", ""),
resources=[
extract_resource_v1(edge) for edge in media.get("carousel_media", [])
],
**media,
)
)"""
return data


def extract_media_gql(data):
Expand All @@ -85,14 +83,11 @@ def extract_media_gql(data):
media["media_type"] = 0
if media.get("media_type") == 2 and not media.get("product_type"):
media["product_type"] = "feed"
if "thumbnail_src" in media:
media["thumbnail_url"] = media["thumbnail_src"]
else:
media["thumbnail_url"] = sorted(
# display_resources - user feed, thumbnail_resources - hashtag feed
media.get("display_resources", media.get("thumbnail_resources")),
key=lambda o: o["config_width"] * o["config_height"],
)[-1]["src"]
media["thumbnail_url"] = sorted(
# display_resources - user feed, thumbnail_resources - hashtag feed
media.get("display_resources", media.get("thumbnail_resources")),
key=lambda o: o["config_width"] * o["config_height"],
)[-1]["src"]
if media.get("media_type") == 8:
# remove thumbnail_url and video_url for albums
# see resources
Expand All @@ -102,32 +97,7 @@ def extract_media_gql(data):
media_id = media.get("id")
media["pk"] = media_id
media["id"] = f"{media_id}_{user.pk}"
return Media(
code=media.get("shortcode"),
taken_at=media.get("taken_at_timestamp"),
location=extract_location(location) if location else None,
user=user,
view_count=media.get("video_view_count", 0),
comment_count=json_value(media, "edge_media_to_comment", "count"),
like_count=json_value(media, "edge_media_preview_like", "count"),
caption_text=json_value(
media, "edge_media_to_caption", "edges", 0, "node", "text", default=""
),
usertags=sorted(
[
extract_usertag(usertag["node"])
for usertag in media.get("edge_media_to_tagged_user", {}).get(
"edges", []
)
],
key=lambda tag: tag.user.pk,
),
resources=[
extract_resource_gql(edge["node"])
for edge in media.get("edge_sidecar_to_children", {}).get("edges", [])
],
**media,
)
return data


def extract_resource_v1(data):
Expand Down Expand Up @@ -162,17 +132,8 @@ def extract_user_short(data):

def extract_user_gql(data):
"""For Public GraphQL API"""
return User(
pk=data["id"],
media_count=data["edge_owner_to_timeline_media"]["count"],
follower_count=data["edge_followed_by"]["count"],
following_count=data["edge_follow"]["count"],
is_business=data["is_business_account"],
public_email=data["business_email"],
contact_phone_number=data["business_phone_number"],
**data,
)


return data

def extract_user_v1(data):
"""For Private API"""
Expand Down Expand Up @@ -228,17 +189,12 @@ def extract_media_oembed(data):


def extract_direct_thread(data):
data["pk"] = data.get("thread_v2_id")
data["id"] = data.get("thread_id")
data["messages"] = []
for item in data["items"]:
item["thread_id"] = data["id"]
data["messages"].append(
extract_direct_message(item)
)
data["messages"] = [extract_direct_message(item) for item in data["items"]]
data["users"] = [extract_user_short(u) for u in data["users"]]
if "inviter" in data:
data["inviter"] = extract_user_short(data["inviter"])
data["pk"] = data.get("thread_v2_id")
data["id"] = data.get("thread_id")
data["left_users"] = data.get("left_users", [])
return DirectThread(**data)

Expand All @@ -256,10 +212,7 @@ def extract_direct_response(data):
def extract_direct_message(data):
data["id"] = data.get("item_id")
if "media_share" in data:
ms = data["media_share"]
if not ms.get("code"):
ms["code"] = InstagramIdCodec.encode(ms["id"])
data["media_share"] = extract_media_v1(ms)
data["media_share"] = extract_media_v1(data["media_share"])
if "media" in data:
data["media"] = extract_direct_media(data["media"])
clip = data.get("clip", {})
Expand Down Expand Up @@ -295,13 +248,11 @@ def extract_account(data):

def extract_hashtag_gql(data):
data["media_count"] = data.get("edge_hashtag_to_media", {}).get("count")
data["profile_pic_url"] = data["profile_pic_url"] or None
return Hashtag(**data)


def extract_hashtag_v1(data):
data["allow_following"] = data.get("allow_following") == 1
data["profile_pic_url"] = data["profile_pic_url"] or None
return Hashtag(**data)


Expand All @@ -314,7 +265,7 @@ def extract_story_v1(data):
story["video_versions"], key=lambda o: o["height"] * o["width"]
)[-1]["url"]
if story["media_type"] == 2 and not story.get("product_type"):
story["product_type"] = "story"
story["product_type"] = "feed"
if "image_versions2" in story:
story["thumbnail_url"] = sorted(
story["image_versions2"]["candidates"],
Expand All @@ -326,12 +277,6 @@ def extract_story_v1(data):
story["locations"] = []
story["hashtags"] = []
story["stickers"] = []
feed_medias = []
story_feed_medias = data.get('story_feed_media') or []
for feed_media in story_feed_medias:
feed_media["media_pk"] = int(feed_media["media_id"])
feed_medias.append(StoryMedia(**feed_media))
story["medias"] = feed_medias
story["links"] = []
for cta in story.get("story_cta", []):
for link in cta.get("links", []):
Expand All @@ -348,21 +293,15 @@ def extract_story_gql(data):
story["video_url"] = sorted(
story["video_resources"], key=lambda o: o["config_height"] * o["config_width"]
)[-1]["src"]
story["product_type"] = "story"
# if story["tappable_objects"] and "GraphTappableFeedMedia" in [x["__typename"] for x in story["tappable_objects"]]:
story["product_type"] = "feed"
story["thumbnail_url"] = story.get("display_url")
story["mentions"] = []
story["medias"] = []
for item in story.get("tappable_objects", []):
if item["__typename"] == "GraphTappableMention":
item["id"] = 1
item["user"] = extract_user_short(item)
story["mentions"].append(StoryMention(**item))
if item["__typename"] == "GraphTappableFeedMedia":
media = item.get("media")
if media:
item["media_pk"] = int(media["id"])
item["media_code"] = media["shortcode"]
story["medias"].append(StoryMedia(**item))
for mention in story.get("tappable_objects", []):
if mention["__typename"] == "GraphTappableMention":
mention["id"] = 1
mention["user"] = extract_user_short(mention)
story["mentions"].append(StoryMention(**mention))
story["locations"] = []
story["hashtags"] = []
story["stickers"] = []
Expand All @@ -377,17 +316,3 @@ def extract_story_gql(data):
story["taken_at"] = story["taken_at_timestamp"]
story["media_type"] = 2 if story["is_video"] else 1
return Story(**story)


def extract_highlight_v1(data):
highlight = deepcopy(data)
highlight['pk'] = highlight['id'].split(':')[1]
highlight['items'] = [
extract_story_v1(item)
for item in highlight.get('items', [])
]
return Highlight(**highlight)


def extract_track(data):
return Track(**data)
83 changes: 83 additions & 0 deletions instagrapi/mixins/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,86 @@ def unmute_stories_from_follow(self, user_id: str) -> bool:
A boolean value
"""
return self.mute_stories_from_follow(user_id, True)

#get user followings

def user_following_v1_chunk(self, user_id: int, max_amount: int = 0, max_id: str = ""):
"""
Get user's followers information by Private Mobile API and max_id (cursor)

Parameters
----------
user_id: int
User id of an instagram account
max_amount: int, optional
Maximum number of media to return, default is 0 - Inf
max_id: str, optional
Max ID, default value is empty String

Returns
-------
Tuple[List[UserShort], str]
Tuple of List of users and max_id
"""
users = []
while True:
result = self.private_request(f"friendships/{user_id}/following/", params={
"max_id": max_id,
"rank_token": self.rank_token,
"search_surface": "follow_list_page",
"query": "",
"enable_groups": "true"
})
for user in result["users"]:
# users.append(extract_user_short(user))
users.append(user)
max_id = result.get("next_max_id")
if not max_id or (max_amount and len(users) >= max_amount):
break
return users, max_id

# returning following hashtags
def user_following_hashtags_gql(self, user_id: int, amount: int = 0):
"""
Get user's following information by Public Graphql API

Parameters
----------
user_id: int
User id of an instagram account
amount: int, optional
Maximum number of media to return, default is 0

Returns
-------
List[UserShort]
List of objects of User type
"""
user_id = int(user_id)
end_cursor = None
users = []
variables = {
"id": user_id,
}
self.inject_sessionid_to_public()
while True:
if end_cursor:
variables["after"] = end_cursor
data = self.public_graphql_request(
variables, query_hash="e6306cc3dbe69d6a82ef8b5f8654c50b"
)
if not data["user"] and not users:
raise UserNotFound(user_id=user_id, **data)
#page_info = json_value(data, "user", "edge_following_hashtag", "page_info", default={})
edges = json_value(data, "user", "edge_following_hashtag", "edges", default=[])
for edge in edges:
users.append(edge["node"])
"""end_cursor = page_info.get("end_cursor")
if not page_info.get("has_next_page") or not end_cursor:
break"""
if amount and len(users) >= amount:
break
# time.sleep(sleep)
if amount:
users = users[:amount]
return users