1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2025-03-31 03:45:13 +00:00

Add support for expires_in when introspecting tokens with MAS.

This commit is contained in:
Olivier 'reivilibre 2025-03-14 15:57:42 +00:00
parent 0c02022a07
commit e007d640ab

View file

@ -19,6 +19,7 @@
#
#
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
from urllib.parse import urlencode
@ -77,6 +78,61 @@ def scope_to_list(scope: str) -> List[str]:
return scope.strip().split(" ")
@dataclass
class IntrospectionResult:
_inner: IntrospectionToken
# when we retrieved this token,
# in milliseconds since the Unix epoch
retrieved_at_ms: int
def is_active(self, now_ms: int) -> bool:
if not self._inner.get("active"):
return False
expires_in = self._inner.get("expires_in")
if expires_in is None:
return True
if not isinstance(expires_in, int):
raise InvalidClientTokenError("token `expires_in` is not an int")
absolute_expiry_ms = expires_in * 1000 + self.retrieved_at_ms
return now_ms < absolute_expiry_ms
def get_scope_list(self) -> List[str]:
value = self._inner.get("scope")
if not isinstance(value, str):
return []
return scope_to_list(value)
def get_sub(self) -> Optional[str]:
value = self._inner.get("sub")
if not isinstance(value, str):
return None
return value
def get_username(self) -> Optional[str]:
value = self._inner.get("username")
if not isinstance(value, str):
return None
return value
def get_name(self) -> Optional[str]:
value = self._inner.get("name")
if not isinstance(value, str):
return None
return value
def get_device_id(self) -> Optional[str]:
value = self._inner.get("device_id")
if value is not None and not isinstance(value, str):
raise AuthError(
500,
"Invalid device ID in introspection result",
)
return value
class PrivateKeyJWTWithKid(PrivateKeyJWT): # type: ignore[misc]
"""An implementation of the private_key_jwt client auth method that includes a kid header.
@ -219,7 +275,7 @@ class MSC3861DelegatedAuth(BaseAuth):
metadata = await self._issuer_metadata.get()
return metadata.get("introspection_endpoint")
async def _introspect_token(self, token: str) -> IntrospectionToken:
async def _introspect_token(self, token: str) -> IntrospectionResult:
"""
Send a token to the introspection endpoint and returns the introspection response
@ -292,7 +348,9 @@ class MSC3861DelegatedAuth(BaseAuth):
"The introspection endpoint returned an invalid JSON response."
)
return IntrospectionToken(**resp)
return IntrospectionResult(
IntrospectionToken(**resp), retrieved_at_ms=self._clock.time_msec()
)
async def is_server_admin(self, requester: Requester) -> bool:
return "urn:synapse:admin:*" in requester.scope
@ -381,11 +439,11 @@ class MSC3861DelegatedAuth(BaseAuth):
# TODO: introspection verification should be more extensive, especially:
# - verify the audience
if not introspection_result.get("active"):
if not introspection_result.is_active(self._clock.time_msec()):
raise InvalidClientTokenError("Token is not active")
# Let's look at the scope
scope: List[str] = scope_to_list(introspection_result.get("scope", ""))
scope: List[str] = introspection_result.get_scope_list()
# Determine type of user based on presence of particular scopes
has_user_scope = SCOPE_MATRIX_API in scope
@ -395,7 +453,7 @@ class MSC3861DelegatedAuth(BaseAuth):
raise InvalidClientTokenError("No scope in token granting user rights")
# Match via the sub claim
sub: Optional[str] = introspection_result.get("sub")
sub: Optional[str] = introspection_result.get_sub()
if sub is None:
raise InvalidClientTokenError(
"Invalid sub claim in the introspection result"
@ -409,7 +467,7 @@ class MSC3861DelegatedAuth(BaseAuth):
# or the external_id was never recorded
# TODO: claim mapping should be configurable
username: Optional[str] = introspection_result.get("username")
username: Optional[str] = introspection_result.get_username()
if username is None or not isinstance(username, str):
raise AuthError(
500,
@ -427,7 +485,7 @@ class MSC3861DelegatedAuth(BaseAuth):
# TODO: claim mapping should be configurable
# If present, use the name claim as the displayname
name: Optional[str] = introspection_result.get("name")
name: Optional[str] = introspection_result.get_name()
await self.store.register_user(
user_id=user_id.to_string(), create_profile_with_displayname=name
@ -442,15 +500,8 @@ class MSC3861DelegatedAuth(BaseAuth):
# MAS 0.15+ will give us the device ID as an explicit value for compatibility sessions
# If present, we get it from here, if not we get it in thee scope
device_id = introspection_result.get("device_id")
if device_id is not None:
# We got the device ID explicitly, just sanity check that it's a string
if not isinstance(device_id, str):
raise AuthError(
500,
"Invalid device ID in introspection result",
)
else:
device_id = introspection_result.get_device_id()
if device_id is None:
# Find device_ids in scope
# We only allow a single device_id in the scope, so we find them all in the
# scope list, and raise if there are more than one. The OIDC server should be