Skip to content

Commit

Permalink
编写部分udp转发功能代码
Browse files Browse the repository at this point in the history
  • Loading branch information
editso committed Dec 4, 2021
1 parent bbb14f1 commit f593355
Show file tree
Hide file tree
Showing 11 changed files with 529 additions and 262 deletions.
8 changes: 8 additions & 0 deletions fuso-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ impl<T> Rollback<T, Buffer<u8>> {
Ok(())
}
}

#[inline]
pub async fn release(&self) -> Result<()> {
*self.rollback.write().unwrap() = false;
self.store.lock().unwrap().clear();

Ok(())
}
}

impl Rollback<TcpStream, Buffer<u8>> {
Expand Down
83 changes: 42 additions & 41 deletions fuso-api/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,118 +1,119 @@
use std::{net::SocketAddr, pin::Pin, sync::Arc};
use std::{net::SocketAddr, pin::Pin};

use futures::{AsyncRead, AsyncWrite};
use smol::net::TcpStream;

use crate::{Buffer, Rollback, RollbackEx, UdpStream};

#[derive(Debug, Clone)]
pub struct FusoStream<Inner> {
inner: Arc<std::sync::Mutex<Rollback<Inner, Buffer<u8>>>>,
pub struct SafeStream<Inner> {
inner: Rollback<Inner, Buffer<u8>>,
}

pub trait FusoStreamEx<T> {
fn as_fuso_stream(self) -> FusoStream<T>;

pub trait SafeStreamEx<T> {
fn as_safe_stream(self) -> SafeStream<T>;
}

impl FusoStreamEx<Self> for TcpStream {
impl SafeStreamEx<Self> for TcpStream {
#[inline]
fn as_fuso_stream(self) -> FusoStream<Self> {
FusoStream {
inner: Arc::new(std::sync::Mutex::new(self.roll())),
}
fn as_safe_stream(self) -> SafeStream<Self> {
SafeStream { inner: self.roll() }
}
}

impl FusoStreamEx<Self> for UdpStream {
impl SafeStreamEx<Self> for UdpStream {
#[inline]
fn as_fuso_stream(self) -> FusoStream<Self> {
FusoStream {
inner: Arc::new(std::sync::Mutex::new(self.roll())),
}
fn as_safe_stream(self) -> SafeStream<Self> {
SafeStream { inner: self.roll() }
}
}

impl FusoStream<TcpStream> {
impl SafeStream<TcpStream> {
#[inline]
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.inner.lock().unwrap().local_addr()
self.inner.local_addr()
}

#[inline]
pub fn peer_addr(&self) -> std::io::Result<SocketAddr> {
self.inner.lock().unwrap().peer_addr()
self.inner.peer_addr()
}
}

impl FusoStream<UdpStream> {
impl SafeStream<UdpStream> {
#[inline]
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.inner.lock().unwrap().local_addr()
self.inner.local_addr()
}

#[inline]
pub fn peer_addr(&self) -> std::io::Result<SocketAddr> {
self.inner.lock().unwrap().peer_addr()
self.inner.peer_addr()
}
}

impl<Inner> FusoStream<Inner> {
impl<Inner> SafeStream<Inner>
where
Inner: Send + Sync + 'static,
{
#[inline]
pub async fn begin(&self) -> crate::Result<()> {
self.inner.lock().unwrap().begin().await
self.inner.begin().await
}

#[inline]
pub async fn back(&self) -> crate::Result<()> {
self.inner.lock().unwrap().back().await
self.inner.back().await
}

#[inline]
pub async fn release(&self) -> crate::Result<()>{
self.inner.release().await
}

}

impl<Inner> AsyncRead for FusoStream<Inner>
impl<Inner> AsyncRead for SafeStream<Inner>
where
Inner: AsyncRead + Unpin + Send + Sync + 'static,
Inner: Clone + AsyncRead + Unpin + Send + Sync + 'static,
{
#[inline]
fn poll_read(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut io = self.inner.lock().unwrap();
Pin::new(&mut *io).poll_read(cx, buf)
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}


impl<Inner> AsyncWrite for FusoStream<Inner>
impl<Inner> AsyncWrite for SafeStream<Inner>
where
Inner: AsyncWrite + Unpin + Send + Sync + 'static,
Inner: Clone + AsyncWrite + Unpin + Send + Sync + 'static,
{
#[inline]
fn poll_write(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut io = self.inner.lock().unwrap();
Pin::new(&mut *io).poll_write(cx, buf)
Pin::new(&mut self.inner).poll_write(cx, buf)
}

#[inline]
fn poll_flush(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let mut io = self.inner.lock().unwrap();
Pin::new(&mut *io).poll_flush(cx)
Pin::new(&mut self.inner).poll_flush(cx)
}

#[inline]
fn poll_close(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let mut io = self.inner.lock().unwrap();
Pin::new(&mut *io).poll_close(cx)
Pin::new(&mut self.inner).poll_close(cx)
}
}
58 changes: 43 additions & 15 deletions fuso-core/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::net::SocketAddr;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};

use fuso_api::{async_trait, Error, Forward, FusoListener, FusoPacket, Result, Spwan};
use fuso_api::{
async_trait, AsyncTcpSocketEx, Error, Forward, FusoListener, FusoPacket, Result, Spwan,
};

use futures::AsyncWriteExt;
use smol::{
Expand All @@ -23,21 +28,37 @@ pub struct Fuso {
accept_ax: Receiver<Reactor>,
}

pub struct Config {
// 名称, 可选的
pub name: Option<String>,
// 服务端地址
pub server_addr: SocketAddr,
// 自定义服务绑定的端口, 如果不指定默认为随机分配
pub server_bind_port: u16,
// 桥接监听的地址
pub bridge_addr: Option<SocketAddr>,
}

impl Fuso {
pub async fn bind(addr: SocketAddr, bind_port: u16, bridge_bind: u16) -> Result<Self> {
// let stream = addr.tcp_connect().await?;
pub async fn bind(config: Config) -> Result<Self> {
let cfg = Arc::new(config);
let server_addr = cfg.server_addr.clone();
let server_bind_port = cfg.server_bind_port.clone();
let bridge_addr = cfg.bridge_addr.clone();
let name = cfg.name.clone();

let mut stream = TcpStream::connect(addr)
.await
.map_err(|e| Error::with_io(e))?;
let mut stream = server_addr.tcp_connect().await?;

stream
.send(
Action::Bind({
if bind_port == 0 {
Action::Bind(name, {
if cfg.server_bind_port == 0 {
None
} else {
Some(format!("0.0.0.0:{}", bind_port).parse().unwrap())
Some(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
server_bind_port,
))
}
})
.into(),
Expand All @@ -54,13 +75,13 @@ impl Fuso {
let mut stream = stream.guard(5000).await?;

async move {
if bridge_bind == 0 {
if bridge_addr.is_none() {
return;
}

match Bridge::bind(format!("0.0.0.0:{}", bridge_bind).parse().unwrap(), addr)
.await
{
let bridge_addr = bridge_addr.unwrap();

match Bridge::bind(bridge_addr, server_addr).await {
Ok(mut bridge) => {
log::info!("[bridge] Bridge service opened successfully");
loop {
Expand Down Expand Up @@ -99,7 +120,14 @@ impl Fuso {
Ok(Action::Ping) => {}

Ok(action) => {
match accept_tx.send(Reactor { conv, action, addr }).await {
match accept_tx
.send(Reactor {
conv,
action,
addr: server_addr,
})
.await
{
Err(_) => {
let _ = stream.close().await;
break;
Expand Down
Loading

0 comments on commit f593355

Please sign in to comment.