这篇文章主要翻译和修改于 Inventing the Service trait,用于描述 tower 中最主要的 trait: Service 是怎么来的.

Tower Service

Rust 的 crate tower 是一个用于构建客户端-服务器网络组件的库,最主要提供的是 ServiceLayer 这两个 trait。这里主要讲 Service,现在这个 trait 是这个样子的:

pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    // Required methods
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

这个 ServiceRequestResponse 的抽象,下面会一步步从一个简单的函数,逐步构造出这个 trait。因为是用 Rust 构建的代码,所以其中会涉及到 Rust 语言特用的问题,像是异步的处理,为了解决这些问题会使用一些比较“怪异”的写法。熟悉 Rust 的同学会更好的理解。

HTTP 网络请求

写 Web 的同学会熟悉 HTTP 的网络请求过程,简单来将就是一个 Request 变成 Response 的过程,用下面这个方法来表示:

fn handler(req: Request) -> Response {
	//  do something
}

这个 handler 函数是一个很好的抽象的表示,可以用于描述任何 请求响应 的过程,就像 HTTP 协议的请求响应,客户端发送一个 Request 给服务器,服务器根据这个请求变成一个 Response

有了这么一个 handler 函数后,我们可以让我们的 Web 应用提供一个使用这个函数的入口,假设我们的 Web 应用抽象为 Service,那么它应该有这么一个方法来使用 handler

impl Service {
	fn run<F>(f: F) -> Result<(), Error> 
	where F: Fn(Request) -> Response
	{
		let listener = TcpListener::bind("0.0.0.0").await?;

		loop {
			let mut connection = listener.accept().await?;
      let request = read_http_request(&mut connection).await?;

       task::spawn(async move {
					// Call the handler provided by the user
					let response = f(request);

					write_http_response(connection, response).await?;
			});
		}
	}
}

上面的伪代码很简单,就是将接受到的请求传递给 handler 去处理, handler 传递给 Service::run(...),所以我们想要怎么处理请求就构建一个 handler,然后传递给 Service

但此时我们的 handler 还有一点问题

错误处理

现在 handler 直接返回 Respanse,但我们可能会遇到无法处理的情况,在 Rust 中我们会使用 Result 来处理。所以需要将返回值修改为:

fn handler(req: Request) -> Result<Response, Error> {
	if req.url == "/" {
		Ok(Response::new(200))
	} else {
		Err(Error::new("error message"))
	}
}

返回值里的 Error 可以是你想要的任何类型。这样的改动不多,在 Service 里使用的话也是相当简单:

impl Service {
	fn run<F>(f: F) -> Result<(), Error> 
	where F: Fn(Request) -> Response
	{
		let listener = TcpListener::bind("0.0.0.0").await?;

		loop {
			let mut connection = listener.accept().await?;
      let request = read_http_request(&mut connection).await?;

       task::spawn(async move {
					// Call the handler provided by the user
					let result = f(request);

					match result {
						Ok(resp) => write_http_response(connection, resp).await?,
						Err(e) => write_http_error(connection, e).await?
					}
					
			});
		}
	}
}

更多行为

handler 已经可以处理传入的请求,但为了应对更加复杂的情况还需要增强一下。假设 handler 处理的请求太过耗时,我们想要给它加上一个超时功能,超时直接返回 Err

使用 tokio::time::timeout 我们直接为 handler 附加上超时功能,但比起直接在 handler 里写上超时的逻辑,我们有更好的方法:定义一个用于给 handler 附加上超时的方法—— handler_with_timeout

async fn handler_with_timeout(request: Request) -> Result<Response, Error> {
    let result = tokio::time::timeout(
        Duration::from_secs(30),
        handle_request(request)
    ).await;

    match result {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(error)) => Err(error),
        Err(_timeout_elapsed) => Err(Error::timeout()),
    }
}

这很好的实现了关注点分离,超时函数并不关心 handler 的具体逻辑,handler 也不关心自己会不会超时,通过将超时函数和 handler 分离,在需要的时候组合能够给予程序更多的灵活性和健壮能力。

同理,如果我们想要在 handler 处理完后,修改 ResponseContent-Typeapplication/json,就可以仿照 handler_with_timeout 函数,构建一个新的函数:handler_with_content_type

fn handler_with_content_type(request: Request) -> Result<Response, Error> {
	let response = handle_request(request)?;
	response.set_header("Content-Type", "application/json");
	
	Ok(response)
}

尽管此刻我们有了监听超时的函数:handler_with_timeout 和修改相应头的函数:handler_with_content_type,但如果我们想要一个既能监听超时又能修改响应头的函数,我们希望是能够重用已有的这两个函数,而不是重新定义一个函数叫做:handler_with_timeout_and_content_type。我们希望能够用更灵活的方式组合起来而不是将 handler 给 hard-code 到函数体里,就像这种写法:

let final_handler = with_content_type(with_timeout(handle_request));

上面的写法中 handle_request 是我们处理请求的逻辑,with_timeout 为它附加上了监听超时的能力,with_content 附加上了修改 Content-Type 的能力。这三个方法的调用会组合成一个最终的 handler:final_handler。它仍然能够像之前一样给 Service 调用:

server.run(final_handler).await?;

这种方式挺好,只要不超过三个组合,一旦超过三个就会发现陷入了痛苦的嵌套地狱。好在我们可以有另一种组合方式。

Handler Trait

让我们尝试另一种方式:使用 Trait 来包装 handler 方法,让 Serivce 从调用闭包到调用 Trait。

Handrer Trait 是对我们上面的 fn handler(request: Request) -> Result<Response, Error> 的包装。

trait Handler {
	async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}

在翻译的原文编写的时候,Rust 还不支持直接在 trait 中定义 async 方法,所以会有下面这一点描述:

However, Rust currently doesn't support async trait methods, so we have two options:

- Make call return a boxed future like Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>. This is what the async-trait crate does.
- Add an associated type Future to Handler so users get to pick their own type.

但在本文编写的时候 Rust 已经支持,但仍然会按照原文中的方法定义。

在原文中为何能够在 Handler trait 中使用异步,将 trait 定义为:

trait Handler {
    type Future: Future<Output = Result<HttpResponse, Error>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future;
}

call 方法的参数使用了可变借用的 self,是因为可能会根据需要使用到其实现的内部状态。

那么我们在之前的 handler 函数逻辑就可以修改为实现了 Handler trait 的 struct:

struct BaseHandler;

impl Handler for BaseHandler {
	type Future = Pin<Box<dyn Future<Output = Result<Response, Error>>>>;
	
	fn call(&mut self, request: Request) -> Self::Future {
        Box::pin(async move {
            // same implementation as we had before
            if request.path() == "/" {
                Ok(Response::ok("Hello, World!"))
            } else if request.path() == "/important-data" {
                let some_data = fetch_data_from_database().await?;
                Ok(make_response(some_data))
            } else {
                Ok(Response::not_found())
            }
        })
    }
}

那么我们要如何实现超时检测?要记住我们的目的是支持组合式 API,我们希望能够以一种灵活的方式来构建我们的 Timeout,可以像这样定义一个 Timeout:

struct Timeout<T> {
	inner_handler: T,
	duration: Duration
}

然后为它实现 Handler trait:

impl<T> Handler for Timeout<T> 
where T: Handler
{
	type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            let result = tokio::time::timeout(
                self.duration,
                self.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

实现了 Handler 的 Timeout 的泛型参数 T 限定为实现了 Handler,这样一来就可以将 Handler 委托给 Timeout 执行,Timeout 不关心 inner_handler 是什么,只关心 inner_handler 的超时。

但是这段代码是无法编译的,编译器会报告错误:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/lib.rs:145:29
       |
144 |       fn call(&mut self, request: HttpRequest) -> Self::Future {
			 |               --------- this data with an anonymous lifetime `'_`...
145 |           Box::pin(async move {
			 |  _____________________________^
146 | |             let result = tokio::time::timeout(
147 | |                 self.duration,
148 | |                 self.inner_handler.call(request),
...     |
155 | |             }
156 | |         })
       | |_________^ ...is captured here, requiring it to live as long as `'static`

这是因为我们将可变借用 &mut seft 传递给了异步块,比如 self.durationself.inner_handler,这表示异步函数持有借用的时间可能会超过其持有者拥有所有权的时间。所以传入异步块的参数应该是 'static 生命周期。

但实际上我们并不需要将 Timeout 变成 'static 的,让异步块拥有其所有权也能解决这个问题。最简单的方式就是让 Timeout 实现 Clone trait。

#[derive(Clone)]
struct Timeout<T> {
	inner_handler: T,
	duration: Duration
}

impl<T> Handler for Timeout<T> 
where T: Handler + Clone
{
	type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
			let mut this = self.clone();
        Box::pin(async move {
            let result = tokio::time::timeout(
                this.duration,
                this.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}
```

在 Timeout 这里使用 `Clone` 不会造成太高的成本,因为 Timeout 只拥有 `Duration`(实现了 `Copy` trait),clone 起来相当快捷。但这也说明了实现了 `Handler` 的 struct 不宜包含 clone 起来成本太高的数据,像 `Vec`。

但此时编译器还是会继续报错:

```rust
error[E0310]: the parameter type `T` may not live long enough
   --> src/lib.rs:149:9
    |
140 |   impl<T> Handler for Timeout<T>
    |        - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | /         Box::pin(async move {
150 | |             let result = tokio::time::timeout(
151 | |                 this.duration,
152 | |                 this.inner_handler.call(request),
...   |
159 | |             }
160 | |         })
    | |__________^ ...so that the type `impl Future` will meet its required lifetime bounds
```

还是报了生命周期的问题但有一点不一样。此时的 T 可以是实现了 `Handler` 的任何类型,其中也包括包含借用的类型,比如:`Vec<'a str>`,就会出现和上面一样的生命周期问题。编译器也提示我们可以加上 `'static` 生命周期约束。

```rust
impl<T> Handler for Timeout<T>
where
    T: Handler + Clone + 'static,
{
    // ...
}
```

**P.S. 这里的 'static 生命周期约束并不是说要求 T 一定要是 static 的,是程序全局有效的。可以理解为是在使用 T 期间 T 必须全程有效,而持有不包含借用的 T 的所有权,可以满足这个条件**

现在可以成功编译了。

接着我们实现 `Handler` 来写一个修改 Content-Type 的 handler,

```rust
#[derive(Clone)]
struct JsonContentType<T> {
    inner_handler: T,
}

impl<T> Handler for JsonContentType<T>
where
    T: Handler + Clone + 'static,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        let mut this = self.clone();

        Box::pin(async move {
            let mut response = this.inner_handler.call(request).await?;
            response.set_header("Content-Type", "application/json");
            Ok(response)
        })
    }
}
```

修改外我们的 `Service` 接受 `Handler` trait,

```rust
impl Server {
    async fn run<T>(self, mut handler: T) -> Result<(), Error>
    where
        T: Handler,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            task::spawn(async move {
                // have to call `Handler::call` here
                match handler.call(request).await {
                    Ok(response) => write_http_response(connection, response).await?,
                    Err(error) => handle_error_somehow(error, connection),
                }
            });
        }
    }
}
```

就像上面提到过的组合三个 handler 函数一样,我们也可以组合这三个实现了 `Handler` trait 的 Handler:

```rust
JsonContentType {
    inner_handler: Timeout {
        inner_handler: RequestHandler,
        duration: Duration::from_secs(30),
    },
}
```

如果我们为这三个 Handler 实现 `new` 函数可以更容易来使用:

```rust
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);

// `handler` has type `JsonContentType<Timeout<RequestHandler>>`

server.run(handler).await
```

现在这个 `Handler` 看起来非常不错,可以发布给其他开发者使用了。

## 让 `Handler` 更通用

尽管这个 `Handler` trait 看起来挺好了,但还是不足够通用,它的入参是 `Request`,返回值是 `Response`,这是两个固定的类型,在 crate `http` 中会提供通用的 `Request` 和 `Response`,但我不一定要使用这两个类型,我可以为我的 HTTP 服务器定义我自己的类型,或者我不使用 HTTP 服务器,这样一来 `Handler` 就显得不够用了。

好在在 Rust 中可以使用泛型和关联类型让 `Handler` 更通用,我们将 `Handler` 修改为使用泛型和关联类型的样子:

```rust
trait Handler<Request> {
    type Response;

    // Error should also be an associated type. No reason for that to be a
    // hardcoded type
    type Error;

    // Our future type from before, but now it's output must use
    // the associated `Response` and `Error` types
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    // `call` is unchanged, but note that `Request` here is our generic
    // `Request` type parameter and not the `HttpRequest` type we've used
    // until now
    fn call(&mut self, request: Request) -> Self::Future;
}
```

如果要实现 HTTP 服务的话就像这样:

```rust
impl Handler<Request> for RequestHandler {
    type Response = Response;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: Request) -> Self::Future {
        // same as before
    }
}
```

能够看到就是一个泛型的版本。同样的改造 Timeout 的实现,Timeout 有一点不一样,它有一个 T 的泛型参数,在上一个 Timeout 的实现中我们知道其实只要 T 实现了 `Handler`,Timeout 并不关心其他细节,但在这个泛型的 `Handler` 中 Timeout 需要保证它的 T 泛型参数和它要实现的 `Handler` 的泛型参数 `Request` 一致。除此之外 Timeout 并不关心 T 的 Response 关联类型。

```rust
impl<R, T> Handler<R> for Timeout<T>
where
    // The actual type of request must not contain
    // references. The compiler would tell us to add
    // this if we didn't
    R: 'static,
    // `T` must accept requests of type `R`
    T: Handler<R> + Clone + 'static,
    // We must be able to convert an `Elapsed` into
    // `T`'s error type
    T::Error: From<tokio::time::error::Elapsed>,
{
    // Our response type is the same as `T`'s, since we
    // don't have to modify it
    type Response = T::Response;

    // Error type is also the same
    type Error = T::Error;

    // Future must output a `Result` with the correct types
    type Future = Pin<Box<dyn Future<Output = Result<T::Response, T::Error>>>>;

    fn call(&mut self, request: R) -> Self::Future {
        let mut this = self.clone();

        Box::pin(async move {
            let result = tokio::time::timeout(
                this.duration,
                this.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(elapsed) => {
                    // Convert the error
                    Err(T::Error::from(elapsed))
                }
            }
        })
    }
}
```

而 JsonContentType 有一点不一样,它同样持有泛型参数 T,但它关心 Request 和 Response 的类型,因为它需要修改 Response,所以我们要对它持有的 T 的 Response 进行约束:

```rust
// Again a generic request type
impl<R, T> Handler<R> for JsonContentType<T>
where
    R: 'static,
    // `T` must accept requests of any type `R` and return
    // responses of type `HttpResponse`
    T: Handler<R, Response = HttpResponse> + Clone + 'static,
{
    type Response = HttpResponse;

    // Our error type is whatever `T`'s error type is
    type Error = T::Error;

    type Future = Pin<Box<dyn Future<Output = Result<Response, T::Error>>>>;

    fn call(&mut self, request: R) -> Self::Future {
        let mut this = self.clone();

        Box::pin(async move {
            let mut response = this.inner_handler.call(request).await?;
            response.set_header("Content-Type", "application/json");
            Ok(response)
        })
    }
}
```

`Service::run` 接收的是实现了 `Handler` 的 trait,

```rust
impl Server {
    async fn run<T>(self, mut handler: T) -> Result<(), Error>
    where
        T: Handler<HttpRequest, Response = HttpResponse>,
    {
        // ...
    }
}
```

创建 `Service` 的写法也和前面一样:

```rust
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);

server.run(handler).await
```

## Service 的 `poll_ready`

到目前为止 `Service` 已经挺完美了,但查看 tower 的 `Serivce` 实现会发现它还有一个 `poll_ready` 方法,这是什么。

假设一个 handler 要处理和长时间,在处理的时候服务器仍然会接收请求,服务器会不断产生新的 handler 在处理,如果大量的 handler 不断产生处理又非常的费时而不能即使释放,大量的 handler 可能会把内存撑爆。

所以在 `Service` trait 中提供了 `poll_ready` 异步方法用来检测 `handler` 是否有能力处理这个请求,如果没有能力就会返回 `Pending`,直到有能力(返回 Ready(OK(())))或者出错(Ready(Err(err)))。

比方说同时只能处理 100 个请求,那么当达到 100 个请求的时候,`poll_ready` 就会返回 `Pending`,知道 `poll_ready` 再次返回 `Ready`。

这对 `Handler` 是非常有用的,可以为 `Handler` 实现拥塞控制。

# 完整的 Service

这样一来完整的 `Service` trait 就变成了

```rust
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    // Required methods
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}
```