# -*- encoding: utf-8 -*-
"""
KERIA
keria.app.agenting module
"""
import json
import os
from dataclasses import asdict
from urllib.parse import urlparse, urljoin
from keri import kering
from keri.app.notifying import Notifier
from keri.app.storing import Mailboxer
import falcon
from falcon import media
from hio.base import doing
from hio.core import http, tcp
from hio.help import decking
from keri.app import configing, keeping, habbing, storing, signaling, oobiing, agenting, \
forwarding, querying, connecting, grouping
from keri.app.grouping import Counselor
from keri.app.keeping import Algos
from keri.core import coring, parsing, eventing, routing, serdering
from keri.core.coring import Ilks, randomNonce
from keri.db import dbing
from keri.db.basing import OobiRecord
from keri.vc import protocoling
from keria.end import ending
from keri.help import helping, ogler, nowIso8601
from keri.peer import exchanging
from keri.vdr import verifying
from keri.vdr.credentialing import Regery, sendArtifacts
from keri.vdr.eventing import Tevery
from keri.app import challenging
from . import aiding, notifying, indirecting, credentialing, ipexing, delegating
from . import grouping as keriagrouping
from ..peer import exchanging as keriaexchanging
from .specing import AgentSpecResource
from ..core import authing, longrunning, httping
from ..core.authing import Authenticater
from ..core.keeping import RemoteManager
from ..db import basing
# Module-level logger shared by the doers defined in this module
logger = ogler.getLogger()
[docs]
def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=None, configDir=None,
          keypath=None, certpath=None, cafilepath=None):
    """ Set up an ahab in Signify mode

    Builds the Agency, a boot server exposing ``/boot`` and ``/health``, an admin server that
    validates request signatures and carries the agent API routes and, when ``httpPort`` is
    truthy, a third server with the ending/indirecting routes, a static sink under
    ``/swaggerui`` and the API spec at ``/spec.yaml``.

    Parameters:
        name (str): name handed to the Agency
        bran (str): handed to the Agency, which passes it to each agent Habery
        adminPort (int): port of the admin server
        bootPort (int): port of the boot server
        base (str): ``base`` handed to the Agency
        httpPort (int): optional port of the additional HTTP server
        configFile (str): configuration file name handed to the Agency
        configDir (str): configuration directory handed to the Agency
        keypath (str): TLS private key path forwarded to createHttpServer
        certpath (str): TLS certificate path forwarded to createHttpServer
        cafilepath (str): TLS CA chain path forwarded to createHttpServer

    Returns:
        list: the Agency followed by one ServerDoer per server that was opened

    Raises:
        RuntimeError: when one of the servers cannot be reopened on its port
    """

    def corsApp():
        # Each falcon app gets its own, identically configured CORS middleware
        return falcon.App(middleware=falcon.CORSMiddleware(
            allow_origins='*', allow_credentials='*',
            expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input',
                            'signify-resource', 'signify-timestamp']))

    def openServer(port, webapp, label):
        # Create the server, make sure it can bind, and wrap it in a doer
        srv = createHttpServer(port, webapp, keypath, certpath, cafilepath)
        if not srv.reopen():
            raise RuntimeError(f"cannot create {label} http server on port {port}")
        return http.ServerDoer(server=srv)

    agency = Agency(name=name, base=base, bran=bran, configFile=configFile, configDir=configDir)

    # Boot server: unauthenticated agent creation plus health check
    bootApp = corsApp()
    bootServerDoer = openServer(bootPort, bootApp, "boot")
    bootApp.add_route("/boot", BootEnd(agency))
    bootApp.add_route("/health", HealthEnd())

    # Create Authenticater for verifying signatures on all requests
    authn = Authenticater(agency=agency)

    # Admin server: signed requests only
    app = corsApp()
    corsFlag = os.getenv("KERI_AGENT_CORS", "false").lower()
    if corsFlag in ("true", "1"):
        app.add_middleware(middleware=httping.HandleCORS())
    app.add_middleware(authing.SignatureValidationComponent(agency=agency, authn=authn, allowed=["/agent"]))
    app.req_options.media_handlers.update(media.Handlers())
    app.resp_options.media_handlers.update(media.Handlers())

    adminServerDoer = openServer(adminPort, app, "admin")

    doers = [agency, bootServerDoer, adminServerDoer]

    loadEnds(app=app)
    aidEnd = aiding.loadEnds(app=app, agency=agency, authn=authn)
    credentialing.loadEnds(app=app, identifierResource=aidEnd)
    notifying.loadEnds(app=app)
    keriagrouping.loadEnds(app=app)
    keriaexchanging.loadEnds(app=app)
    ipexing.loadEnds(app=app)

    if httpPort:
        happ = corsApp()
        happ.req_options.media_handlers.update(media.Handlers())
        happ.resp_options.media_handlers.update(media.Handlers())

        ending.loadEnds(agency=agency, app=happ)
        indirecting.loadEnds(agency=agency, app=happ)

        doers.append(openServer(httpPort, happ, "local"))

        happ.add_sink(http.serving.StaticSink(staticDirPath="./static"), prefix="/swaggerui")

        # The spec is built from the admin app but served from the HTTP app
        specEnd = AgentSpecResource(app=app, title='KERIA Interactive Web Interface API')
        specEnd.addRoutes(happ)
        happ.add_route("/spec.yaml", specEnd)

    print("The Agency is loaded and waiting for requests...")
    return doers
[docs]
def createHttpServer(port, app, keypath=None, certpath=None, cafilepath=None):
    """
    Build an hio HTTP server, switching to TLS only when all three TLS paths are supplied

    Parameters:
        port (int)         : port the server listens on
        app (falcon.App)   : application instance handed to http.Server
        keypath (string)   : file path of the TLS private key
        certpath (string)  : file path of the TLS signed certificate (public key)
        cafilepath (string): file path of the TLS CA certificate chain file

    Returns:
        hio.core.http.Server
    """
    tlsPaths = (keypath, certpath, cafilepath)
    if any(path is None for path in tlsPaths):
        # Incomplete TLS material: fall back to a plain HTTP server
        return http.Server(port=port, app=app)

    tlsServant = tcp.ServerTls(certify=False,
                               keypath=keypath,
                               certpath=certpath,
                               cafilepath=cafilepath,
                               port=port)
    return http.Server(port=port, app=app, servant=tlsServant)
[docs]
class Agency(doing.DoDoer):
    """
    Agency

    DoDoer that creates, caches, reloads and deletes the per-controller Agent instances and
    records the controller / agent / managed AID relationships in its agency database.
    """

    def __init__(self, name, bran, base="", configFile=None, configDir=None, adb=None, temp=False):
        """ Create the Agency and open (or accept) its database

        Parameters:
            name (str): name of this agency
            bran (str): passed to every agent Habery this agency opens
            base (str): ``base`` used for every keystore, Habery, Regery and the agency database
            configFile (str): optional configuration file name; loaded with a Configer when given
            configDir (str): head directory used when loading ``configFile``
            adb (AgencyBaser): optional already-open agency database
            temp (bool): passed as ``temp`` to the databases this agency opens
        """
        self.name = name
        self.base = base
        self.bran = bran
        self.temp = temp
        self.configFile = configFile
        self.configDir = configDir
        self.cf = None
        if self.configFile is not None:  # Load config file if creating database
            self.cf = configing.Configer(name=self.configFile,
                                         base="",
                                         headDirPath=self.configDir,
                                         temp=False,
                                         reopen=True,
                                         clear=False)

        # In-memory cache of running agents keyed by controller AID
        self.agents = dict()

        self.adb = adb if adb is not None else basing.AgencyBaser(name="TheAgency", base=base, reopen=True, temp=temp)
        super(Agency, self).__init__(doers=[], always=True)

    def create(self, caid):
        """ Create, register and start a new Agent for controller AID `caid`

        Parameters:
            caid (str): qb64 controller AID; used as the name of the agent's keystore and Habery

        Returns:
            Agent: the newly created agent, already added to this DoDoer
        """
        ks = keeping.Keeper(name=caid,
                            base=self.base,
                            temp=self.temp,
                            reopen=True)

        cf = None
        if self.cf is not None:  # Load config file if creating database
            data = dict(self.cf.get())
            if "keria" in data:
                # Re-key the shared "keria" section under this agent's hab name
                curls = data["keria"]
                data[f"agent-{caid}"] = curls
                del data["keria"]

            cf = configing.Configer(name=f"{caid}",
                                    base="",
                                    human=False,
                                    temp=self.temp,
                                    reopen=True,
                                    clear=False)
            cf.put(data)

        # Create the Hab for the Agent with only 2 AIDs
        agentHby = habbing.Habery(name=caid, base=self.base, bran=self.bran, ks=ks, cf=cf, temp=self.temp)
        agentHab = agentHby.makeHab(f"agent-{caid}", ns="agent", transferable=True, delpre=caid)
        agentRgy = Regery(hby=agentHby, name=agentHab.name, base=self.base, temp=self.temp)

        agent = Agent(agentHby, agentRgy, agentHab,
                      caid=caid,
                      agency=self,
                      configDir=self.configDir,
                      configFile=self.configFile)

        # Record both directions: controller -> agent and agent -> controller
        self.adb.agnt.pin(keys=(caid,),
                          val=coring.Prefixer(qb64=agent.pre))
        self.adb.ctrl.pin(keys=(agent.pre,),
                          val=coring.Prefixer(qb64=caid))

        # add agent to cache
        self.agents[caid] = agent
        # start agents processes running
        self.extend([agent])

        return agent

    def delete(self, agent):
        """ Remove `agent` from the agency database and cache and clear its Habery and keystore

        Parameters:
            agent (Agent): agent to remove
        """
        # The sub-db API takes ``keys`` (as pin/get above do), not ``key``
        self.adb.agnt.rem(keys=(agent.caid,))
        agent.hby.deleteHab(agent.caid)
        agent.hby.ks.close(clear=True)
        agent.hby.close(clear=True)

        del self.agents[agent.caid]

    def get(self, caid):
        """ Return the Agent for controller AID `caid`, reloading it from disk when not cached

        Parameters:
            caid (str): qb64 controller AID

        Returns:
            Agent: cached or freshly reloaded agent, or None when no agent is recorded for `caid`

        Raises:
            kering.ConfigurationError: when the recorded agent AID differs from the reloaded hab's AID
        """
        if caid in self.agents:
            return self.agents[caid]

        aaid = self.adb.agnt.get(keys=(caid,))
        if aaid is None:
            return None

        ks = keeping.Keeper(name=caid,
                            base=self.base,
                            temp=self.temp,
                            reopen=True)

        agentHby = habbing.Habery(name=caid, base=self.base, bran=self.bran, ks=ks, temp=self.temp)

        agentHab = agentHby.habByName(f"agent-{caid}", ns="agent")
        if aaid.qb64 != agentHab.pre:
            raise kering.ConfigurationError(f"invalid agent aid={aaid.qb64}/{agentHab.pre} to controller aid={caid}")

        agentRgy = Regery(hby=agentHby, name=agentHab.name, base=self.base, temp=self.temp)

        agent = Agent(hby=agentHby, rgy=agentRgy, agentHab=agentHab, agency=self, caid=caid)
        self.agents[caid] = agent
        self.extend([agent])

        return agent

    def lookup(self, pre):
        """ Return the Agent responsible for AID `pre`, or None

        Parameters:
            pre (str): qb64 AID, either a managed AID or an agent AID

        Returns:
            Agent: owning agent, or None when `pre` is unknown or the agent fails its
                   configuration check in get()
        """
        # Check to see if this is a managed AID
        if (prefixer := self.adb.aids.get(keys=(pre,))) is not None:
            caid = prefixer.qb64
        # Or if its an agent AID
        elif (prefixer := self.adb.ctrl.get(keys=(pre,))) is not None:
            caid = prefixer.qb64
        else:
            return None

        try:
            return self.get(caid)
        except kering.ConfigurationError:
            return None

    def incept(self, caid, pre):
        """ Record that managed AID `pre` belongs to controller AID `caid`

        Parameters:
            caid (str): qb64 controller AID
            pre (str): qb64 managed AID
        """
        self.adb.aids.pin(keys=(pre,), val=coring.Prefixer(qb64=caid))
[docs]
class Agent(doing.DoDoer):
    """
    The top level object and DoDoer representing a Habery for a remote controller and all associated processing

    """

    def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
        """ Wire up every processor, queue and doer that serves one controller

        Parameters:
            hby (Habery): Habery holding this controller's identifiers
            rgy (Regery): credential registry environment tied to `hby`
            agentHab (Hab): the agent's own identifier, used as proxy/sender by several doers
            agency (Agency): owning agency, notified whenever a managed AID is incepted
            caid (str): qb64 controller AID
            **opts: forwarded unchanged to the DoDoer constructor
        """
        self.hby = hby
        self.rgy = rgy
        self.agentHab = agentHab
        self.agency = agency
        self.caid = caid

        # Delegation, multisig coordination, contacts, OOBIs and remote key management
        self.swain = delegating.Sealer(hby=hby, proxy=agentHab)
        self.counselor = Counselor(hby=hby, swain=self.swain, proxy=agentHab)
        self.org = connecting.Organizer(hby=hby)
        oobis = oobiing.Oobiery(hby=hby)
        self.mgr = RemoteManager(hby=hby)

        # Work queues drained by the doers created at the end of this constructor
        self.cues = decking.Deck()
        self.groups = decking.Deck()
        self.anchors = decking.Deck()
        self.witners = decking.Deck()
        self.queries = decking.Deck()
        self.exchanges = decking.Deck()
        self.grants = decking.Deck()
        self.admits = decking.Deck()

        # Witness interaction and mailbox responder
        rcptr = agenting.Receiptor(hby=hby)
        self.witq = agenting.WitnessInquisitor(hby=self.hby)
        self.witPub = agenting.WitnessPublisher(hby=self.hby)
        self.witDoer = agenting.WitnessReceiptor(hby=self.hby)
        mailbox = Mailboxer(name=self.hby.name, temp=self.hby.temp)
        self.rep = storing.Respondant(hby=hby, cues=self.cues, mbx=mailbox)

        doers = [habbing.HaberyDoer(habery=hby), rcptr, self.witq, self.witPub, self.rep, self.swain,
                 self.counselor, self.witDoer]
        doers.extend(oobis.doers)

        sigr = signaling.Signaler()
        self.notifier = Notifier(hby=hby, signaler=sigr)
        self.mux = grouping.Multiplexor(hby=hby, notifier=self.notifier)

        # Initialize all the credential processors
        self.verifier = verifying.Verifier(hby=hby, reger=rgy.reger)
        self.registrar = credentialing.Registrar(agentHab=agentHab, hby=hby, rgy=rgy, counselor=self.counselor,
                                                 witPub=self.witPub, witDoer=self.witDoer, verifier=self.verifier)
        self.credentialer = credentialing.Credentialer(agentHab=agentHab, hby=self.hby, rgy=self.rgy,
                                                       registrar=self.registrar, verifier=self.verifier,
                                                       notifier=self.notifier)

        # Search indexes over credentials and exchange messages
        self.seeker = basing.Seeker(name=hby.name, db=hby.db, reger=self.rgy.reger, reopen=True, temp=self.hby.temp)
        self.exnseeker = basing.ExnSeeker(name=hby.name, db=hby.db, reopen=True, temp=self.hby.temp)

        # Peer-to-peer exchange message handling
        self.exc = exchanging.Exchanger(hby=hby, handlers=[challenging.ChallengeHandler(db=hby.db, signaler=sigr)])
        grouping.loadHandlers(exc=self.exc, mux=self.mux)
        protocoling.loadHandlers(hby=self.hby, exc=self.exc, notifier=self.notifier)

        self.monitor = longrunning.Monitor(hby=hby, swain=self.swain, counselor=self.counselor, temp=hby.temp,
                                           registrar=self.registrar, credentialer=self.credentialer,
                                           exchanger=self.exc)

        # Event processors feeding off the shared cues deck, and the parser that drives them
        self.rvy = routing.Revery(db=hby.db, cues=self.cues)
        self.kvy = eventing.Kevery(db=hby.db, lax=True, local=False, rvy=self.rvy, cues=self.cues)
        self.kvy.registerReplyRoutes(router=self.rvy.rtr)

        self.tvy = Tevery(reger=self.verifier.reger, db=hby.db, local=False, cues=self.cues)
        self.tvy.registerReplyRoutes(router=self.rvy.rtr)

        self.parser = parsing.Parser(framed=True, kvy=self.kvy, tvy=self.tvy, exc=self.exc, rvy=self.rvy,
                                     vry=self.verifier)

        doers += [
            Initer(agentHab=agentHab, caid=caid),
            Querier(hby=hby, agentHab=agentHab, kvy=self.kvy, queries=self.queries),
            Escrower(kvy=self.kvy, rgy=self.rgy, rvy=self.rvy, tvy=self.tvy, exc=self.exc, vry=self.verifier,
                     registrar=self.registrar, credentialer=self.credentialer),
            ParserDoer(kvy=self.kvy, parser=self.parser),
            Witnesser(receiptor=rcptr, witners=self.witners),
            Delegator(agentHab=agentHab, swain=self.swain, anchors=self.anchors),
            ExchangeSender(hby=hby, agentHab=agentHab, exc=self.exc, exchanges=self.exchanges),
            Granter(hby=hby, rgy=rgy, agentHab=agentHab, exc=self.exc, grants=self.grants),
            Admitter(hby=hby, witq=self.witq, psr=self.parser, agentHab=agentHab, exc=self.exc, admits=self.admits),
            GroupRequester(hby=hby, agentHab=agentHab, counselor=self.counselor, groups=self.groups),
            SeekerDoer(seeker=self.seeker, cues=self.verifier.cues),
            ExchangeCueDoer(seeker=self.exnseeker, cues=self.exc.cues, queries=self.queries),
        ]

        super().__init__(doers=doers, always=True, **opts)

    @property
    def pre(self):
        """ qb64 AID of the agent's own Hab """
        return self.agentHab.pre

    def inceptSalty(self, pre, **kwargs):
        """ Incept `pre` with the salty keeper and register it with the agency """
        self.mgr.get(Algos.salty).incept(pre=pre, **kwargs)
        self.agency.incept(self.caid, pre)

    def inceptRandy(self, pre, verfers, digers, **kwargs):
        """ Incept `pre` with the randy keeper and register it with the agency """
        self.mgr.get(Algos.randy).incept(pre=pre, verfers=verfers, digers=digers, **kwargs)
        self.agency.incept(self.caid, pre)

    def inceptGroup(self, pre, mpre, verfers, digers):
        """ Incept group `pre` (member `mpre`) with the group keeper and register it with the agency """
        self.mgr.get(Algos.group).incept(pre=pre, mpre=mpre, verfers=verfers, digers=digers)
        self.agency.incept(self.caid, pre)

    def inceptExtern(self, pre, verfers, digers, **kwargs):
        """ Incept `pre` with the extern keeper and register it with the agency """
        self.mgr.get(Algos.extern).incept(pre=pre, verfers=verfers, digers=digers, **kwargs)
        self.agency.incept(self.caid, pre)
[docs]
class ParserDoer(doing.Doer):
    """ Doer that keeps the agent's Parser consuming its incoming message stream """

    def __init__(self, kvy, parser):
        """
        Parameters:
            kvy (Kevery): event processor, only used to label the log line
            parser (Parser): parser whose ``ims`` buffer is processed
        """
        self.kvy = kvy
        self.parser = parser
        super().__init__()

    def recur(self, tyme=None):
        """ Generator: log any pending bytes (first 1024 only), then delegate to the parser's parsator """
        pending = self.parser.ims
        if pending:
            logger.info("Agent %s received:\n%s\n...\n", self.kvy, pending[:1024])

        # process messages continuously; should never return except on forced close
        return (yield from self.parser.parsator())
[docs]
class Witnesser(doing.Doer):
    """ Doer that submits queued events to witnesses for receipting """

    def __init__(self, receiptor, witners):
        """
        Parameters:
            receiptor (Receiptor): performs the catchup and receipt exchanges
            witners (Deck): queue of messages, each carrying an event under "serder"
        """
        self.receiptor = receiptor
        self.witners = witners
        super().__init__()

    def recur(self, tyme=None):
        """ Generator that loops forever, handling at most one queued event between yields of tock """
        while True:
            if not self.witners:
                yield self.tock
                continue

            evt = self.witners.popleft()["serder"]

            # If we are a rotation event, may need to catch new witnesses up to current key state
            if evt.ked['t'] in (Ilks.rot, Ilks.drt):
                for added in evt.ked["ba"]:
                    yield from self.receiptor.catchup(evt.pre, added)

            yield from self.receiptor.receipt(evt.pre, evt.sn)
            yield self.tock
[docs]
class Delegator(doing.Doer):
    """ Doer that hands queued delegation requests to the Sealer """

    def __init__(self, agentHab, swain, anchors):
        """
        Parameters:
            agentHab (Hab): agent identifier passed to the Sealer as proxy
            swain (Sealer): delegation processor
            anchors (Deck): queue of messages with "pre" and optionally "sn"
        """
        self.agentHab = agentHab
        self.swain = swain
        self.anchors = anchors
        super().__init__()

    def recur(self, tyme=None):
        """ Start delegation for at most one queued request; always returns False to keep running """
        if not self.anchors:
            return False

        request = self.anchors.popleft()
        seq = request["sn"] if "sn" in request else None
        self.swain.delegation(pre=request["pre"], sn=seq, proxy=self.agentHab)
        return False
[docs]
class ExchangeSender(doing.DoDoer):
    """ DoDoer that forwards completed exchange messages to their recipients """

    def __init__(self, hby, agentHab, exc, exchanges):
        """
        Parameters:
            hby (Habery): identifier environment
            agentHab (Hab): agent identifier used as sender by the StreamPoster
            exc (Exchanger): consulted for completeness and leadership of a message
            exchanges (Deck): queue of messages with "said", "pre", "rec" and "topic"
        """
        self.hby = hby
        self.agentHab = agentHab
        self.exc = exc
        self.exchanges = exchanges
        super().__init__(always=True)

    def recur(self, tyme, deeds=None):
        """ Handle at most one queued message, then run the child delivery doers """
        if self.exchanges:
            msg = self.exchanges.popleft()
            said = msg['said']
            if self.exc.complete(said=said):
                self._forward(msg, said)
            else:
                # Not complete yet: put it back and retry on a later cycle
                self.exchanges.append(msg)

        return super().recur(tyme, deeds)

    def _forward(self, msg, said):
        """ Post the message identified by `said` to every recipient when the local hab is the lead """
        serder, _ = exchanging.cloneMessage(self.hby, said)

        pre = msg["pre"]
        recipients = msg["rec"]
        topic = msg['topic']
        hab = self.hby.habs[pre]
        if not self.exc.lead(hab, said=said):
            return

        # Strip the event itself so only the attachments remain
        atc = exchanging.serializeMessage(self.hby, said)
        del atc[:serder.size]
        for recp in recipients:
            postman = forwarding.StreamPoster(hby=self.hby, hab=self.agentHab, recp=recp, topic=topic)
            try:
                postman.send(serder=serder, attachment=atc)
            except kering.ValidationError:
                logger.info(f"unable to send to recipient={recp}")
            else:
                self.extend([doing.DoDoer(doers=postman.deliver())])
[docs]
class Granter(doing.DoDoer):
    """ DoDoer that ships a granted credential and its sources to each recipient """

    def __init__(self, hby, rgy, agentHab, exc, grants):
        """
        Parameters:
            hby (Habery): identifier environment
            rgy (Regery): credential registry environment
            agentHab (Hab): agent identifier used as sender by the StreamPoster
            exc (Exchanger): consulted for completeness and leadership of a grant
            grants (Deck): queue of messages with "said", "pre" and "rec"
        """
        self.hby = hby
        self.rgy = rgy
        self.agentHab = agentHab
        self.exc = exc
        self.grants = grants
        super().__init__(always=True)

    def recur(self, tyme, deeds=None):
        """ Handle at most one queued grant, then run the child delivery doers """
        if self.grants:
            msg = self.grants.popleft()
            said = msg['said']
            if self.exc.complete(said=said):
                self._deliver(msg, said)
            else:
                # Not complete yet: put it back and retry on a later cycle
                self.grants.append(msg)

        return super().recur(tyme, deeds)

    def _deliver(self, msg, said):
        """ Send the credential artifacts for grant `said` to every recipient when the local hab is the lead """
        serder, _ = exchanging.cloneMessage(self.hby, said)

        pre = msg["pre"]
        recipients = msg["rec"]
        hab = self.hby.habs[pre]
        if not self.exc.lead(hab, said=said):
            return

        reger = self.rgy.reger
        for recp in recipients:
            postman = forwarding.StreamPoster(hby=self.hby, hab=self.agentHab, recp=recp, topic="credential")
            try:
                credSaid = serder.ked['e']['acdc']['d']
                creder = reger.creds.get(keys=(credSaid,))
                sendArtifacts(self.hby, reger, postman, creder, recp)
                for source, atc in reger.sources(self.hby.db, creder):
                    sendArtifacts(self.hby, reger, postman, source, recp)
                    postman.send(serder=source, attachment=atc)
            except kering.ValidationError:
                logger.info(f"unable to send to recipient={recp}")
            except KeyError:
                logger.info(f"invalid grant message={serder.ked}")
            else:
                self.extend([doing.DoDoer(doers=postman.deliver())])
[docs]
class Admitter(doing.Doer):
    """ Doer that processes admit messages by parsing the embedded events of the referenced grant """

    def __init__(self, hby, witq, psr, agentHab, exc, admits):
        """
        Parameters:
            hby (Habery): identifier environment
            witq (WitnessInquisitor): used to query the issuer's KEL and registry
            psr (Parser): parser fed with the grant's embedded events
            agentHab (Hab): agent identifier used as the querying hab
            exc (Exchanger): consulted for completeness of the admit message
            admits (Deck): queue of messages with "said"
        """
        self.hby = hby
        self.agentHab = agentHab
        self.witq = witq
        self.psr = psr
        self.exc = exc
        self.admits = admits
        super(Admitter, self).__init__()

    def recur(self, tyme):
        """ Handle at most one queued admit

        Returns:
            bool: always False so the doer keeps running. Every path now returns False
                  explicitly, matching the other doers in this module.
        """
        if not self.admits:
            return False

        msg = self.admits.popleft()
        said = msg['said']
        if not self.exc.complete(said=said):
            # Not complete yet: put it back and retry on a later cycle
            self.admits.append(msg)
            return False

        admit, _ = exchanging.cloneMessage(self.hby, said)

        if 'p' not in admit.ked or not admit.ked['p']:
            print(f"Invalid admit message={admit.ked}, no grant listed")
            return False

        # 'p' of the admit is the SAID of the grant being admitted
        grant, pathed = exchanging.cloneMessage(self.hby, admit.ked['p'])

        embeds = grant.ked['e']
        acdc = embeds["acdc"]
        issr = acdc['i']

        # Lets get the latest KEL and Registry if needed
        self.witq.query(hab=self.agentHab, pre=issr)
        if "ri" in acdc:
            self.witq.telquery(hab=self.agentHab, pre=issr, ri=acdc["ri"], i=acdc["d"])

        for label in ("anc", "iss", "acdc"):
            ked = embeds[label]
            if label not in pathed or not pathed[label]:
                print(f"missing path label {label}")
                continue

            # Rebuild each embedded event with its attachments and feed it to the parser
            sadder = coring.Sadder(ked=ked)
            ims = bytearray(sadder.raw) + pathed[label]
            self.psr.parseOne(ims=ims)

        return False
[docs]
class SeekerDoer(doing.Doer):
    """ Doer that indexes saved credentials reported on a cue deck """

    def __init__(self, seeker, cues):
        """
        Parameters:
            seeker (Seeker): index updated for each saved credential
            cues (Deck): cue queue; "saved" cues carry the credential under "creder"
        """
        self.seeker = seeker
        self.cues = cues
        super().__init__()

    def recur(self, tyme=None):
        """ Handle at most one cue; the result is always falsy so the doer keeps running """
        if not self.cues:
            return False

        cue = self.cues.popleft()
        if cue["kin"] != "saved":
            # Not ours: put it back on the deck
            self.cues.append(cue)
            return False

        try:
            self.seeker.index(said=cue["creder"].said)
        except Exception:
            # Indexing failed: requeue the cue so it is retried later
            self.cues.append(cue)

        return False
[docs]
class ExchangeCueDoer(doing.Doer):
    """ Doer that indexes saved exchange messages and relays query cues to the query queue """

    def __init__(self, seeker, cues, queries):
        """
        Parameters:
            seeker (ExnSeeker): index updated for each saved exchange message
            cues (Deck): cue queue; "saved" cues carry "said", "query" cues carry "q"
            queries (Deck): queue that receives the "q" payload of query cues
        """
        self.seeker = seeker
        self.cues = cues
        self.queries = queries
        super().__init__()

    def recur(self, tyme=None):
        """ Handle at most one cue; the result is always falsy so the doer keeps running """
        if not self.cues:
            return False

        cue = self.cues.popleft()
        kind = cue["kin"]
        if kind == "saved":
            try:
                self.seeker.index(said=cue["said"])
            except Exception:
                # Indexing failed: requeue the cue so it is retried later
                self.cues.append(cue)
        elif kind == "query":
            self.queries.append(cue['q'])
        else:
            # Not ours: put it back on the deck
            self.cues.append(cue)

        return False
[docs]
class Initer(doing.Doer):
    """ One-shot doer that announces the agent once its Hab is initialized """

    def __init__(self, agentHab, caid):
        """
        Parameters:
            agentHab (Hab): agent identifier whose ``inited`` flag is polled
            caid (str): qb64 controller AID printed next to the agent AID
        """
        self.agentHab = agentHab
        self.caid = caid
        super().__init__()

    def recur(self, tyme):
        """ Prints Agent and Controller prefixes once the agent Hab is inited

        Returns:
            bool: False while the Hab is not inited, True (done) after printing
        """
        ready = self.agentHab.inited
        if ready:
            print(" Agent:", self.agentHab.pre, " Controller:", self.caid)
            return True

        return False
[docs]
class GroupRequester(doing.Doer):
    """ Doer that hands queued group events to the Counselor """

    def __init__(self, hby, agentHab, counselor, groups):
        """
        Parameters:
            hby (Habery): identifier environment used to look up the group hab
            agentHab (Hab): agent identifier
            counselor (Counselor): multisig coordinator started for each request
            groups (Deck): queue of messages carrying the group event under "serder"
        """
        self.hby = hby
        self.agentHab = agentHab
        self.counselor = counselor
        self.groups = groups
        super().__init__()

    def recur(self, tyme):
        """ Checks the queue for group processing requests and starts any with the Counselor

        Returns:
            bool: always False so the doer keeps running
        """
        if not self.groups:
            return False

        evt = self.groups.popleft()["serder"]
        ghab = self.hby.habs[evt.pre]

        self.counselor.start(ghab=ghab,
                             prefixer=coring.Prefixer(qb64=evt.pre),
                             seqner=coring.Seqner(sn=evt.sn),
                             saider=coring.Saider(qb64=evt.said))
        return False
[docs]
class Querier(doing.DoDoer):
    """ DoDoer that turns queued query requests into running query doers """

    def __init__(self, hby, agentHab, queries, kvy):
        """
        Parameters:
            hby (Habery): identifier environment
            agentHab (Hab): agent identifier used as the querying hab
            queries (Deck): queue of query messages; "pre" is required, "sn" or "anchor" optional
            kvy (Kevery): event processor handed to plain key-state queries
        """
        self.hby = hby
        self.agentHab = agentHab
        self.queries = queries
        self.kvy = kvy
        super(Querier, self).__init__(always=True)

    def recur(self, tyme, deeds=None):
        """ Processes query requests, submitting at most one from the queue per cycle

        A message without "pre" is dropped. Child query doers are still run on that cycle
        (previously the early return skipped them).
        """
        if self.queries:
            msg = self.queries.popleft()
            if "pre" in msg:
                self.extend([self._queryDoerFor(msg)])

        return super(Querier, self).recur(tyme, deeds)

    def _queryDoerFor(self, msg):
        """ Build the doer matching the kind of query described by `msg` """
        pre = msg["pre"]

        if "sn" in msg:
            raw = msg['sn']
            # Strings are hex sequence numbers; tolerate a JSON number instead of raising
            # TypeError out of the doer loop
            sn = raw if isinstance(raw, int) else int(raw, 16)
            return querying.SeqNoQuerier(hby=self.hby, hab=self.agentHab, pre=pre, sn=sn)

        if "anchor" in msg:
            return querying.AnchorQuerier(hby=self.hby, hab=self.agentHab, pre=pre, anchor=msg['anchor'])

        return querying.QueryDoer(hby=self.hby, hab=self.agentHab, pre=pre, kvy=self.kvy)
[docs]
class Escrower(doing.Doer):
    """ Doer that drives the escrow processing of every component of an Agent """

    def __init__(self, kvy, rgy, rvy, tvy, exc, vry, registrar, credentialer):
        """ Recurring processing of escrows for all components in an Agent

        Parameters:
            kvy (Kevery): key event processor
            rgy (Regery): credential registry environment
            rvy (Revery): reply message processor
            tvy (Tevery): TEL event processor; may be None, in which case it is skipped
            exc (Exchanger): exchange message processor
            vry (Verifier): credential verifier
            registrar (Registrar): Credential TEL escrow processor
            credentialer (Credentialer): Credential escrow processor

        """
        super().__init__()
        self.kvy, self.rgy, self.rvy, self.tvy = kvy, rgy, rvy, tvy
        self.exc, self.vry = exc, vry
        self.registrar, self.credentialer = registrar, credentialer

    def recur(self, tyme):
        """ Process all escrows once per loop.

        Returns:
            bool: always False so the doer keeps running
        """
        self.kvy.processEscrows()
        self.rgy.processEscrows()
        self.rvy.processEscrowReply()

        tevery = self.tvy
        if tevery is not None:
            tevery.processEscrows()

        self.exc.processEscrow()
        self.vry.processEscrows()
        self.registrar.processEscrows()
        self.credentialer.processEscrows()

        return False
def loadEnds(app):
    """ Register the operation, OOBI, key state, key event and query resources on `app`

    Parameters:
        app (falcon.App): application that receives the routes
    """
    routes = (
        ("/operations", longrunning.OperationCollectionEnd),
        ("/operations/{name}", longrunning.OperationResourceEnd),
        ("/oobis", OOBICollectionEnd),
        ("/oobis/{alias}", OobiResourceEnd),
        ("/states", KeyStateCollectionEnd),
        ("/events", KeyEventCollectionEnd),
        ("/queries", QueryCollectionEnd),
    )
    # Instantiate each resource right before registering it, in table order
    for template, resourceClass in routes:
        app.add_route(template, resourceClass())
[docs]
class BootEnd:
    """ Resource class for creating datastore in cloud ahab """

    def __init__(self, agency):
        """ Provides endpoints for initializing and unlocking an agent

        Parameters:
            agency (Agency): Agency for managing agents

        """
        self.authn = authing.Authenticater(agency=agency)
        self.agency = agency

    @staticmethod
    def _keeperParams(body):
        """ Validate and extract the key-manager parameters of a boot request

        Runs before any agent is created so that a malformed request cannot leave a
        half-initialized agent behind.

        Parameters:
            body (dict): decoded request body

        Returns:
            tuple: (algo, params) where algo is Algos.salty, Algos.randy or None and params
                   holds the algorithm-specific keyword arguments for the keeper's incept

        Raises:
            falcon.HTTPBadRequest: on a missing required field or a group controller request
        """
        if Algos.salty in body:
            salt = body[Algos.salty]
            return Algos.salty, dict(stem=httping.getRequiredParam(salt, "stem"),
                                     pidx=httping.getRequiredParam(salt, "pidx"),
                                     tier=httping.getRequiredParam(salt, "tier"),
                                     sxlt=httping.getRequiredParam(salt, "sxlt"),
                                     icodes=httping.getRequiredParam(salt, "icodes"),
                                     ncodes=httping.getRequiredParam(salt, "ncodes"))

        if Algos.randy in body:
            rand = body[Algos.randy]
            for field in ("pris", "nxts"):
                if field not in rand:
                    raise falcon.HTTPBadRequest(title="invalid inception",
                                                description=f'required field "{field}" missing from body.rand')
            return Algos.randy, dict(prxs=rand["pris"], nxts=rand["nxts"])

        if Algos.group in body:
            raise falcon.HTTPBadRequest(description="multisig groups not supported as agent controller")

        return None, dict()

    def on_post(self, req, rep):
        """ Inception event POST endpoint

        Give me a new Agent.  Create Habery using ctrlPRE as database name, agentHab that anchors the caid and
        returns the KEL of agentHAB Stores ControllerPRE -> AgentPRE in database

        Parameters:
            req (Request): falcon.Request HTTP request object
            rep (Response): falcon.Response HTTP response object

        Raises:
            falcon.HTTPBadRequest: when "icp" or "sig" is missing, an agent already exists for the
                controller, the key-manager parameters are invalid, or the inception event is
                rejected (in which case the freshly created agent is deleted again)
        """
        body = req.get_media()
        if "icp" not in body:
            raise falcon.HTTPBadRequest(title="invalid inception",
                                        description='required field "icp" missing from body')
        icp = serdering.SerderKERI(sad=body["icp"])

        if "sig" not in body:
            raise falcon.HTTPBadRequest(title="invalid inception",
                                        description='required field "sig" missing from body')
        siger = coring.Siger(qb64=body["sig"])

        caid = icp.pre

        if self.agency.get(caid=caid) is not None:
            raise falcon.HTTPBadRequest(title="agent already exists",
                                        description=f"agent for controller {caid} already exists")

        # Reject bad key-manager parameters before the agent exists; otherwise the failed
        # request would leave an agent behind and every retry would get "agent already exists"
        algo, params = self._keeperParams(body)

        agent = self.agency.create(caid=caid)

        try:
            ctrlHab = agent.hby.makeSignifyHab(name=agent.caid, ns="agent", serder=icp, sigers=[siger])
        except Exception:
            self.agency.delete(agent)
            raise falcon.HTTPBadRequest(title="invalid inception",
                                        description=f'invalid icp event for caid {agent.caid}')

        if ctrlHab.pre != agent.caid:
            self.agency.delete(agent)
            raise falcon.HTTPBadRequest(title="invalid inception",
                                        description=f'invalid icp event for caid {agent.caid}')

        if algo == Algos.salty:
            # Client is requesting that the Agent track the Salty parameters
            mgr = agent.mgr.get(algo=Algos.salty)
            mgr.incept(agent.caid, kidx=0, transferable=True, **params)
        elif algo == Algos.randy:
            mgr = agent.mgr.get(algo=Algos.randy)
            mgr.incept(agent.caid, verfers=ctrlHab.kever.verfers,
                       digers=ctrlHab.kever.digers,
                       **params)

        rep.status = falcon.HTTP_202
        rep.data = json.dumps(asdict(agent.agentHab.kever.state())).encode("utf-8")
[docs]
class HealthEnd:
    """Health resource for determining that a container is live"""

    def on_get(self, req, resp):
        """ Reply 200 with a JSON message that contains the current ISO-8601 time

        Parameters:
            req (Request): falcon.Request HTTP request object (unused)
            resp (Response): falcon.Response HTTP response object
        """
        payload = {"message": f"Health is okay. Time is {nowIso8601()}"}
        resp.status = falcon.HTTP_OK
        resp.media = payload
class KeyStateCollectionEnd:
    """ Resource returning the current key state of one or more identifiers """

    @staticmethod
    def on_get(req, rep):
        # NOTE(review): the OpenAPI block below describes a path parameter "prefix" and a KEL
        # response, but this handler reads the "pre" query parameter and returns key states;
        # confirm and align the spec.
        """ Return the key states of the prefixes given in the "pre" query parameter

        Prefixes that are not in the agent's kevers are skipped silently.

        Parameters:
            req (Request): falcon.Request HTTP request
            rep (Response): falcon.Response HTTP response

        ---
        summary:  Display key event log (KEL) for given identifier prefix
        description:  If provided qb64 identifier prefix is in Kevers, return the current state of the
                      identifier along with the KEL and all associated signatures and receipts
        tags:
           - Key Event Log
        parameters:
          - in: path
            name: prefix
            schema:
              type: string
            required: true
            description: qb64 identifier prefix of KEL to load
        responses:
           200:
              description: Key event log and key state of identifier
           404:
              description: Identifier not found in Key event database

        """
        agent = req.context.agent
        if "pre" not in req.params:
            raise falcon.HTTPBadRequest(description="required parameter 'pre' missing")

        requested = req.params.get("pre")
        if not isinstance(requested, list):
            # A single query value arrives as a scalar; normalize to a list
            requested = [requested]

        states = [asdict(agent.hby.kevers[pre].state()) for pre in requested if pre in agent.hby.kevers]

        rep.status = falcon.HTTP_200
        rep.content_type = "application/json"
        rep.data = json.dumps(states).encode("utf-8")
class KeyEventCollectionEnd:
    """ Resource returning the key events of one identifier in first-seen order """

    @staticmethod
    def on_get(req, rep):
        # NOTE(review): the OpenAPI block below describes a path parameter "prefix", but this
        # handler reads the "pre" query parameter; confirm and align the spec.
        """ Return the event dicts of the identifier given in the "pre" query parameter

        Parameters:
            req (Request): falcon.Request HTTP request
            rep (Response): falcon.Response HTTP response

        Raises:
            falcon.HTTPBadRequest: when the "pre" query parameter is missing
            falcon.HTTPInternalServerError: when a first-seen entry has no stored event

        ---
        summary:  Display key event log (KEL) for given identifier prefix
        description:  If provided qb64 identifier prefix is in Kevers, return the current state of the
                      identifier along with the KEL and all associated signatures and receipts
        tags:
           - Key Event Log
        parameters:
          - in: path
            name: prefix
            schema:
              type: string
            required: true
            description: qb64 identifier prefix of KEL to load
        responses:
           200:
              description: Key event log and key state of identifier
           404:
              description: Identifier not found in Key event database

        """
        agent = req.context.agent
        if "pre" not in req.params:
            raise falcon.HTTPBadRequest(description="required parameter 'pre' missing")

        pre = req.params.get("pre")
        preb = pre.encode("utf-8")
        events = []
        for fn, dig in agent.hby.db.getFelItemPreIter(preb, fn=0):
            dgkey = dbing.dgKey(preb, dig)  # get message
            if not (raw := agent.hby.db.getEvt(key=dgkey)):
                # Pass the message by keyword, as every other HTTP error in this module does;
                # the positional form sets the title and is deprecated in Falcon 3
                raise falcon.HTTPInternalServerError(description=f"Missing event for dig={dig}.")

            serder = serdering.SerderKERI(raw=bytes(raw))
            events.append(serder.ked)

        rep.status = falcon.HTTP_200
        rep.content_type = "application/json"
        rep.data = json.dumps(events).encode("utf-8")
class OOBICollectionEnd:
    """ Resource that accepts OOBIs for resolution """

    def __init__(self):
        """ Create OOBI Collection endpoint instance """

    @staticmethod
    def on_post(req, rep):
        """ Resolve OOBI endpoint.

        Stores an OobiRecord for the submitted URL and replies 202 with the long-running
        operation that tracks its resolution.

        Parameters:
            req: falcon.Request HTTP request
            rep: falcon.Response HTTP response

        ---
        summary: Resolve OOBI and assign an alias for the remote identifier
        description: Resolve OOBI URL or `rpy` message by process results of request and assign 'alias' in contact
                     data for resolved identifier
        tags:
           - OOBIs
        requestBody:
            required: true
            content:
              application/json:
                schema:
                    description: OOBI
                    properties:
                        oobialias:
                          type: string
                          description: alias to assign to the identifier resolved from this OOBI
                          required: false
                        url:
                          type: string
                          description:  URL OOBI
                        rpy:
                          type: object
                          description: unsigned KERI `rpy` event message with endpoints
        responses:
           202:
              description: OOBI resolution to key state successful

        """
        agent = req.context.agent
        body = req.get_media()

        if "url" not in body:
            if "rpy" in body:
                raise falcon.HTTPNotImplemented(description="'rpy' support not implemented yet")
            raise falcon.HTTPBadRequest(description="invalid OOBI request body, either 'rpy' or 'url' is required")

        oobi = body["url"]
        record = OobiRecord(date=helping.toIso8601(helping.nowUTC()))
        if "oobialias" in body:
            record.oobialias = body["oobialias"]
        agent.hby.db.oobis.pin(keys=(oobi,), val=record)

        # Track the resolution as a long-running operation under a random id
        op = agent.monitor.submit(randomNonce(), longrunning.OpTypes.oobi, metadata=dict(oobi=oobi))

        rep.status = falcon.HTTP_202
        rep.content_type = "application/json"
        rep.data = op.to_json().encode("utf-8")
class OobiResourceEnd:
    """ Resource that generates OOBI URLs for one of the agent's identifiers """

    @staticmethod
    def _pickUrl(urls):
        """ Return the http URL from `urls` when present, otherwise the https URL """
        if kering.Schemes.http in urls:
            return urls[kering.Schemes.http]
        return urls[kering.Schemes.https]

    @staticmethod
    def on_get(req, rep, alias):
        """ OOBI GET endpoint

        Parameters:
            req: falcon.Request HTTP request
            rep: falcon.Response HTTP response
            alias: option route parameter for specific identifier to get

        Raises:
            falcon.HTTPBadRequest: when `alias` is unknown or the "role" query parameter is missing
            falcon.HTTPNotFound: when no http(s) endpoint is known for the requested role

        ---
        summary:  Get OOBI for specific identifier
        description:  Generate OOBI for the identifier of the specified alias and role
        tags:
           - OOBIs
        parameters:
          - in: path
            name: alias
            schema:
              type: string
            required: true
            description: human readable alias for the identifier generate OOBI for
          - in: query
            name: role
            schema:
              type: string
            required: true
            description: role for which to generate OOBI
        responses:
            200:
              description: An array of Identifier key state information
              content:
                  application/json:
                    schema:
                        description: Key state information for current identifiers
                        type: object
        """
        agent = req.context.agent
        hab = agent.hby.habByName(alias)
        if hab is None:
            raise falcon.HTTPBadRequest(description="Invalid alias to generate OOBI")

        # A missing role is a client error, not a server fault
        if "role" not in req.params:
            raise falcon.HTTPBadRequest(description="required parameter 'role' missing")
        role = req.params["role"]

        res = dict(role=role)
        if role in (kering.Roles.witness,):  # Fetch URL OOBIs for all witnesses
            oobis = []
            for wit in hab.kever.wits:
                urls = (hab.fetchUrls(eid=wit, scheme=kering.Schemes.http)
                        or hab.fetchUrls(eid=wit, scheme=kering.Schemes.https))
                if not urls:
                    raise falcon.HTTPNotFound(description=f"unable to query witness {wit}, no http endpoint")

                up = urlparse(OobiResourceEnd._pickUrl(urls))
                oobis.append(urljoin(up.geturl(), f"/oobi/{hab.pre}/witness/{wit}"))
            res["oobis"] = oobis
        elif role in (kering.Roles.controller,):  # Fetch any controller URL OOBIs
            oobis = []
            urls = (hab.fetchUrls(eid=hab.pre, scheme=kering.Schemes.http)
                    or hab.fetchUrls(eid=hab.pre, scheme=kering.Schemes.https))
            if not urls:
                raise falcon.HTTPNotFound(description=f"unable to query controller {hab.pre}, no http endpoint")

            up = urlparse(OobiResourceEnd._pickUrl(urls))
            oobis.append(urljoin(up.geturl(), f"/oobi/{hab.pre}/controller"))
            res["oobis"] = oobis
        elif role in (kering.Roles.agent,):
            oobis = []
            # The https fallback previously called the misspelled ``fetchRoleurls`` and raised
            # AttributeError whenever no http endpoint was found
            roleUrls = (hab.fetchRoleUrls(hab.pre, scheme=kering.Schemes.http, role=kering.Roles.agent)
                        or hab.fetchRoleUrls(hab.pre, scheme=kering.Schemes.https, role=kering.Roles.agent))
            if not roleUrls:
                raise falcon.HTTPNotFound(description=f"unable to query controller {hab.pre}, no http endpoint")

            for eid, urls in roleUrls['agent'].items():
                up = urlparse(OobiResourceEnd._pickUrl(urls))
                oobis.append(urljoin(up.geturl(), f"/oobi/{hab.pre}/agent/{eid}"))
            res["oobis"] = oobis
        else:
            # Unsupported role
            rep.status = falcon.HTTP_404
            return

        rep.status = falcon.HTTP_200
        rep.content_type = "application/json"
        rep.data = json.dumps(res).encode("utf-8")
class QueryCollectionEnd:
    """ Resource that queues key state, sequence number or anchor queries for an identifier """

    @staticmethod
    def on_post(req, rep):
        # NOTE(review): the OpenAPI block below documents 200/404 responses and a KEL payload,
        # but this handler replies 202 with a long-running operation; confirm and align the spec.
        """ Queue a query for the identifier in body field "pre" and reply with its operation

        The body may carry "anchor" or "sn" to narrow the query. With neither, the stored key
        state notices for the prefix are removed first so that a fresh update can be detected.

        Parameters:
            req (Request): falcon.Request HTTP request
            rep (Response): falcon.Response HTTP response

        ---
        summary:  Display key event log (KEL) for given identifier prefix
        description:  If provided qb64 identifier prefix is in Kevers, return the current state of the
                      identifier along with the KEL and all associated signatures and receipts
        tags:
           - Query
        parameters:
          - in: body
            name: pre
            schema:
              type: string
            required: true
            description: qb64 identifier prefix of KEL to load
        responses:
           200:
              description: Key event log and key state of identifier
           404:
              description: Identifier not found in Key event database

        """
        agent = req.context.agent
        body = req.get_media()
        pre = httping.getRequiredParam(body, "pre")
        qry = dict(pre=pre)

        meta = dict()
        if "anchor" in body:
            meta["anchor"] = body["anchor"]
        elif "sn" in body:
            meta["sn"] = body["sn"]
        else:  # Must reset key state so we know when we have a new update.
            for (keys, saider) in agent.hby.db.knas.getItemIter(keys=(pre,)):
                agent.hby.db.knas.rem(keys)
                # One removal is enough; the original issued this call twice
                agent.hby.db.ksns.rem((saider.qb64,))

        qry.update(meta)
        agent.queries.append(qry)
        op = agent.monitor.submit(pre, longrunning.OpTypes.query, metadata=meta)

        rep.status = falcon.HTTP_202
        rep.content_type = "application/json"
        rep.data = op.to_json().encode("utf-8")