Skip to content

Commit

Permalink
Modify the phase processing function of handler to asynchronous mode …
Browse files Browse the repository at this point in the history
…to enhance performance. (#95)
  • Loading branch information
wa5i authored Dec 30, 2024
1 parent da9c8ac commit 46879fe
Show file tree
Hide file tree
Showing 18 changed files with 722 additions and 718 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ctor = "0.2.8"
better_default = "1.0.5"
prometheus-client = "0.22.3"
sysinfo = "0.31.4"
async-trait = "0.1"

# optional dependencies
openssl = { version = "0.10.64", optional = true }
Expand Down
10 changes: 5 additions & 5 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl Core {
Ok(())
}

pub fn handle_request(&self, req: &mut Request) -> Result<Option<Response>, RvError> {
pub async fn handle_request(&self, req: &mut Request) -> Result<Option<Response>, RvError> {
let mut resp = None;
let mut err: Option<RvError> = None;
let handlers = self.handlers.read()?;
Expand All @@ -418,7 +418,7 @@ impl Core {
}

for handler in handlers.iter() {
match handler.pre_route(req) {
match handler.pre_route(req).await {
Ok(res) => {
if res.is_some() {
resp = res;
Expand All @@ -436,7 +436,7 @@ impl Core {

if resp.is_none() && err.is_none() {
for handler in handlers.iter() {
match handler.route(req) {
match handler.route(req).await {
Ok(res) => {
if res.is_some() {
resp = res;
Expand All @@ -454,7 +454,7 @@ impl Core {

if err.is_none() {
for handler in handlers.iter() {
match handler.post_route(req, &mut resp) {
match handler.post_route(req, &mut resp).await {
Ok(_) => {}
Err(error) => {
if error != RvError::ErrHandlerDefault {
Expand All @@ -468,7 +468,7 @@ impl Core {
}

for handler in handlers.iter() {
match handler.log(req, &resp) {
match handler.log(req, &resp).await {
Ok(_) => {}
Err(error) => {
if error != RvError::ErrHandlerDefault {
Expand Down
6 changes: 6 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ pub enum RvError {
#[error("Some rustls_pemfile error happened")]
RustlsPemFileError(rustls_pemfile::Error),

#[error("Some tokio task error happened")]
TokioTaskJoinError {
#[from]
source: tokio::task::JoinError,
},

#[error("Some string utf8 error happened, {:?}", .source)]
StringUtf8Error {
#[from]
Expand Down
16 changes: 10 additions & 6 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,49 @@
use std::sync::{Arc, RwLock};

use async_trait::async_trait;

use crate::{
core::Core,
cli::config::Config,
errors::RvError,
logical::{request::Request, response::Response, Auth},
};

#[async_trait]
pub trait Handler: Send + Sync {
fn name(&self) -> String;

fn post_config(&self, _core: Arc<RwLock<Core>>, _config: Option<&Config>) -> Result<(), RvError> {
Err(RvError::ErrHandlerDefault)
}

fn pre_route(&self, _req: &mut Request) -> Result<Option<Response>, RvError> {
async fn pre_route(&self, _req: &mut Request) -> Result<Option<Response>, RvError> {
Err(RvError::ErrHandlerDefault)
}

fn route(&self, _req: &mut Request) -> Result<Option<Response>, RvError> {
async fn route(&self, _req: &mut Request) -> Result<Option<Response>, RvError> {
Err(RvError::ErrHandlerDefault)
}

fn post_route(&self, _req: &mut Request, _resp: &mut Option<Response>) -> Result<(), RvError> {
async fn post_route(&self, _req: &mut Request, _resp: &mut Option<Response>) -> Result<(), RvError> {
Err(RvError::ErrHandlerDefault)
}

fn log(&self, _req: &Request, _resp: &Option<Response>) -> Result<(), RvError> {
async fn log(&self, _req: &Request, _resp: &Option<Response>) -> Result<(), RvError> {
Err(RvError::ErrHandlerDefault)
}
}

#[async_trait]
pub trait AuthHandler: Send + Sync {
fn name(&self) -> String;

fn pre_auth(&self, _req: &mut Request) -> Result<Option<Auth>, RvError> {
async fn pre_auth(&self, _req: &mut Request) -> Result<Option<Auth>, RvError> {
Err(RvError::ErrHandlerDefault)
}

fn post_auth(&self, _req: &mut Request) -> Result<(), RvError> {
async fn post_auth(&self, _req: &mut Request) -> Result<(), RvError> {
Err(RvError::ErrHandlerDefault)
}
}
2 changes: 1 addition & 1 deletion src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn logical_request_handler(
}
}

match core.read()?.handle_request(&mut r)? {
match core.read()?.handle_request(&mut r).await? {
Some(resp) => response_logical(&resp, &r.path),
None => {
if matches!(r.operation, Operation::Read | Operation::List) {
Expand Down
4 changes: 2 additions & 2 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ pub fn response_json_ok<T: Serialize>(cookie: Option<Cookie>, body: T) -> HttpRe
response_json(StatusCode::OK, cookie, body)
}

pub fn handle_request(core: web::Data<Arc<RwLock<Core>>>, req: &mut Request) -> Result<HttpResponse, RvError> {
pub async fn handle_request(core: web::Data<Arc<RwLock<Core>>>, req: &mut Request) -> Result<HttpResponse, RvError> {
let core = core.read()?;
let resp = core.handle_request(req)?;
let resp = core.handle_request(req).await?;
if resp.is_none() {
Ok(response_ok(None, None))
} else {
Expand Down
14 changes: 7 additions & 7 deletions src/http/sys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ async fn sys_list_mounts_request_handler(
r.path = "sys/mounts".to_string();
r.operation = Operation::Read;

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

async fn sys_mount_request_handler(
Expand All @@ -172,7 +172,7 @@ async fn sys_mount_request_handler(
r.operation = Operation::Write;
r.body = Some(payload);

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

async fn sys_unmount_request_handler(
Expand All @@ -189,7 +189,7 @@ async fn sys_unmount_request_handler(
r.path = "sys/mounts/".to_owned() + mount_path.as_str();
r.operation = Operation::Delete;

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

async fn sys_remount_request_handler(
Expand All @@ -205,7 +205,7 @@ async fn sys_remount_request_handler(
r.operation = Operation::Write;
r.body = Some(payload);

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

async fn sys_list_auth_mounts_request_handler(
Expand All @@ -216,7 +216,7 @@ async fn sys_list_auth_mounts_request_handler(
r.path = "sys/auth".to_string();
r.operation = Operation::Read;

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

async fn sys_auth_enable_request_handler(
Expand All @@ -238,7 +238,7 @@ async fn sys_auth_enable_request_handler(
r.operation = Operation::Write;
r.body = Some(payload);

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

async fn sys_auth_disable_request_handler(
Expand All @@ -255,7 +255,7 @@ async fn sys_auth_disable_request_handler(
r.path = "sys/auth/".to_owned() + mount_path.as_str();
r.operation = Operation::Delete;

handle_request(core, &mut r)
handle_request(core, &mut r).await
}

pub fn init_sys_service(cfg: &mut web::ServiceConfig) {
Expand Down
12 changes: 12 additions & 0 deletions src/logical/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,16 @@ impl Request {
pub fn add_task(&mut self, task: JoinHandle<()>) {
self.tasks.push(task);
}

pub fn clear_task(&mut self) {
self.tasks.clear();
}

pub async fn wait_task_finish(&mut self) -> Result<(), RvError> {
for task in &mut self.tasks {
task.await?;
}

Ok(())
}
}
6 changes: 3 additions & 3 deletions src/modules/auth/expiration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ impl ExpirationManagerInner {
}

let mut req = Request::new_revoke_request(&le.path, secret, data);
let ret = self.router.as_ref().unwrap().as_handler().route(&mut req);
let ret = self.router.as_ref().unwrap().handle_request(&mut req);
if ret.is_err() {
log::error!("failed to revoke entry: {:?}, err: {}", le, ret.unwrap_err());
}
Expand Down Expand Up @@ -563,7 +563,7 @@ impl ExpirationManagerInner {
}

let mut req = Request::new_renew_request(&le.path, secret, data);
let ret = self.router.as_ref().unwrap().as_handler().route(&mut req);
let ret = self.router.as_ref().unwrap().handle_request(&mut req);
if ret.is_err() {
log::error!("failed to renew entry: {}", ret.as_ref().unwrap_err());
}
Expand All @@ -586,7 +586,7 @@ impl ExpirationManagerInner {
}

let mut req = Request::new_renew_auth_request(&le.path, auth, None);
let ret = self.router.as_ref().unwrap().as_handler().route(&mut req);
let ret = self.router.as_ref().unwrap().handle_request(&mut req);
if ret.is_err() {
log::error!("failed to renew_auth entry: {}", ret.as_ref().unwrap_err());
}
Expand Down
16 changes: 11 additions & 5 deletions src/modules/auth/token_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
time::Duration,
};

use async_trait::async_trait;
use derive_more::Deref;
use humantime::parse_duration;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -592,22 +593,27 @@ impl TokenStoreInner {
}
}

#[async_trait]
impl Handler for TokenStore {
fn name(&self) -> String {
"auth_token".to_string()
}

fn pre_route(&self, req: &mut Request) -> Result<Option<Response>, RvError> {
async fn pre_route(&self, req: &mut Request) -> Result<Option<Response>, RvError> {
let is_unauth_path = self.router.is_unauth_path(&req.path)?;
if is_unauth_path {
return Ok(None);
}

let mut auth: Option<Auth> = None;

let auth_handlers = self.auth_handlers.read()?;
let auth_handlers: Vec<_> = {
let handlers = self.auth_handlers.read()?;
handlers.iter().cloned().collect::<Vec<_>>()
};

for auth_handler in auth_handlers.iter() {
if let Some(ret) = auth_handler.pre_auth(req)? {
if let Some(ret) = auth_handler.pre_auth(req).await? {
auth = Some(ret);
break;
}
Expand All @@ -625,13 +631,13 @@ impl Handler for TokenStore {
req.auth = auth;

for auth_handler in auth_handlers.iter() {
auth_handler.post_auth(req)?;
auth_handler.post_auth(req).await?;
}

Ok(None)
}

fn post_route(&self, req: &mut Request, resp: &mut Option<Response>) -> Result<(), RvError> {
async fn post_route(&self, req: &mut Request, resp: &mut Option<Response>) -> Result<(), RvError> {
if resp.is_none() {
return Ok(());
}
Expand Down
Loading

0 comments on commit 46879fe

Please sign in to comment.