Skip to content
Snippets Groups Projects

add an endpoint that will sign with a ca in the path and just give all available authority

Merged Chris Hines requested to merge newendpoint into main
2 files
+ 52
7
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 37
5
import json
import urllib.request
from fastapi import FastAPI
from fastapi import FastAPI, Body
from typing import Annotated, Union
from fastapi import Depends, FastAPI
from jose import jwt
@@ -70,10 +70,17 @@ app = FastAPI(middleware=[
# from the format string specified in the config
@app.post("/token", response_model=Token)
async def login_for_access_token(token_request: AccessTokenRequest):
import logging
logger = logging.getLogger()
try:
payload = jwt.decode(token_request.oidc_token, jwks, algorithms=config.algorithm,
audience=config.audience, options={'verify_at_hash': False})
payload = jwt.decode(token_request.idToken, jwks, algorithms=config.algorithm,
audience=config.audience, access_token=token_request.accessToken)
except JWTError as e:
import traceback
logger.error(e)
logger.error(traceback.format_exc())
logger.error(token_request.idToken)
logger.error(token_request.accessToken)
raise credentials_exception
user_id = config.id_format.format(**payload)
access_data = get_authz_from_config(user_id, config)
@@ -165,13 +172,21 @@ def validate_request(sign_request: SigningRequest, access_params: AuthzParams):
async def sign_pub_key(pubkey, signcmd) -> str:
# Use the ssh-keygen command to sign a public key into a certificate
import logging
logger = logging.getLogger()
with tempfile.NamedTemporaryFile() as f:
f.write(pubkey.encode())
cert_id = f.name
f.flush()
signcmd.extend(['-I', cert_id, f.name])
p = subprocess.Popen(signcmd)
p.communicate()
p = subprocess.Popen(signcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stderr, stdout) = p.communicate()
if p.returncode != 0:
if b'not found in agent' in stderr:
logger.error('please add the keys to the agent before starting')
raise key_not_found_exception
logger.error(f'failed to run {" ".join(signcmd)}')
raise
with open(f.name+'-cert.pub', 'r') as certf:
cert = certf.read()
os.unlink(f.name+'-cert.pub')
@@ -195,3 +210,20 @@ async def sign(sign_request: SigningRequest, access_params: Annotated[AuthzParam
cert = await sign_pub_key(sign_request.pubkey, signcmd)
return cert
@app.post('/signall/{ca}')
async def sign_all(ca: str, public_key: Annotated[str, Body()], end: Annotated[str, Body()], access_params: Annotated[AuthzParams, Depends(get_authz_from_token)]):
# Input a Signing Request, which consists of a public key and the parameters we would like to sign with
# Also the Auth token (which encodes the parameters we are allowed to sign with)
# Sign the key and return the certificate
# Note that requests to sign a host key are slightly different than requests to sign a user access key
print(f'called signall {ca}, {public_key}, {end}')
period = (datetime.strptime(end,'%Y-%m-%dT%H:%M:%S.%fZ')-datetime.now()).total_seconds()
if ca in access_params.authz:
authz = access_params.authz[ca]
if type(authz) == UserCertParams:
sign_request = SigningRequest(ca = ca, pubkey=public_key, params=authz)
sign_request.params.period = int(period)
cert = sign(sign_request, access_params)
return cert
raise invalid_request_exception
\ No newline at end of file
Loading