-
Notifications
You must be signed in to change notification settings - Fork 0
/
live.rs
65 lines (51 loc) · 1.86 KB
/
live.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::io::BufReader;
use std::sync::Arc;
use atom_syndication::Feed;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use crate::{chunk_info_to_meili, retrieve_crate_toml, CrateInfo};
use color_eyre::Result;
use meilisearch_sdk::Client;
#[tracing::instrument]
pub async fn live(client: Arc<Client>) -> Result<()> {
let (infos_sender, infos_receiver) = mpsc::channel(10_000);
let (cinfos_sender, cinfos_receiver) = mpsc::channel(10_000);
let retrieve_handler = tokio::spawn(crates_infos(infos_sender));
let publish_handler = tokio::spawn(chunk_info_to_meili(client, cinfos_receiver));
StreamExt::zip(infos_receiver, stream::repeat(cinfos_sender))
.for_each_concurrent(Some(8), |(info, mut sender)| async move {
match retrieve_crate_toml(&info).await {
Ok(cinfo) => sender.send(cinfo).await.unwrap(),
Err(e) => tracing::info!("{:?} {}", info, e),
}
})
.await;
retrieve_handler.await??;
publish_handler.await??;
Ok(())
}
#[tracing::instrument(skip_all)]
async fn crates_infos(mut sender: mpsc::Sender<CrateInfo>) -> Result<()> {
let body = reqwest::get("https://docs.rs/releases/feed")
.await?
.bytes()
.await?;
let feed = Feed::read_from(BufReader::new(&body[..])).unwrap();
for entry in feed.entries() {
// urn:docs-rs:hello_exercism:0.2.8
let name = match entry.id().split(':').nth(2) {
Some(name) => name.to_string(),
None => continue,
};
let vers = match entry.id().split(':').nth(3) {
Some(vers) => vers.to_string(),
None => continue,
};
let info = CrateInfo { name, vers };
if let Err(e) = sender.send(info).await {
tracing::info!("{}", e);
}
}
Ok(())
}