这篇博文写的非常懒惰和抽象,大概懂那个意思就行,不要在意细节

不管在那种编程语言中,使用到会在多个线程中共享的变量都需要开发者保证线程安全。在 Rust 中最常用的方式之一使用 Mutex,也就是锁,Rust 提供了非常好的抽象,用法如下:

fn main() {

    let sharing = Mutex::new(0);
   {
        let mut v = sharing.lock().unwrap(); //  lock here
        *v += 1;
        //  do sth with v
    }
    //   unlcok here

    //  lock here
    let v = sharing.lock().unwrap();
    println!("{v}");
}
//   unlcok here

如上,我们将一个要共享的 u32 变量包裹在 Mutex 中,每次使用它之前都需要 lock 一下,在离开当前作用域的时候会自动 unlock。如果其他线程访问到这个 locking 的变量就会被阻塞住,直到 unlock

然后我就遇到了一个问题。

我尝试用 Rust 做一个通讯功能,会有多个客户端的连接,每个客户端都会有一个 TcpStream,服务器要向客户端发送信息的话就是 TcpStream::write,当要监听客户端的信息的话就是 TcpStream::read(差不多这个意思,反正就是读和写嘛)。

起初,我让每个客户端在需要向其他客户端发送信息的时候,先获取目标客户端的 TcpStream,再 write,为了可以方便通过目标的名字获取到 TcpStream,我还弄了一个全局的 HashMap 来存放,同样的,因为会被多个线程共享,这个 HashMap 也需要被 Mutex 包装。

看起来很安全,对吧。

然而因为 Mutex 住了,当一个线程获取到 HashMap 的时候,其他线程不能使用 HashMap,直到 HashMap 和获取的 TcpStream 都一起使用结束。 问:能不能在获取到 TcpStream 之后把 HashMap unlock,只 keep 住 TcpStream? 答:不能。这涉及到生命周期的概念,反正就是不能。

而当我们需要监听来自客户端的消息时,就需要:

  1. lockHashMap
  2. 获取 TcpStream
  3. 监听 TcpStreamTcpStream::read()

而监听的操作会一直持续到有结果或者超时,这个时候如果有其他的线程需要使用这个 TcpStream,将导致长时间的阻塞,因为直到 TcpStream 的监听有结果或者超时之前,这个 TcpStream 都将被 lock 住,HashMap 也被 lock 住。

导致的结果就是经常导致长时间阻塞。

因为问题在于多线程中 lock 住了 HashMapTcpStream,直到 TcpStream 使用完之前两者都不能被 unlock。所以我尝试在 HashMap 里不获取 TcpStream,而是获取 TcpStream 的引用来绕过生命周期检查,这样能够马上 unlock HashMap,如下代码:

type ClientStore = Arc<RwLock<HashMap<String, TcpStream>>>;

pub async fn get_mut_ref<'a>(
    store: &'a ClientStore,
    username: &String,
) -> Option<&'a mut TcpStream> {
    store
        .write()
        .await
        .get_mut(username)
        .map(|t| unsafe { &mut *(t as *mut TcpStream) })
}

这样既能够在拿到 &TcpStream 后马上 unlockHashMap,又能够让多个线程使用 &TcpStream

虽然。。。但是。。。这种方式毕竟是不安全的,因为使用了全局变量,这在 Rust 中是不提倡的,而且使用了在多线程中到处传递的原始指针,这是不安全的代码,如果开发者没有能力保证安全的话。

后来用 Actor 模型解决了这个问题

Actor 模型是一中并发计算模型,由许多独立的并发实体组成,每个实体都是一个 Actor。每个 Actor 都有自己的状态和行为,Actor 之间通过消息传递。

如果用 Actor 模型来写上面的代码,那么每个客户端都将成为一个 Actor,分别持有自己的 TcpStream,接下来无论是向这个 &TcpStream 发送还是监听,都由持有这个 &TcpStream 的 Actor 去执行,而执行的时机就通过消息(Message)来触发。

如下实例代码:

pub(crate) enum ClientMessage {
    /// representing there is a message need send,
    /// tuple parameters: (sender, message)
    ReceiveMessage(String, String),
    /// terminate current client
    Terminate,
}

pub(crate) struct Client {
    tcp_stream: TcpStream,
    inbox: Inbox<ClientMessage>,
}

上面的 Client 就是一个表现为客户端 Actor,看它持有 TcpStream 和一个 Inbox,这个 Inbox 是用于接收传递给 Actor 的 Message,Actor 将通过 Message 决定要触发什么行为。

比如说 Actor 接收到了 ClientMessage::ReceiveMessage(String, String),说明有一条信息需要发送给客户端,Actor 就执行 TcpStream::write() 的操作,结束后继续等待下一条信息。

这个简单的流程不再出现全局变量 HashMap 和原始指针 *mut TcpStream,也没有 Mutex,看起来干净又清爽,不失为一个多线程下的解决方案。

我把代码放在了 这个地址