-
Notifications
You must be signed in to change notification settings - Fork 0
/
init.rs
156 lines (127 loc) · 4.53 KB
/
init.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use crate::config::CONFIG;
use color_eyre::eyre::Result;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use meilisearch_sdk::{Client, Settings};
use tokio::fs;
use tokio::io::{AsyncBufReadExt, BufReader};
use walkdir::WalkDir;
use crate::{chunk_info_to_meili, retrieve_crate_toml, CrateInfo};
pub async fn init(client: Arc<Client>) -> Result<()> {
let (infos_sender, infos_receiver) = mpsc::channel(10_000);
let (cinfos_sender, cinfos_receiver) = mpsc::channel(10_000);
init_index(&client).await?;
init_settings(&client).await?;
let retrieve_handler = tokio::spawn(crates_infos(infos_sender, "crates.io-index/"));
let publish_handler = tokio::spawn(chunk_info_to_meili(client.clone(), cinfos_receiver));
let retrieve_toml = StreamExt::zip(infos_receiver, stream::repeat(cinfos_sender))
.for_each_concurrent(Some(250), |(info, mut sender)| async move {
let _ = tokio::spawn(async move {
match retrieve_crate_toml(&info).await {
Ok(cinfo) => match sender.send(cinfo).await {
Ok(_) => (),
Err(e) => tracing::error!("failed to send crate info: {:#?}", e),
},
Err(e) => tracing::error!("{:?} {}", info, e),
}
})
.await;
});
retrieve_toml.await;
retrieve_handler.await??;
publish_handler.await??;
Ok(())
}
/// Parses the last line of the file behind `entry` as a `CrateInfo`.
///
/// Returns `Ok(None)` when `entry` is not a regular file, when the file has no lines, or when
/// its last line is not valid JSON for a `CrateInfo`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a line cannot be read.
#[tracing::instrument]
async fn process_file(entry: walkdir::DirEntry) -> Result<Option<CrateInfo>> {
    // Anything that is not a regular file is skipped without touching the filesystem.
    if !entry.file_type().is_file() {
        return Ok(None);
    }

    let reader = BufReader::new(fs::File::open(entry.path()).await?);
    let mut lines = reader.lines();

    // Only the final line matters, so each line read simply replaces the previous one.
    let mut final_line = None;
    while let Some(line) = lines.next_line().await? {
        final_line = Some(line);
    }

    // A parse failure is treated the same way as an empty file: there is nothing to report.
    Ok(final_line.and_then(|line| serde_json::from_str::<CrateInfo>(&line).ok()))
}
#[tracing::instrument(skip_all)]
async fn crates_infos<P: AsRef<Path>>(
mut sender: mpsc::Sender<CrateInfo>,
crates_io_index: P,
) -> Result<()> {
let walkdir = WalkDir::new(crates_io_index).contents_first(true);
let count = Arc::new(AtomicUsize::new(0));
for result in walkdir {
let entry = match result {
Ok(entry) => entry,
Err(e) => {
tracing::error!("{}", e);
continue;
}
};
let count = count.clone();
let mut sender = sender.clone();
tokio::spawn(async move {
match process_file(entry).await {
Ok(Some(info)) => {
let i = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
tracing::debug!(
name = info.name,
version = info.vers,
count = i + 1,
"sending crate"
);
if let Err(e) = sender.send(info).await {
tracing::error!("failed to send crate info: {:#?}", e);
}
}
Ok(None) => (),
Err(e) => tracing::warn!("error when processing file: {}", e),
}
});
}
sender.flush().await?;
Ok(())
}
/// Requests creation of the index named by `CONFIG.meili_index_uid`, passing `"name"` as the
/// second argument (presumably the primary key), waits for the resulting task and logs its
/// outcome at info level.
///
/// # Errors
///
/// Returns an error if the creation request or waiting for the task fails.
#[tracing::instrument(skip_all)]
async fn init_index(client: &Client) -> Result<()> {
    let uid = CONFIG.meili_index_uid.clone();
    let creation = client.create_index(uid, Some("name")).await?;

    // `None, None` leaves the polling interval and timeout to the client.
    let res = client.wait_for_task(creation, None, None).await?;
    tracing::info!("{res:#?}");

    Ok(())
}
/// Applies the searchable and sortable attribute settings to the index named by
/// `CONFIG.meili_index_uid`, waits for the resulting task and logs its outcome at info level.
///
/// Every setting not listed here is left at `Settings::default()`.
///
/// # Errors
///
/// Returns an error if submitting the settings or waiting for the task fails.
#[tracing::instrument(skip_all)]
async fn init_settings(client: &Client) -> Result<()> {
    // Listed in the same order as they are handed to Meilisearch.
    let searchable = ["name", "description", "keywords", "categories", "readme"]
        .iter()
        .map(|attr| attr.to_string())
        .collect::<Vec<String>>();
    let sortable = vec![String::from("downloads")];

    let settings = Settings {
        searchable_attributes: Some(searchable),
        sortable_attributes: Some(sortable),
        ..Default::default()
    };

    let target = client.index(CONFIG.meili_index_uid.clone());
    let update = target.set_settings(&settings).await?;

    // `None, None` leaves the polling interval and timeout to the client.
    let res = client.wait_for_task(update, None, None).await?;
    tracing::info!("{res:#?}");

    Ok(())
}