diff --git a/fuso-api/src/core.rs b/fuso-api/src/core.rs index 17b9ff1..9126c58 100644 --- a/fuso-api/src/core.rs +++ b/fuso-api/src/core.rs @@ -349,6 +349,14 @@ impl Rollback> { Ok(()) } } + + #[inline] + pub async fn release(&self) -> Result<()> { + *self.rollback.write().unwrap() = false; + self.store.lock().unwrap().clear(); + + Ok(()) + } } impl Rollback> { diff --git a/fuso-api/src/stream.rs b/fuso-api/src/stream.rs index 51dd7fe..cec0ced 100644 --- a/fuso-api/src/stream.rs +++ b/fuso-api/src/stream.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, pin::Pin, sync::Arc}; +use std::{net::SocketAddr, pin::Pin}; use futures::{AsyncRead, AsyncWrite}; use smol::net::TcpStream; @@ -6,113 +6,114 @@ use smol::net::TcpStream; use crate::{Buffer, Rollback, RollbackEx, UdpStream}; #[derive(Debug, Clone)] -pub struct FusoStream { - inner: Arc>>>, +pub struct SafeStream { + inner: Rollback>, } -pub trait FusoStreamEx { - fn as_fuso_stream(self) -> FusoStream; + +pub trait SafeStreamEx { + fn as_safe_stream(self) -> SafeStream; } -impl FusoStreamEx for TcpStream { +impl SafeStreamEx for TcpStream { #[inline] - fn as_fuso_stream(self) -> FusoStream { - FusoStream { - inner: Arc::new(std::sync::Mutex::new(self.roll())), - } + fn as_safe_stream(self) -> SafeStream { + SafeStream { inner: self.roll() } } } -impl FusoStreamEx for UdpStream { +impl SafeStreamEx for UdpStream { #[inline] - fn as_fuso_stream(self) -> FusoStream { - FusoStream { - inner: Arc::new(std::sync::Mutex::new(self.roll())), - } + fn as_safe_stream(self) -> SafeStream { + SafeStream { inner: self.roll() } } } -impl FusoStream { +impl SafeStream { #[inline] pub fn local_addr(&self) -> std::io::Result { - self.inner.lock().unwrap().local_addr() + self.inner.local_addr() } #[inline] pub fn peer_addr(&self) -> std::io::Result { - self.inner.lock().unwrap().peer_addr() + self.inner.peer_addr() } } -impl FusoStream { +impl SafeStream { #[inline] pub fn local_addr(&self) -> std::io::Result { - self.inner.lock().unwrap().local_addr() + self.inner.local_addr() } #[inline] pub fn peer_addr(&self) -> std::io::Result { - self.inner.lock().unwrap().peer_addr() + self.inner.peer_addr() } } -impl FusoStream { +impl SafeStream +where + Inner: Send + Sync + 'static, +{ #[inline] pub async fn begin(&self) -> crate::Result<()> { - self.inner.lock().unwrap().begin().await + self.inner.begin().await } #[inline] pub async fn back(&self) -> crate::Result<()> { - self.inner.lock().unwrap().back().await + self.inner.back().await + } + + #[inline] + pub async fn release(&self) -> crate::Result<()>{ + self.inner.release().await } + } -impl AsyncRead for FusoStream +impl AsyncRead for SafeStream where - Inner: AsyncRead + Unpin + Send + Sync + 'static, + Inner: Clone + AsyncRead + Unpin + Send + Sync + 'static, { #[inline] fn poll_read( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { - let mut io = self.inner.lock().unwrap(); - Pin::new(&mut *io).poll_read(cx, buf) + Pin::new(&mut self.inner).poll_read(cx, buf) } } - -impl AsyncWrite for FusoStream +impl AsyncWrite for SafeStream where - Inner: AsyncWrite + Unpin + Send + Sync + 'static, + Inner: Clone + AsyncWrite + Unpin + Send + Sync + 'static, { #[inline] fn poll_write( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - let mut io = self.inner.lock().unwrap(); - Pin::new(&mut *io).poll_write(cx, buf) + Pin::new(&mut self.inner).poll_write(cx, buf) } #[inline] fn poll_flush( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.inner.lock().unwrap(); - Pin::new(&mut *io).poll_flush(cx) + Pin::new(&mut self.inner).poll_flush(cx) } #[inline] fn poll_close( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.inner.lock().unwrap(); - Pin::new(&mut *io).poll_close(cx) + Pin::new(&mut self.inner).poll_close(cx) } } diff --git a/fuso-core/src/client.rs b/fuso-core/src/client.rs index 97c2c4a..7705ae4 100644 --- a/fuso-core/src/client.rs +++ b/fuso-core/src/client.rs @@ -1,6 +1,11 @@ -use std::net::SocketAddr; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, +}; -use fuso_api::{async_trait, Error, Forward, FusoListener, FusoPacket, Result, Spwan}; +use fuso_api::{ + async_trait, AsyncTcpSocketEx, Error, Forward, FusoListener, FusoPacket, Result, Spwan, +}; use futures::AsyncWriteExt; use smol::{ @@ -23,21 +28,37 @@ pub struct Fuso { accept_ax: Receiver, } +pub struct Config { + // 名称, 可选的 + pub name: Option, + // 服务端地址 + pub server_addr: SocketAddr, + // 自定义服务绑定的端口, 如果不指定默认为随机分配 + pub server_bind_port: u16, + // 桥接监听的地址 + pub bridge_addr: Option, +} + impl Fuso { - pub async fn bind(addr: SocketAddr, bind_port: u16, bridge_bind: u16) -> Result { - // let stream = addr.tcp_connect().await?; + pub async fn bind(config: Config) -> Result { + let cfg = Arc::new(config); + let server_addr = cfg.server_addr.clone(); + let server_bind_port = cfg.server_bind_port.clone(); + let bridge_addr = cfg.bridge_addr.clone(); + let name = cfg.name.clone(); - let mut stream = TcpStream::connect(addr) - .await - .map_err(|e| Error::with_io(e))?; + let mut stream = server_addr.tcp_connect().await?; stream .send( - Action::Bind({ - if bind_port == 0 { + Action::Bind(name, { + if cfg.server_bind_port == 0 { None } else { - Some(format!("0.0.0.0:{}", bind_port).parse().unwrap()) + Some(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + server_bind_port, + )) } }) .into(), @@ -54,13 +75,13 @@ impl Fuso { let mut stream = stream.guard(5000).await?; async move { - if bridge_bind == 0 { + if bridge_addr.is_none() { return; } - match Bridge::bind(format!("0.0.0.0:{}", bridge_bind).parse().unwrap(), addr) - .await - { + let bridge_addr = bridge_addr.unwrap(); + + match Bridge::bind(bridge_addr, server_addr).await { Ok(mut bridge) => { log::info!("[bridge] Bridge service opened successfully"); loop { @@ -99,7 +120,14 @@ impl Fuso { Ok(Action::Ping) => {} Ok(action) => { - match accept_tx.send(Reactor { conv, action, addr }).await { + match accept_tx + .send(Reactor { + conv, + action, + addr: server_addr, + }) + .await + { Err(_) => { let _ = stream.close().await; break; diff --git a/fuso-core/src/core.rs b/fuso-core/src/core.rs index 9ae761e..71dceb5 100644 --- a/fuso-core/src/core.rs +++ b/fuso-core/src/core.rs @@ -13,7 +13,10 @@ use smol::{ net::TcpStream, }; -use fuso_api::{AsyncTcpSocketEx, FusoPacket, Result, Spwan}; +#[allow(unused)] +use crate::ciphe::{Security, Xor}; + +use fuso_api::{AsyncTcpSocketEx, FusoPacket, Result, SafeStreamEx, Spwan}; use crate::retain::Heartbeat; use crate::{dispatch::DynHandler, packet::Action}; @@ -41,10 +44,10 @@ pub struct FusoStream { pub struct Channel { pub conv: u64, - pub core: HeartGuard, + pub core: HeartGuard>, pub config: Arc, pub strategys: Arc, Action>>>>>, - pub wait_queue: Arc>>, + pub wait_queue: Arc>>>, } #[derive(Clone)] @@ -53,8 +56,8 @@ pub struct Context { pub config: Arc, pub handlers: Arc, ()>>>>>, pub strategys: Arc, Action>>>>>, - pub sessions: Arc>>>, - pub accept_ax: Sender>, + pub sessions: Arc>>>>, + pub accept_ax: Sender>>, } pub struct Fuso { @@ -110,7 +113,9 @@ impl FusoBuilder> { self } - pub async fn build(self) -> fuso_api::Result>> { + pub async fn build( + self, + ) -> fuso_api::Result>>> { let config = Arc::new(self.config.unwrap()); let (accept_ax, accept_tx) = unbounded(); @@ -132,34 +137,33 @@ impl FusoBuilder> { async move { log::info!("Service started successfully"); - log::info!("Bound to {}", bind_addr); + log::info!("Bind to {}", bind_addr); log::info!("Waiting to connect"); let _ = listen .incoming() - .try_fold(cx, |cx, mut stream| async move { - log::debug!("[tcp] accept {}", stream.local_addr().unwrap()); + .try_fold(cx, |cx, mut tcp| async move { + log::debug!("[tcp] accept {}", tcp.local_addr().unwrap()); { let handlers = cx.handlers.clone(); let cx = cx.clone(); async move { - match stream.clone().dispatch(handlers, cx).await { - Ok(_) => { - log::debug!( - "[tcp] Successfully processed {}", - stream.local_addr().unwrap() - ); - } - Err(_) => { - let _ = stream.close().await; - - log::warn!( - "[tcp] An illegal connection {}", - stream.peer_addr().unwrap() - ); - } - }; + let process = tcp.clone().dispatch(handlers, cx).await; + + if process.is_err() { + let _ = tcp.close().await; + + log::warn!( + "[tcp] An illegal connection {}", + tcp.peer_addr().unwrap() + ); + } else { + log::debug!( + "[tcp] Successfully processed {}", + tcp.local_addr().unwrap() + ); + } } .detach(); } @@ -204,7 +208,7 @@ where impl Channel { #[inline] - pub async fn try_wake(&self) -> Result { + pub async fn try_wake(&self) -> Result> { match self.wait_queue.lock().await.pop_front() { Some(tcp) => Ok(tcp), None => Err("No task operation required".into()), @@ -212,7 +216,7 @@ impl Channel { } #[inline] - pub async fn suspend(&self, tcp: TcpStream) -> Result<()> { + pub async fn suspend(&self, tcp: fuso_api::SafeStream) -> Result<()> { self.wait_queue.lock().await.push_back(tcp); Ok(()) } @@ -220,7 +224,7 @@ impl Channel { impl Context { #[inline] - pub async fn fork(&self) -> (u64, Receiver) { + pub async fn fork(&self) -> (u64, Receiver>) { let (accept_tx, accept_ax) = unbounded(); let mut sessions = self.sessions.write().await; @@ -245,7 +249,7 @@ impl Context { } #[inline] - pub async fn route(&self, conv: u64, tcp: TcpStream) -> Result<()> { + pub async fn route(&self, conv: u64, tcp: fuso_api::SafeStream) -> Result<()> { let sessions = self.sessions.read().await; if let Some(accept_tx) = sessions.get(&conv) { @@ -259,7 +263,12 @@ impl Context { } } - pub async fn spwan(&self, tcp: TcpStream, addr: Option) -> Result { + pub async fn spwan( + &self, + tcp: fuso_api::SafeStream, + addr: Option, + name: Option, + ) -> Result { let (conv, accept_ax) = self.fork().await; let accept_tx = self.accept_ax.clone(); let clinet_addr = tcp.local_addr().unwrap(); @@ -278,8 +287,11 @@ impl Context { wait_queue: Arc::new(Mutex::new(VecDeque::new())), }); + let name = name.unwrap_or("anonymous".to_string()); + log::info!( - "New mapping {} -> {}", + "New mapping [{}] {} -> {}", + name, listen.local_addr().unwrap(), clinet_addr ); @@ -289,6 +301,7 @@ impl Context { let channel = channel.clone(); async move { loop { + // 接收客户端发来的连接 let from = accept_ax.recv().await; if from.is_err() { @@ -297,80 +310,98 @@ impl Context { } let mut from = from.unwrap(); - match channel.try_wake().await { - Ok(to) => { - if let Err(e) = accept_tx.send(FusoStream { from, to }).await { - log::error!("An unavoidable error occurred {}", &e); - break; - } - } - Err(e) => { - let _ = from.close().await; - log::warn!("{}", e); - } + + let to = channel.try_wake().await; + + if to.is_err() { + let _ = from.close().await; + log::warn!("{}", to.unwrap_err()); + continue; + } + + let to = to.unwrap(); + + log::info!( + "{} -> {}", + from.peer_addr().unwrap(), + to.peer_addr().unwrap() + ); + + // 成功建立映射, 发给下一端处理 + let err = accept_tx.send(FusoStream::new(from, to)).await; + + if err.is_err() { + log::error!("An unavoidable error occurred {}", err.unwrap_err()); + break; } } } }; let fuso_future = { - let mut io = channel.core.clone(); + let mut core = channel.core.clone(); + // 服务端与客户端加密, + // let mut core = core.ciphe(Xor::new(10)).await; + async move { loop { - match io.recv().await { - Ok(packet) => { - log::trace!("Client message received {:?}", packet); - } - Err(_) => { - log::warn!("Client session was aborted"); - break; - } + // 接收客户端发来的包 + let packet = core.recv().await; + + if packet.is_err() { + log::warn!("Client session was aborted {}", name); + break; } + + // 暂时不处理该包, 以后可能会用到 + let ignore_packet = packet.unwrap(); + + log::trace!("Client message received {:?}", ignore_packet); } } }; - let future_listen = { - async move { - let _ = listen - .incoming() - .try_fold(channel, |channel, mut tcp| async move { - { - log::debug!("connected {}", tcp.peer_addr().unwrap()); - - let channel = channel.clone(); - async move { - let mut core = channel.core.clone(); - - let action = { - tcp.clone() - .select(channel.strategys.clone(), channel.clone()) - .await - }; - - match action { - Ok(action) => { - log::debug!("action {:?}", action); - let _ = core.send(action.into()).await; - let _ = channel.suspend(tcp).await; - } - Err(e) => { - let _ = tcp.close().await; - log::warn!( - "Unable to process connection {} {}", - e, - tcp.peer_addr().unwrap() - ); - } - } + let future_listen = async move { + let _ = listen + .incoming() + .try_fold(channel, |channel, tcp| async move { + { + log::debug!("connected {}", tcp.peer_addr().unwrap()); + let channel = channel.clone(); + async move { + let mut tcp = tcp.as_safe_stream(); + let mut core = channel.core.clone(); + + let action = { + let strategys = channel.strategys.clone(); + let tcp = tcp.clone(); + // 选择一个合适的策略 + tcp.select(strategys, channel.clone()).await + }; + + if action.is_err() { + let _ = tcp.close().await; + log::warn!( + "Unable to process connection {} {}", + action.unwrap_err(), + tcp.peer_addr().unwrap() + ); + } else { + let action = action.unwrap(); + log::debug!("action {:?}", action); + // 通知客户端需执行的方法 + let _ = core.send(action.into()).await; + // 暂时休眠当前这个连接, 该连接可能会超时, + // 并且连接数达到一定数量时可能导致连接积累过多导致无法在建立连接,也就是fd用尽 + let _ = channel.suspend(tcp).await; } - .detach(); } + .detach(); + } - Ok(channel) - }) - .await; - } + Ok(channel) + }) + .await; }; accept_future.race(fuso_future.race(future_listen)).await @@ -380,3 +411,15 @@ impl Context { Ok(conv) } } + +impl FusoStream { + #[inline] + pub fn new(from: T, to: T) -> Self { + Self { from, to } + } + + #[inline] + pub fn split(self) -> (T, T) { + (self.from, self.to) + } +} diff --git a/fuso-core/src/dispatch.rs b/fuso-core/src/dispatch.rs index 0051cbb..de51fd6 100644 --- a/fuso-core/src/dispatch.rs +++ b/fuso-core/src/dispatch.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; -use fuso_api::{Buffer, Rollback, RollbackEx}; +use fuso_api::SafeStreamEx; use futures::AsyncWriteExt; use smol::net::TcpStream; @@ -28,7 +28,8 @@ pub trait StrategyEx { async fn select(self, strategys: S, cx: C) -> fuso_api::Result; } -pub type TcpStreamRollback = Rollback>; +// pub type TcpStreamRollback = Rollback>; +pub type TcpStreamRollback = fuso_api::SafeStream; pub type DynHandler = dyn Handler + Send + Sync + 'static; @@ -43,17 +44,16 @@ where handlers: Arc>>>>, cx: C, ) -> fuso_api::Result<()> { - let mut io = self.roll(); - - io.begin().await?; + let mut io = self.as_safe_stream(); for handle in handlers.iter() { let handle = handle.clone(); match handle.dispose(io.clone(), cx.clone()).await? { - State::Accept(()) => return Ok(()), + State::Accept(()) => { + return Ok(()); + } State::Next => { - io.back().await?; log::debug!("[dispatch] Next handler, rollback"); } } @@ -70,9 +70,9 @@ where } } - #[async_trait] -impl StrategyEx>>>>, C> for TcpStream +impl StrategyEx>>>>, C> + for fuso_api::SafeStream where C: Clone + Send + Sync + 'static, { @@ -82,7 +82,8 @@ where strategys: Arc>>>>, cx: C, ) -> fuso_api::Result { - let mut io = self.roll(); + // let mut io = self.roll(); + let mut io = self; io.begin().await?; @@ -90,7 +91,9 @@ where let handle = handle.clone(); match handle.dispose(io.clone(), cx.clone()).await? { - State::Accept(action) => return Ok(action), + State::Accept(action) => { + return Ok(action); + } State::Next => { io.back().await?; log::debug!("[dispatch] Next handler, rollback"); diff --git a/fuso-core/src/handler.rs b/fuso-core/src/handler.rs index 1035b82..5af8755 100644 --- a/fuso-core/src/handler.rs +++ b/fuso-core/src/handler.rs @@ -50,7 +50,6 @@ where } } - #[test] fn test_hander() { use crate::dispatch::Handler; diff --git a/fuso-core/src/lib.rs b/fuso-core/src/lib.rs index 9ccb2fe..2b32ec5 100644 --- a/fuso-core/src/lib.rs +++ b/fuso-core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod bridge; pub mod ciphe; pub mod client; pub mod cmd; @@ -7,7 +8,6 @@ pub mod handler; pub mod packet; pub mod retain; pub mod server; -pub mod bridge; use std::sync::Arc; @@ -72,7 +72,10 @@ mod tests { fn test_packet() { init_logger(); - let action = Action::Bind(Some("127.0.0.1:8080".parse().unwrap())); + let action = Action::Bind( + Some("hello world".into()), + Some("127.0.0.1:80".parse().unwrap()), + ); let packet: Packet = action.into(); @@ -98,10 +101,17 @@ mod tests { .with_chain(|chain| { chain.next(|mut tcp, cx| async move { let action: Action = tcp.recv().await?.try_into()?; + match action { - Action::Bind(addr) => { - let conv = cx.spwan(tcp.into(), addr).await?; - log::debug!("[fuso] accept {}", conv); + Action::Bind(name, addr) => { + let client_addr = tcp.peer_addr().unwrap(); + let conv = cx.spwan(tcp, addr, name).await?; + log::debug!( + "[fuso] accept conv={}, addr={}, name={}", + conv, + client_addr, + conv + ); Ok(State::Accept(())) } Action::Connect(conv) => { diff --git a/fuso-core/src/packet.rs b/fuso-core/src/packet.rs index bc23e8e..bab97b4 100644 --- a/fuso-core/src/packet.rs +++ b/fuso-core/src/packet.rs @@ -23,7 +23,7 @@ pub enum Addr { #[derive(Debug, Clone, PartialEq, Eq)] pub enum Action { Ping, - Bind(Option), + Bind(Option, Option), Reset(Addr), Accept(u64), Forward(Addr), @@ -112,21 +112,42 @@ impl TryFrom for Action { fn try_from(mut packet: fuso_api::Packet) -> Result { match packet.get_cmd() { CMD_PING => Ok(Action::Ping), - CMD_BIND if packet.get_len().eq(&0) => Ok(Action::Bind(None)), - CMD_BIND => { - let addr: Addr = packet.get_data().as_ref().try_into()?; - Ok(Action::Bind(Some({ - match addr { - Addr::Socket(addr) => addr, - Addr::Domain(_, _) => { - return Err(fuso_api::Error::with_io(std::io::Error::new( - std::io::ErrorKind::Other, - "Does not support domain bind", - ))) - } + CMD_BIND if packet.get_len().eq(&0) => Ok(Action::Bind(None, None)), + CMD_BIND => Ok(Action::Bind( + { + let data = packet.get_mut_data(); + + let len = data.get_u8(); + + if len > 0 && data.len() >= len as usize { + let name = Some(String::from_utf8_lossy(&data[..len as usize]).to_string()); + data.advance(len as usize); + name + } else { + None } - }))) - } + }, + { + let data = packet.get_data(); + + if data.len() > 0 { + let addr: Addr = packet.get_data().as_ref().try_into()?; + Some({ + match addr { + Addr::Socket(addr) => addr, + Addr::Domain(_, _) => { + return Err(fuso_api::Error::with_io(std::io::Error::new( + std::io::ErrorKind::Other, + "Does not support domain bind", + ))) + } + } + }) + } else { + None + } + }, + )), CMD_ACCEPT if packet.get_len().ge(&8) => { Ok(Action::Accept(packet.get_mut_data().get_u64())) } @@ -149,8 +170,28 @@ impl From for fuso_api::Packet { fn from(packet: Action) -> Self { match packet { Action::Ping => Packet::new(CMD_PING, Bytes::new()), - Action::Bind(Some(addr)) => Packet::new(CMD_BIND, Addr::Socket(addr).to_bytes()), - Action::Bind(None) => Packet::new(CMD_BIND, Bytes::new()), + Action::Bind(Some(name), Some(addr)) => Packet::new(CMD_BIND, { + let mut buf = BytesMut::new(); + let name = name.as_bytes(); + buf.put_u8(name.len() as u8); + buf.put_slice(name); + buf.put_slice(&Addr::Socket(addr).to_bytes()); + buf.into() + }), + Action::Bind(Some(name), None) => Packet::new(CMD_BIND, { + let mut buf = BytesMut::new(); + let name = name.as_bytes(); + buf.put_u8(name.len() as u8); + buf.put_slice(name); + buf.into() + }), + Action::Bind(None, Some(addr)) => Packet::new(CMD_BIND, { + let mut buf = BytesMut::new(); + buf.put_u8(0); + buf.put_slice(&Addr::Socket(addr).to_bytes()); + buf.into() + }), + Action::Bind(None, None) => Packet::new(CMD_BIND, Bytes::new()), Action::Reset(addr) => Packet::new(CMD_CONNECT, addr.to_bytes()), Action::Accept(conv) => Packet::new(CMD_ACCEPT, { let mut buf = BytesMut::new(); diff --git a/src/client.rs b/src/client.rs index aaed7e8..39dee2e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -115,37 +115,52 @@ fn main() { let app = App::new("fuso") .version("v1.0.3") .author("https://github.com/editso/fuso") - .arg(Arg::new("server-host").default_value("127.0.0.1")) - .arg(Arg::new("server-port").default_value("9003")) + .arg( + Arg::new("server-host") + .default_value("127.0.0.1") + .about("服务端监听的地址"), + ) + .arg( + Arg::new("server-port") + .default_value("9003") + .about("服务端监听的端口"), + ) .arg( Arg::new("forward-host") .short('h') .long("host") .default_value("127.0.0.1") + .display_order(1) + .about("转发地址, (如果开启了socks代理该参数将无效)"), ) .arg( Arg::new("forward-port") .short('p') .long("port") - .default_value("80"), + .default_value("80") + .display_order(2) + .about("转发的端口 (如果开启了socks代理该参数将无效)"), ) .arg( Arg::new("forward-type") .short('t') .long("type") .possible_values(["socks", "forward"]) - .default_value("forward"), + .default_value("forward") + .display_order(3) + .about("转发类型"), ) .arg( - Arg::new("service-bind") + Arg::new("service-bind-port") .short('b') .long("bind") .validator(|port| { port.parse::() .map_or(Err(format!("Invalid port {}", port)), |_| Ok(())) }) - .about("Specify the address to be monitored by the server, which will be randomly assigned by default") - .default_value("0"), + .display_order(5) + .takes_value(true) + .about("真实映射成功后访问的端口号, 不指定将自动分配"), ) .arg( Arg::new("xor-secret") @@ -155,17 +170,44 @@ fn main() { .validator(|num| { num.parse::() .map_or(Err(String::from("Invalid number 0-255")), |_| Ok(())) - }), + }) + .display_order(4) + .about( + "传输时使用异或加密的Key (Use exclusive OR encrypted key during transmission)", + ), + ) + .arg( + Arg::new("bridge-bind-host") + .long("bridge-host") + .takes_value(true) + .display_order(5) + .about("桥接服务监听的地址"), + ) + .arg( + Arg::new("bridge-bind-port") + .long("bridge-port") + .takes_value(true) + .validator(|port| { + port.parse::() + .map_or(Err(format!("Invalid port {}", port)), |_| Ok(())) + }) + .display_order(6) + .about("桥接服务监听的端口"), + ) + .arg( + Arg::new("name") + .long("name") + .short('n') + .takes_value(true) + .display_order(7) + .about("自定义当前映射服务的名称"), ) - .arg(Arg::new("bridge-bind").long("bridge").default_value("0").validator(|port| { - port.parse::() - .map_or(Err(format!("Invalid port {}", port)), |_| Ok(())) - })) .arg( Arg::new("log") .short('l') .possible_values(["debug", "info", "trace", "error"]) - .default_value("info"), + .default_value("info") + .about("日志级别"), ); let matches = app.get_matches(); @@ -180,18 +222,20 @@ fn main() { matches.value_of("forward-port").unwrap(), ); - let service_bind: u16 = matches.value_of("service-bind").unwrap().parse().unwrap(); - - let forward_type = matches.value_of("forward-type").unwrap(); + let name = matches + .value_of("name") + .map_or_else(|| None, |name| Some(name.to_string())); - let bridge_bind: u16 = matches - .value_of("bridge-bind") + let service_bind_port: u16 = matches + .value_of("service-bind-port") .unwrap_or("0") .parse() .unwrap(); + let forward_type = matches.value_of("forward-type").unwrap(); + if server_addr.is_err() { - println!("Parameter error: {}", server_addr.unwrap_err()); + println!("Server address error: {}", server_addr.unwrap_err()); exit(1); } @@ -221,17 +265,53 @@ fn main() { .filter_module("fuso_socks", log::LevelFilter::Info) .init(); + let bridge_addr = { + let bridge_bind_host = matches.value_of("bridge-bind-host").unwrap_or("0.0.0.0"); + let bridge_bind_port = matches.value_of("bridge-bind-port"); + + if bridge_bind_port.is_none() { + None + } else { + parse_addr(bridge_bind_host, bridge_bind_port.unwrap()).map_or_else( + |s| { + log::warn!("Incorrect bridge parameters: {}", s); + None + }, + |addr| Some(addr), + ) + } + }; + log::info!( - "[fuc] server_addr={}, forward_type={}, forward_addr={}, xor_num={}", + "\nserver_addr={}\nforward_type={}\n{}\nxor_num={}\n{}\n", server_addr, forward_type, - forward_addr, - xor_num + { + match proxy { + Proxy::Port(_) => format!("forward_addr={}", forward_addr), + Proxy::Socks5 => "--".to_string(), + } + }, + xor_num, + { + if bridge_addr.is_some() { + format!("bridge_addr={}", bridge_addr.clone().unwrap()) + } else { + "--".to_string() + } + } ); smol::block_on(async move { loop { - match Fuso::bind(server_addr, service_bind, bridge_bind).await { + match Fuso::bind(fuso_core::client::Config { + name: name.clone(), + server_addr, + server_bind_port: service_bind_port, + bridge_addr: bridge_addr, + }) + .await + { Ok(fuso) => { log::info!("[fuc] connection succeeded"); let _ = poll_stream(fuso, proxy.clone(), Xor::new(xor_num)).await; diff --git a/src/no-log-client.rs b/src/no-log-client.rs index f279e45..fa1b9c1 100644 --- a/src/no-log-client.rs +++ b/src/no-log-client.rs @@ -115,37 +115,52 @@ fn main() { let app = App::new("fuso") .version("v1.0.3") .author("https://github.com/editso/fuso") - .arg(Arg::new("server-host").default_value("127.0.0.1")) - .arg(Arg::new("server-port").default_value("9003")) + .arg( + Arg::new("server-host") + .default_value("127.0.0.1") + .about("服务端监听的地址"), + ) + .arg( + Arg::new("server-port") + .default_value("9003") + .about("服务端监听的端口"), + ) .arg( Arg::new("forward-host") .short('h') .long("host") - .default_value("127.0.0.1"), + .default_value("127.0.0.1") + .display_order(1) + .about("转发地址, (如果开启了socks代理该参数将无效)"), ) .arg( Arg::new("forward-port") .short('p') .long("port") - .default_value("80"), + .default_value("80") + .display_order(2) + .about("转发的端口 (如果开启了socks代理该参数将无效)"), ) .arg( Arg::new("forward-type") .short('t') .long("type") .possible_values(["socks", "forward"]) - .default_value("forward"), + .default_value("forward") + .display_order(3) + .about("转发类型"), ) .arg( - Arg::new("service-bind") + Arg::new("service-bind-port") .short('b') .long("bind") .validator(|port| { port.parse::() .map_or(Err(format!("Invalid port {}", port)), |_| Ok(())) }) - .about("Specify the address to be monitored by the server, which will be randomly assigned by default") - .default_value("0"), + .display_order(5) + .takes_value(true) + .about("真实映射成功后访问的端口号, 不指定将自动分配"), ) .arg( Arg::new("xor-secret") @@ -155,12 +170,38 @@ fn main() { .validator(|num| { num.parse::() .map_or(Err(String::from("Invalid number 0-255")), |_| Ok(())) - }), + }) + .display_order(4) + .about( + "传输时使用异或加密的Key (Use exclusive OR encrypted key during transmission)", + ), + ) + .arg( + Arg::new("bridge-bind-host") + .long("bridge-host") + .takes_value(true) + .display_order(5) + .about("桥接服务监听的地址"), + ) + .arg( + Arg::new("bridge-bind-port") + .long("bridge-port") + .takes_value(true) + .validator(|port| { + port.parse::() + .map_or(Err(format!("Invalid port {}", port)), |_| Ok(())) + }) + .display_order(6) + .about("桥接服务监听的端口"), ) - .arg(Arg::new("bridge-bind").long("bridge").default_value("0").validator(|port| { - port.parse::() - .map_or(Err(format!("Invalid port {}", port)), |_| Ok(())) - })); + .arg( + Arg::new("name") + .long("name") + .short('n') + .takes_value(true) + .display_order(7) + .about("自定义当前映射服务的名称"), + ); let matches = app.get_matches(); @@ -174,18 +215,20 @@ fn main() { matches.value_of("forward-port").unwrap(), ); - let service_bind: u16 = matches.value_of("service-bind").unwrap().parse().unwrap(); + let name = matches + .value_of("name") + .map_or_else(|| None, |name| Some(name.to_string())); - let forward_type = matches.value_of("forward-type").unwrap(); - - let bridge_bind: u16 = matches - .value_of("bridge-bind") + let service_bind_port: u16 = matches + .value_of("service-bind-port") .unwrap_or("0") .parse() .unwrap(); + let forward_type = matches.value_of("forward-type").unwrap(); + if server_addr.is_err() { - println!("Parameter error: {}", server_addr.unwrap_err()); + println!("Server address error: {}", server_addr.unwrap_err()); exit(1); } @@ -204,17 +247,28 @@ fn main() { _ => Proxy::Port(forward_addr), }; - log::info!( - "[fuc] server_addr={}, forward_type={}, forward_addr={}, xor_num={}", - server_addr, - forward_type, - forward_addr, - xor_num - ); + let bridge_addr = { + let bridge_bind_host = matches.value_of("bridge-bind-host").unwrap_or("0.0.0.0"); + let bridge_bind_port = matches.value_of("bridge-bind-port"); + + if bridge_bind_port.is_none() { + None + } else { + parse_addr(bridge_bind_host, bridge_bind_port.unwrap()) + .map_or_else(|_| None, |addr| Some(addr)) + } + }; smol::block_on(async move { loop { - match Fuso::bind(server_addr, service_bind, bridge_bind).await { + match Fuso::bind(fuso_core::client::Config { + name: name.clone(), + server_addr, + server_bind_port: service_bind_port, + bridge_addr: bridge_addr, + }) + .await + { Ok(fuso) => { log::info!("[fuc] connection succeeded"); let _ = poll_stream(fuso, proxy.clone(), Xor::new(xor_num)).await; diff --git a/src/server.rs b/src/server.rs index 152a538..3f1c2e2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -16,35 +16,41 @@ fn main() { let app = App::new("fuso") .version("v1.0.3") .author("https://github.com/editso/fuso") - .arg(Arg::new("server-bind-host").default_value("0.0.0.0")) - .arg(Arg::new("server-bind-port").default_value("9003")) .arg( - Arg::new("server-visit-host") + Arg::new("server-bind-host") + .default_value("0.0.0.0") .long("host") .short('h') - .default_value("0.0.0.0"), + .display_order(1) + .about("监听地址"), ) .arg( - Arg::new("server-visit-port") + Arg::new("server-bind-port") + .default_value("9003") .long("port") .short('p') - .default_value("0"), + .display_order(2) + .about("监听端口"), ) .arg( Arg::new("xor-secret") .long("xor") .short('x') .default_value("27") + .display_order(3) .validator(|num| { num.parse::() .map_or(Err(String::from("Invalid number 0-255")), |_| Ok(())) - }), + }) + .about("传输时使用异或加密的Key"), ) .arg( Arg::new("log") .short('l') + .display_order(4) .possible_values(["debug", "info", "trace", "error"]) - .default_value("info"), + .default_value("info") + .about("日志级别"), ); let matches = app.get_matches(); @@ -54,22 +60,11 @@ fn main() { matches.value_of("server-bind-port").unwrap(), ); - let server_vis_addr = parse_addr( - matches.value_of("server-visit-host").unwrap(), - matches.value_of("server-visit-port").unwrap(), - ); - if server_bind_addr.is_err() { println!("Parameter error: {}", server_bind_addr.unwrap_err()); exit(1); } - if server_vis_addr.is_err() { - println!("Parameter error: {}", server_vis_addr.unwrap_err()); - exit(1); - } - - let server_vis_addr = server_vis_addr.unwrap(); let server_bind_addr = server_bind_addr.unwrap(); let xor_num: u8 = matches.value_of("xor-secret").unwrap().parse().unwrap(); @@ -94,9 +89,13 @@ fn main() { chain.next(|mut tcp, cx| async move { let action: Action = tcp.recv().await?.try_into()?; match action { - Action::Bind(addr) => match cx.spwan(tcp.clone().into(), addr).await { + Action::Bind(name, addr) => match cx.spwan(tcp.clone(), addr, name).await { Ok(conv) => { - log::debug!("[fuso] accept {}", conv); + log::debug!( + "[fuso] accept conv={}, addr={}", + conv, + tcp.peer_addr().unwrap(), + ); Ok(State::Accept(())) } Err(e) => { @@ -115,13 +114,8 @@ fn main() { }) .chain_strategy(|chain| { chain - .next(|_, _| async move { - Ok(State::Accept(Action::Forward(Addr::Socket( - "127.0.0.1:80".parse().unwrap(), - )))) - }) .next(|tcp, _| async move { - // 下面是测试性的, 永远都不会执行到这里 + let _ = tcp.begin().await; let io = tcp.clone(); let socks = Socks::parse( io, @@ -141,8 +135,8 @@ fn main() { .await; if socks.is_err() { - let _ = tcp.back().await; log::debug!("Not a valid socks package"); + let _ = tcp.back().await; Ok(State::Next) } else { match socks.unwrap() { @@ -153,6 +147,11 @@ fn main() { } } }) + .next(|_, _| async move { + Ok(State::Accept(Action::Forward(Addr::Socket( + "127.0.0.1:80".parse().unwrap(), + )))) + }) }) .build() .await; @@ -161,9 +160,11 @@ fn main() { Ok(mut fuso) => loop { match fuso.accept().await { Ok(stream) => { - let to = stream.to.ciphe(Xor::new(xor_num)).await; + let (from, to) = stream.split(); + + let to = to.ciphe(Xor::new(xor_num)).await; - to.forward(stream.from).detach(); + to.forward(from).detach(); } Err(e) => { log::warn!("[fuso] Server error {}", e.to_string()); @@ -173,9 +174,8 @@ fn main() { }, Err(_) => { log::error!( - "[fus] Invalid address or already used . bind={}, visit={}", - server_bind_addr, - server_vis_addr + "[fus] Invalid address or already used . bind={}", + server_bind_addr ) } }