-
Notifications
You must be signed in to change notification settings - Fork 775
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
344 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
# SPDX-FileCopyrightText: 2024 Tim Cocks | ||
# | ||
# SPDX-License-Identifier: MIT | ||
""" | ||
Bluesky_RPi_TFT_Scroller code.py | ||
Infinitely scroll Bluesky posts on a 320x240 pixel TFT | ||
""" | ||
import json | ||
import os | ||
|
||
import requests | ||
import webview | ||
|
||
FEEDLINK_RETROCOMPUTING = "https://bsky.app/profile/did:plc:tbo4hkau3p2itkar2vsnb3gp/feed/aaabo5oe7bzok" | ||
|
||
# Un-comment a single key inside of FEED_ARGS and set it's value to the feed, list or search | ||
# that you want to scroll. | ||
FETCH_ARGS = { | ||
# "feed_share_link": FEEDLINK_RETROCOMPUTING, | ||
# "feed_share_link": "https://bsky.app/profile/did:plc:463touruejpokvutnn5ikxb5/lists/3lbfdtahfzt2a", | ||
# "search_args": {"q": "Adafruit", "sort": "latest"} | ||
"search_args": {"q": "#circuitpython", "sort": "latest"} | ||
} | ||
|
||
|
||
def at_feed_uri_from_share_link(share_link): | ||
""" | ||
Converts a share link into an AT URI for that resource. | ||
:param share_link: The share link to convert. | ||
:return str: The AT URI pointing at the resource. | ||
""" | ||
at_feed_uri = share_link.replace("https://bsky.app/profile/", "at://") | ||
if "/feed/" in share_link: | ||
at_feed_uri = at_feed_uri.replace("/feed/", "/app.bsky.feed.generator/") | ||
if "/lists/" in share_link: | ||
at_feed_uri = at_feed_uri.replace("/lists/", "/app.bsky.graph.list/") | ||
return at_feed_uri | ||
|
||
|
||
def fetch_data(feed_share_link=None, search_args=None): | ||
""" | ||
Fetch posts from Bluesky API and write them into the local cached | ||
data files. After posts are written locally iterates over them | ||
and downloads the relevant photos from them. | ||
Must pass either feed_share_link or search_args. | ||
:param feed_share_link: The link copied from Bluesky front end to share the feed or list. | ||
:param search_args: A dictionary containing at minimum a ``q`` key with string value of | ||
the hashtag or term to search for. See bsky API docs for other supported keys. | ||
:return: None | ||
""" | ||
if feed_share_link is None and search_args is None: | ||
# If both inputs are None, just use retrocomputing feed. | ||
feed_share_link = FEEDLINK_RETROCOMPUTING | ||
|
||
# if a share link input was provided | ||
if feed_share_link is not None: | ||
FEED_AT = at_feed_uri_from_share_link(feed_share_link) | ||
# print(FEED_AT) | ||
|
||
# if it's a feed | ||
if "/app.bsky.feed.generator/" in FEED_AT: | ||
URL = f"https://public.api.bsky.app/xrpc/app.bsky.feed.getFeed?feed={FEED_AT}&limit=30" | ||
headers = {"Accept-Language": "en"} | ||
resp = requests.get(URL, headers=headers) | ||
|
||
# if it's a list | ||
elif "/app.bsky.graph.list/" in FEED_AT: | ||
URL = f"https://public.api.bsky.app/xrpc/app.bsky.feed.getListFeed?list={FEED_AT}&limit=30" | ||
headers = {"Accept-Language": "en"} | ||
resp = requests.get(URL, headers=headers) | ||
|
||
# raise error if it's an unknown type | ||
else: | ||
raise ValueError("Only 'app.bsky.feed.generator' and 'app.bsky.graph.list' URIs are supported.") | ||
|
||
# if a search input was provided | ||
if search_args is not None: | ||
URL = "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts" | ||
headers = {"Accept-Language": "en"} | ||
resp = requests.get(URL, headers=headers, params=search_args) | ||
|
||
with open(".data/raw_data.json", "wb") as f: | ||
# write raw response to cache | ||
f.write(resp.content) | ||
|
||
# Process the post data into a smaller subset | ||
# containing just the bits we need for showing | ||
# on the TFT. | ||
resp_json = json.loads(resp.text) | ||
processed_posts = {"posts": []} | ||
fetched_posts = None | ||
if "feed" in resp_json.keys(): | ||
fetched_posts = resp_json["feed"] | ||
elif "posts" in resp_json.keys(): | ||
fetched_posts = resp_json["posts"] | ||
|
||
for post in fetched_posts: | ||
cur_post = {} | ||
if "post" in post.keys(): | ||
post = post["post"] | ||
cur_post["author"] = post["author"]["handle"] | ||
cur_post["text"] = post["record"]["text"] | ||
|
||
# image handling | ||
if "embed" in post.keys(): | ||
cid = post["cid"] | ||
if "images" in post["embed"].keys(): | ||
cur_post["image_url"] = post["embed"]["images"][0]["thumb"] | ||
elif "thumbnail" in post["embed"].keys(): | ||
cur_post["image_url"] = post["embed"]["thumbnail"] | ||
elif "external" in post["embed"].keys() and "thumb" in post["embed"]["external"].keys(): | ||
cur_post["image_url"] = post["embed"]["external"]["thumb"] | ||
|
||
# if we actually have an image to show | ||
if "image_url" in cur_post.keys(): | ||
# check if we already downloaded this image | ||
if f"{cid}.jpg" not in os.listdir("static/imgs/"): | ||
print(f"downloading: {cur_post['image_url']}") | ||
|
||
# download image and write to file | ||
img_resp = requests.get(cur_post["image_url"]) | ||
with open(f"static/imgs/{cid}.jpg", "wb") as f: | ||
f.write(img_resp.content) | ||
|
||
cur_post["image_file"] = f"{cid}.jpg" | ||
processed_posts["posts"].append(cur_post) | ||
|
||
# save the processed data to a file | ||
with open(".data/processed_data.json", "w", encoding="utf-8") as f: | ||
f.write(json.dumps(processed_posts)) | ||
|
||
|
||
def read_cached_data(): | ||
""" | ||
Load the cached processed data file and return | ||
the data from within it. | ||
:return: The posts data loaded from JSON | ||
""" | ||
with open(".data/processed_data.json", "r") as f: | ||
return json.load(f) | ||
|
||
|
||
class Api: | ||
""" | ||
API object for interaction between python code here | ||
and JS code running inside the page. | ||
""" | ||
|
||
def get_posts(self): | ||
""" | ||
Fetch new posts data from Bluesky API, cache and return it. | ||
:return: Processed data containing everything necessary to show | ||
posts on the TFT. | ||
""" | ||
fetch_data(**FETCH_ARGS) | ||
return read_cached_data() | ||
|
||
def check_quit(self): | ||
""" | ||
Allows the python code to correctly handle KeyboardInterrupt | ||
more quickly. | ||
:return: None | ||
""" | ||
pass | ||
|
||
|
||
# create a webview and load the index.html page | ||
webview.create_window("bsky posts", "static/index.html", | ||
js_api=Api(), width=320, height=240) | ||
webview.start() | ||
# webview.start(debug=True) # use this one to enable chromium dev tools to see console.log() output from the page. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
sudo apt install fonts-noto-color-emoji | ||
sudo apt install python3-webview | ||
sudo apt install python3-requests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
<!DOCTYPE html> | ||
<html lang="en"> | ||
<head> | ||
<meta charset="UTF-8"> | ||
<title>Bluesky Posts</title> | ||
<style> | ||
.hidden { | ||
display: none; | ||
} | ||
|
||
/* Scale image down to fit the full thing on the TFT */ | ||
img { | ||
max-width: 304px; | ||
max-height: 240px; | ||
object-fit: contain; | ||
} | ||
|
||
/* make really long handles wrap to next line instead of run off edge */ | ||
.postAuthor{ | ||
overflow-wrap: break-word; | ||
} | ||
|
||
/* Hide scrollbar for Chrome, Safari and Opera */ | ||
body::-webkit-scrollbar { | ||
display: none; | ||
} | ||
</style> | ||
</head> | ||
<body> | ||
|
||
<!-- container to hold all posts --> | ||
<div id="postWall"> | ||
|
||
</div> | ||
|
||
<!-- template element for a single post --> | ||
<div id="postTemplate" class="hidden"> | ||
<h3 class="postAuthor"></h3> | ||
<p class="postText"></p> | ||
<img class="postImg"> | ||
</div> | ||
|
||
<!-- load the JS for the rest of the fun --> | ||
<script src="script.js"></script> | ||
</body> | ||
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// SPDX-FileCopyrightText: 2024 Tim Cocks | ||
// | ||
// SPDX-License-Identifier: MIT | ||
|
||
/* bluesky scroller script.js */ | ||
|
||
// DOM Element references | ||
let $template = document.querySelector("#postTemplate"); | ||
let $postWall = document.querySelector("#postWall"); | ||
|
||
// holds how many times we've fetched data. Used for filtering out older posts | ||
let curFetchIndex = 0; | ||
|
||
// list that will hold new post objects that have been fetched | ||
let newPosts; | ||
|
||
// flag to know whether the wall has been initialized | ||
let initializedWall = false; | ||
|
||
// gets callback when pywebview Api object is ready to be used | ||
window.addEventListener('pywebviewready', function () { | ||
|
||
function fetchNewPosts() { | ||
/* Fetch posts, then initialize the wall if it hasn't been yet */ | ||
|
||
pywebview.api.get_posts().then(function (posts) { | ||
console.log("fetching new data") | ||
if (!initializedWall) { | ||
buildPostWall(posts); | ||
|
||
// start the autoscroller | ||
setTimeout(function(){setInterval(autoScroll, 50);}, 2000); | ||
|
||
// set flag true so we know next time | ||
initializedWall = true | ||
|
||
} else { // wall was initialized already | ||
// just update the newPosts list | ||
newPosts = posts; | ||
} | ||
|
||
curFetchIndex += 1; | ||
}); | ||
} | ||
|
||
// call fetch the first time | ||
fetchNewPosts(); | ||
|
||
// set an interval to call fetch every 7 minutes | ||
setInterval(fetchNewPosts, 7 * 60 * 1000); | ||
}) | ||
|
||
function inflatePostTemplate(postObj) { | ||
/* Takes an object represent the post to show and inflates | ||
* DOM elements and populates them with the post data. */ | ||
|
||
let $post = $template.cloneNode(true); | ||
$post.removeAttribute("id"); | ||
console.log($post); | ||
$post.setAttribute("data-fetch-index", curFetchIndex); | ||
$post.querySelector(".postAuthor").innerText = postObj["author"]; | ||
$post.querySelector(".postText").innerText = postObj["text"]; | ||
if(postObj.hasOwnProperty("image_file")){ | ||
//$post.querySelector(".postImg").src = "../../.data/imgs/" + postObj["image_file"]; | ||
$post.querySelector(".postImg").src = "imgs/" + postObj["image_file"]; | ||
}else{ | ||
$post.removeChild($post.querySelector(".postImg")); | ||
} | ||
|
||
$post.classList.remove("hidden"); | ||
return $post; | ||
} | ||
|
||
function buildPostWall(posts) { | ||
/* Takes an object with a list of posts in it, inflates DOM elements | ||
* for each post in the data and adds it to the wall. */ | ||
|
||
for (let i = 0; i < posts["posts"].length; i++) { | ||
let $post = inflatePostTemplate(posts["posts"][i]) | ||
$postWall.appendChild($post); | ||
} | ||
} | ||
|
||
// gets callback any time a scroll event occurs | ||
window.addEventListener('scroll', function () { | ||
// if scroll is past the boundary line | ||
if (window.scrollY > 1000) { | ||
// get the first post element from the top of the wall | ||
let $firstPost = $postWall.firstElementChild | ||
// remove it from the wall | ||
$postWall.removeChild($firstPost); | ||
|
||
// if there are no new posts currently | ||
if (newPosts === undefined || newPosts["posts"].length === 0) { | ||
// add the first post back to the wall at the bottom | ||
$postWall.appendChild($firstPost); | ||
|
||
} else { // there are new posts to start showing | ||
|
||
// inflate the first new post | ||
$newPost = inflatePostTemplate(newPosts["posts"].shift()); | ||
// add it to the post wall | ||
$postWall.appendChild($newPost); | ||
|
||
// if the post we removed from the top is still current | ||
if ($firstPost.getAttribute("data-fetch-index") === curFetchIndex) { | ||
// add it back in at the bottom | ||
$postWall.appendChild($firstPost); | ||
} | ||
} | ||
} | ||
}); | ||
|
||
function autoScroll() { | ||
/* Function to be called frequently to automatically scroll the page. | ||
* Also calls check_quit() to allow python to handle KeyboardInterrupt */ | ||
pywebview.api.check_quit(); | ||
window.scrollBy(0, 2); | ||
} |