Source code for keria.core.authing

# -*- encoding: utf-8 -*-
"""
KERIA
keria.core.authing module

"""
from urllib.parse import quote, unquote
import falcon
from hio.help import Hict
from keri import kering
from keri.end import ending
from keri.help import helping


class Authenticater:

    DefaultFields = ["Signify-Resource",
                     "@method",
                     "@path",
                     "Signify-Timestamp"]

    def __init__(self, agency):
        """ Create Agent Authenticator for verifying requests and signing responses

        Parameters:
            agency(Agency): habitat of Agent for signing responses

        Returns:
              Authenicator:  the configured habery

        """
        self.agency = agency

    @staticmethod
    def resource(request):
        headers = request.headers
        if "SIGNIFY-RESOURCE" not in headers:
            raise ValueError("Missing signify resource header")

        return headers["SIGNIFY-RESOURCE"]

    def verify(self, request):
        headers = request.headers
        if "SIGNATURE-INPUT" not in headers or "SIGNATURE" not in headers:
            return False

        siginput = headers["SIGNATURE-INPUT"]
        if not siginput:
            return False
        signature = headers["SIGNATURE"]
        if not signature:
            return False

        inputs = ending.desiginput(siginput.encode("utf-8"))
        inputs = [i for i in inputs if i.name == "signify"]

        if not inputs:
            return False

        for inputage in inputs:
            items = []
            for field in inputage.fields:
                if field.startswith("@"):
                    if field == "@method":
                        items.append(f'"{field}": {request.method}')
                    elif field == "@path":
                        items.append(f'"{field}": {request.path}')

                else:
                    key = field.upper()
                    field = field.lower()
                    if key not in headers:
                        continue

                    value = ending.normalize(headers[key])
                    items.append(f'"{field}": {value}')

            values = [f"({' '.join(inputage.fields)})", f"created={inputage.created}"]
            if inputage.expires is not None:
                values.append(f"expires={inputage.expires}")
            if inputage.nonce is not None:
                values.append(f"nonce={inputage.nonce}")
            if inputage.keyid is not None:
                values.append(f"keyid={inputage.keyid}")
            if inputage.context is not None:
                values.append(f"context={inputage.context}")
            if inputage.alg is not None:
                values.append(f"alg={inputage.alg}")

            params = ';'.join(values)

            items.append(f'"@signature-params: {params}"')
            ser = "\n".join(items).encode("utf-8")

            resource = self.resource(request)
            agent = self.agency.get(resource)

            if agent is None:
                raise kering.AuthNError("unknown or invalid controller")

            if resource not in agent.agentHab.kevers:
                raise kering.AuthNError("unknown or invalid controller")

            ckever = agent.agentHab.kevers[resource]
            signages = ending.designature(signature)
            cig = signages[0].markers[inputage.name]
            if not ckever.verfers[0].verify(sig=cig.raw, ser=ser):
                raise kering.AuthNError(f"Signature for {inputage} invalid")

        return True

    def sign(self, agent, headers, method, path, fields=None):
        """ Generate and add Signature Input and Signature fields to headers

        Parameters:
            agent (Agent): The agent that is replying to the request
            headers (Hict): HTTP header to sign
            method (str): HTTP method name of request/response
            path (str): HTTP Query path of request/response
            fields (Optional[list]): Optional list of Signature Input fields to sign.

        Returns:
            headers (Hict): Modified headers with new Signature and Signature Input fields

        """

        if fields is None:
            fields = self.DefaultFields

        header, qsig = ending.siginput("signify", method, path, headers, fields=fields, hab=agent.agentHab,
                                       alg="ed25519", keyid=agent.agentHab.pre)
        headers.extend(header)
        signage = ending.Signage(markers=dict(signify=qsig), indexed=False, signer=None, ordinal=None, digest=None,
                                 kind=None)
        headers.extend(ending.signature([signage]))

        return headers


[docs] class SignatureValidationComponent(object): """ Validate Signature and Signature-Input header signatures """ def __init__(self, agency, authn: Authenticater, allowed=None): """ Parameters: authn (Authenticater): Authenticator to validate signature headers on request allowed (list[str]): Paths that are not protected. """ if allowed is None: allowed = [] self.agency = agency self.authn = authn self.allowed = allowed
[docs] def process_request(self, req, resp): """ Process request to ensure has a valid signature from caid Parameters: req: Http request object resp: Http response object """ for path in self.allowed: if req.path.startswith(path): return req.path = quote(req.path) try: # Use Authenticater to verify the signature on the request if self.authn.verify(req): req.path = unquote(req.path) resource = self.authn.resource(req) agent = self.agency.get(caid=resource) req.context.agent = agent return except kering.AuthNError: pass except ValueError: pass resp.complete = True # This short-circuits Falcon, skipping all further processing resp.status = falcon.HTTP_401 return
[docs] def process_response(self, req, rep, resource, req_succeeded): """ Process every falcon response by adding signature headers signed by the Agent AID. Parameters: req (Request): Falcon request object rep (Response): Falcon response object resource (End): endpoint object the request was routed to req_succeeded (boot): True means the request was successfully handled """ if hasattr(req.context, "agent"): req.path = quote(req.path) agent = req.context.agent rep.set_header('Signify-Resource', agent.agentHab.pre) rep.set_header('Signify-Timestamp', helping.nowIso8601()) headers = self.authn.sign(agent, Hict(rep.headers), req.method, req.path) for key, val in headers.items(): rep.set_header(key, val)