跳到主要内容

使用基于角色的访问控制 (RBAC) 和 JWT 验证保护你的 Rocket API

本指南将帮助你通过使用 基于角色的访问控制 (RBAC) 和 Logto 签发的 JSON Web Token (JWT) 来实现 Rocket API 的授权 (Authorization) 安全防护。

开始之前

你的客户端应用需要从 Logto 获取访问令牌 (Access tokens)。如果你还没有完成客户端集成,请查看我们的 快速开始,适用于 React、Vue、Angular 或其他客户端框架,或者参考我们的 机器对机器指南 以实现服务器到服务器的访问。

本指南聚焦于在你的 Rocket 应用中对这些令牌进行服务端验证

A figure showing the focus of this guide

你将学到什么

  • JWT 验证:学习如何验证访问令牌 (Access tokens) 并提取认证 (Authentication) 信息
  • 中间件实现:创建可复用的中间件以保护 API
  • 权限模型:理解并实现不同的授权 (Authorization) 模式:
    • 应用级端点的全局 API 资源
    • 用于租户特定功能控制的组织权限
    • 多租户数据访问的组织级 API 资源
  • RBAC 集成:在你的 API 端点中强制执行基于角色的权限 (Permissions) 和权限范围 (Scopes)

前置条件

  • 已安装 Rust 的最新稳定版本
  • 基本了解 Rocket 及 Web API 开发
  • 已配置 Logto 应用(如有需要请参见 快速开始

权限 (Permission) 模型概览

在实施保护之前,请选择适合你应用架构的权限 (Permission) 模型。这与 Logto 的三大授权 (Authorization) 场景保持一致:

全局 API 资源 RBAC
  • 使用场景: 保护整个应用共享的 API 资源(非组织 (Organization) 专属)
  • 令牌类型: 具有全局受众 (Audience) 的访问令牌 (Access token)
  • 示例: 公共 API、核心产品服务、管理端点
  • 最适合: 所有客户都使用 API 的 SaaS 产品、无租户隔离的微服务
  • 了解更多: 保护全局 API 资源

💡 在继续之前选择你的模型 —— 本指南后续内容将以你选择的方式为参考。

快速准备步骤

配置 Logto 资源和权限

  1. 创建 API 资源:前往 控制台 → API 资源 并注册你的 API(例如,https://api.yourapp.com
  2. 定义权限:添加如 read:productswrite:orders 等权限(Scopes)——参见 定义带权限的 API 资源
  3. 创建全局角色:前往 控制台 → 角色 并创建包含你的 API 权限的角色——参见 配置全局角色
  4. 分配角色:将角色分配给需要访问 API 的用户或 M2M 应用程序
初次接触基于角色的访问控制 (RBAC)?:

从我们的 基于角色的访问控制 (RBAC) 指南 开始,获取分步设置说明。

更新你的客户端应用

在客户端请求合适的权限(Scopes):

通常需要在客户端配置中更新以下一项或多项:

  • OAuth 流程中的 scope 参数
  • 用于 API 资源访问的 resource 参数
  • 用于组织上下文的 organization_id
编码前须知:

请确保你测试的用户或 M2M 应用已被分配包含所需 API 权限的合适角色或组织角色。

初始化你的 API 项目

要初始化一个新的 Rocket 项目,创建一个目录并设置基本结构:

cargo new your-api-name
cd your-api-name

在你的 Cargo.toml 中添加 Rocket 依赖:

Cargo.toml
[dependencies]
rocket = { version = "0.5", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

创建一个基础的 Rocket 应用程序:

src/main.rs
use rocket::{get, launch, routes, serde::json::Json};
use serde_json::{json, Value};

#[get("/")]
fn hello_handler() -> Json<Value> {
Json(json!({ "message": "Hello from Rocket" }))
}

#[launch]
fn rocket() -> _ {
rocket::build()
.mount("/", routes![hello_handler])
}

启动开发服务器:

cargo run
备注:

更多关于如何设置路由、请求守卫以及其他功能的详细信息,请参考 Rocket 文档。

初始化常量和工具方法

在你的代码中定义必要的常量和工具函数,用于处理令牌的提取和校验。一个有效的请求必须包含 Authorization 请求头,格式为 Bearer <访问令牌 (Access token)>

lib.rs
use serde::{Deserialize, Serialize};
use std::fmt;

pub const JWKS_URI: &str = "https://your-tenant.logto.app/oidc/jwks";
pub const ISSUER: &str = "https://your-tenant.logto.app/oidc";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthInfo {
pub sub: String,
pub client_id: Option<String>,
pub organization_id: Option<String>,
pub scopes: Vec<String>,
pub audience: Vec<String>,
}

impl AuthInfo {
pub fn new(
sub: String,
client_id: Option<String>,
organization_id: Option<String>,
scopes: Vec<String>,
audience: Vec<String>,
) -> Self {
Self {
sub,
client_id,
organization_id,
scopes,
audience,
}
}
}

#[derive(Debug)]
pub struct AuthorizationError {
pub message: String,
pub status_code: u16,
}

impl AuthorizationError {
pub fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
status_code: 403,
}
}

pub fn with_status(message: impl Into<String>, status_code: u16) -> Self {
Self {
message: message.into(),
status_code,
}
}
}

impl fmt::Display for AuthorizationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.message)
}
}

impl std::error::Error for AuthorizationError {}

pub fn extract_bearer_token(authorization: Option<&str>) -> Result<&str, AuthorizationError> {
let auth_header = authorization.ok_or_else(|| {
AuthorizationError::with_status("Authorization header is missing", 401)
})?;

if !auth_header.starts_with("Bearer ") {
return Err(AuthorizationError::with_status(
"Authorization header must start with \"Bearer \"",
401,
));
}

Ok(&auth_header[7..]) // 移除 'Bearer ' 前缀
}

获取你的 Logto 租户信息

你需要以下数值来验证 Logto 签发的令牌:

  • JSON Web Key Set (JWKS) URI:Logto 公钥的 URL,用于验证 JWT 签名。
  • 发行者 (Issuer):期望的发行者 (Issuer) 值(Logto 的 OIDC URL)。

首先,找到你的 Logto 租户的端点。你可以在多个地方找到它:

  • 在 Logto 控制台的 设置域名 下。
  • 在你在 Logto 配置的任何应用程序设置中,设置端点与凭证

从 OpenID Connect 发现端点获取

这些数值可以从 Logto 的 OpenID Connect 发现端点获取:

https://<your-logto-endpoint>/oidc/.well-known/openid-configuration

以下是一个示例响应(为简洁省略了其他字段):

{
"jwks_uri": "https://your-tenant.logto.app/oidc/jwks",
"issuer": "https://your-tenant.logto.app/oidc"
}

由于 Logto 不允许自定义 JWKS URI 或发行者 (Issuer),你可以在代码中硬编码这些数值。但对于生产环境的应用程序,这并不推荐,因为如果将来某些配置发生变化,可能会增加维护成本。

  • JWKS URI: https://<your-logto-endpoint>/oidc/jwks
  • 发行者 (Issuer): https://<your-logto-endpoint>/oidc

校验令牌和权限

在提取令牌并获取 OIDC 配置后,请验证以下内容:

  • 签名: JWT 必须有效且由 Logto(通过 JWKS)签名。
  • 发行者 (Issuer): 必须与你的 Logto 租户的发行者 (Issuer) 匹配。
  • 受众 (Audience): 必须与你在 Logto 中注册的 API 的资源指示器 (resource indicator) 匹配,或在适用时匹配组织 (organization) 上下文。
  • 过期时间: 令牌必须未过期。
  • 权限 (Scopes): 令牌必须包含你的 API / 操作所需的权限 (scopes)。权限 (scopes) 是 scope 声明中的以空格分隔的字符串。
  • 组织 (Organization) 上下文: 如果保护的是组织级 API 资源,请验证 organization_id 声明。

参见 JSON Web Token 以了解更多关于 JWT 结构和声明 (Claims) 的信息。

针对每种权限 (Permission) 模型需要检查什么

不同的权限 (Permission) 模型,其声明 (Claims) 和验证规则也不同:

  • 受众 (Audience) 声明 (aud): API 资源指示器 (resource indicator)
  • 组织 (Organization) 声明 (organization_id): 不存在
  • 需要检查的权限 (Scopes) (scope): API 资源权限 (permissions)

对于非 API 的组织 (Organization) 权限 (Permissions),组织上下文由 aud 声明表示 (例如,urn:logto:organization:abc123)。organization_id 声明仅在组织级 API 资源令牌中存在。

提示:

对于安全的多租户 API,请始终同时验证权限 (scopes) 和上下文(受众 (audience)、组织 (organization))。

添加校验逻辑

我们使用 jsonwebtoken 来验证 JWT。请在你的 Cargo.toml 中添加所需依赖:

Cargo.toml
[dependencies]
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }

首先,添加这些用于处理 JWT 验证的通用工具:

jwt_validator.rs
use crate::{AuthInfo, AuthorizationError, ISSUER, JWKS_URI};
use jsonwebtoken::{decode, decode_header, Algorithm, DecodingKey, Validation};
use serde_json::Value;
use std::collections::HashMap;

pub struct JwtValidator {
jwks: HashMap<String, DecodingKey>,
}

impl JwtValidator {
pub async fn new() -> Result<Self, AuthorizationError> {
let jwks = Self::fetch_jwks().await?;
Ok(Self { jwks })
}

async fn fetch_jwks() -> Result<HashMap<String, DecodingKey>, AuthorizationError> {
let response = reqwest::get(JWKS_URI).await.map_err(|e| {
AuthorizationError::with_status(format!("Failed to fetch JWKS: {}", e), 401)
})?;

let jwks: Value = response.json().await.map_err(|e| {
AuthorizationError::with_status(format!("Failed to parse JWKS: {}", e), 401)
})?;

let mut keys = HashMap::new();

if let Some(keys_array) = jwks["keys"].as_array() {
for key in keys_array {
if let (Some(kid), Some(kty), Some(n), Some(e)) = (
key["kid"].as_str(),
key["kty"].as_str(),
key["n"].as_str(),
key["e"].as_str(),
) {
if kty == "RSA" {
if let Ok(decoding_key) = DecodingKey::from_rsa_components(n, e) {
keys.insert(kid.to_string(), decoding_key);
}
}
}
}
}

if keys.is_empty() {
return Err(AuthorizationError::with_status("No valid keys found in JWKS", 401));
}

Ok(keys)
}

pub fn validate_jwt(&self, token: &str) -> Result<AuthInfo, AuthorizationError> {
let header = decode_header(token).map_err(|e| {
AuthorizationError::with_status(format!("Invalid token header: {}", e), 401)
})?;

let kid = header.kid.ok_or_else(|| {
AuthorizationError::with_status("Token missing kid claim", 401)
})?;

let key = self.jwks.get(&kid).ok_or_else(|| {
AuthorizationError::with_status("Unknown key ID", 401)
})?;

let mut validation = Validation::new(Algorithm::RS256);
validation.set_issuer(&[ISSUER]);
validation.validate_aud = false; // 我们会手动验证受众 (Audience)

let token_data = decode::<Value>(token, key, &validation).map_err(|e| {
AuthorizationError::with_status(format!("Invalid token: {}", e), 401)
})?;

let claims = token_data.claims;
self.verify_payload(&claims)?;

Ok(self.create_auth_info(claims))
}

fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
// 在这里根据权限模型实现你的验证逻辑
// 具体内容将在下方权限模型部分展示
Ok(())
}

fn create_auth_info(&self, claims: Value) -> AuthInfo {
let scopes = claims["scope"]
.as_str()
.map(|s| s.split(' ').map(|s| s.to_string()).collect())
.unwrap_or_default();

let audience = match &claims["aud"] {
Value::Array(arr) => arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect(),
Value::String(s) => vec![s.clone()],
_ => vec![],
};

AuthInfo::new(
claims["sub"].as_str().unwrap_or_default().to_string(),
claims["client_id"].as_str().map(|s| s.to_string()),
claims["organization_id"].as_str().map(|s| s.to_string()),
scopes,
audience,
)
}
}

然后,实现中间件以验证访问令牌 (Access token):

guards.rs
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use rocket::{
http::Status,
outcome::Outcome,
request::{self, FromRequest, Request},
State,
};

#[rocket::async_trait]
impl<'r> FromRequest<'r> for AuthInfo {
type Error = AuthorizationError;

async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
let validator = match req.guard::<&State<JwtValidator>>().await {
Outcome::Success(validator) => validator,
Outcome::Failure((status, _)) => {
return Outcome::Failure((
status,
AuthorizationError::with_status("未找到 JWT 校验器 (JWT validator not found)", 500),
))
}
Outcome::Forward(()) => {
return Outcome::Forward(())
}
};

let authorization = req.headers().get_one("authorization");

match extract_bearer_token(authorization)
.and_then(|token| validator.validate_jwt(token))
{
Ok(auth_info) => Outcome::Success(auth_info),
Err(e) => {
let status = Status::from_code(e.status_code).unwrap_or(Status::Forbidden);
Outcome::Failure((status, e))
}
}
}
}

根据你的权限模型,在 JwtValidator 中实现相应的验证逻辑:

jwt_validator.rs
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
// 检查受众 (Audience) 声明是否匹配你的 API 资源指示器
let audiences = match &claims["aud"] {
Value::Array(arr) => arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>(),
Value::String(s) => vec![s.as_str()],
_ => vec![],
};

if !audiences.contains(&"https://your-api-resource-indicator") {
return Err(AuthorizationError::new("Invalid audience"));
}

// 检查全局 API 资源所需的权限 (Scopes)
let required_scopes = vec!["api:read", "api:write"]; // 替换为你实际需要的权限
let scopes = claims["scope"]
.as_str()
.map(|s| s.split(' ').collect::<Vec<_>>())
.unwrap_or_default();

for required_scope in &required_scopes {
if !scopes.contains(required_scope) {
return Err(AuthorizationError::new("Insufficient scope"));
}
}

Ok(())
}

将中间件应用到你的 API

现在,将中间件应用到你的受保护 API 路由。

main.rs
use rocket::{get, launch, routes, serde::json::Json};
use serde_json::{json, Value};

mod lib;
mod jwt_validator;
mod guards;

use lib::AuthInfo;
use jwt_validator::JwtValidator;

#[get("/api/protected")]
fn protected_handler(auth: AuthInfo) -> Json<Value> {
// 直接从请求守卫中访问认证 (Authentication) 信息
Json(json!({ "auth": auth }))
}

#[launch]
async fn rocket() -> _ {
let validator = JwtValidator::new().await.expect("初始化 JWT 验证器失败");

rocket::build()
.manage(validator)
.mount("/", routes![protected_handler])
}

测试你的受保护 API

获取访问令牌 (Access tokens)

从你的客户端应用程序获取: 如果你已经完成了客户端集成,你的应用可以自动获取令牌。提取访问令牌 (Access token),并在 API 请求中使用它。

使用 curl / Postman 进行测试:

  1. 用户令牌: 使用你的客户端应用的开发者工具,从 localStorage 或网络面板复制访问令牌 (Access token)

  2. 机器对机器令牌: 使用客户端凭证流。以下是一个使用 curl 的非规范示例:

    curl -X POST https://your-tenant.logto.app/oidc/token \
    -H "Content-Type: application/x-www-form-urlencoded" \
    -d "grant_type=client_credentials" \
    -d "client_id=your-m2m-client-id" \
    -d "client_secret=your-m2m-client-secret" \
    -d "resource=https://your-api-resource-indicator" \
    -d "scope=api:read api:write"

    你可能需要根据你的 API 资源和权限调整 resourcescope 参数;如果你的 API 是组织范围的,还可能需要 organization_id 参数。

提示:

需要查看令牌内容?使用我们的 JWT 解码器 来解码和验证你的 JWT。

测试受保护的端点

有效令牌请求
curl -H "Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..." \
http://localhost:3000/api/protected

预期响应:

{
"auth": {
"sub": "user123",
"clientId": "app456",
"organizationId": "org789",
"scopes": ["api:read", "api:write"],
"audience": ["https://your-api-resource-indicator"]
}
}
缺少令牌
curl http://localhost:3000/api/protected

预期响应 (401):

{
"error": "Authorization header is missing"
}
无效令牌
curl -H "Authorization: Bearer invalid-token" \
http://localhost:3000/api/protected

预期响应 (401):

{
"error": "Invalid token"
}

权限模型相关测试

针对受全局权限保护的 API 的测试场景:

  • 有效权限 (Scopes): 使用包含所需 API 权限(如 api:readapi:write)的令牌进行测试
  • 缺少权限 (Scopes): 当令牌缺少所需权限时,预期返回 403 Forbidden
  • 受众 (Audience) 错误: 当受众与 API 资源不匹配时,预期返回 403 Forbidden
# 缺少权限的令牌 - 预期 403
curl -H "Authorization: Bearer token-without-required-scopes" \
http://localhost:3000/api/protected

延伸阅读

RBAC 实践:为你的应用实现安全授权 (Authorization)

构建多租户 SaaS 应用:从设计到实现的完整指南