Source code for keria.core.keeping

# -*- encoding: utf-8 -*-
"""
KERI
keria.core.keeping module

"""
from dataclasses import dataclass, asdict, field

from keri.app.keeping import PreSit, Algos, PubLot, PubSet
from keri.core import coring
from keri.core.coring import Tiers, MtrDex
from keri.db import dbing, subing, koming
from keri.help import helping


@dataclass()
class Prefix:
    """
    Per-prefix record naming the key creation algorithm and the prefix index
    of the key pair sequence.
    """
    # prefix index for this keypair sequence
    pidx: int = 0
    # salty default uses indices and salt to create new key pairs
    algo: str = Algos.salty
@dataclass()
class SaltyPrm:
    """
    Salty prefix's parameters for creating new key pairs.

    Iterating over an instance yields its field names in declaration order.
    """
    # qualified b64 encoded AID salt
    sxlt: str = ''
    # prefix index for this keypair sequence
    pidx: int = 0
    # key index for this keypair sequence
    kidx: int = 0
    # default unique path stem for salty algo
    stem: str = ''
    # security tier for stretch index salty algo
    tier: str = ''
    # next digest hashing code
    dcode: str = ''
    # current signing key seed codes
    icodes: list = field(default_factory=list)
    # next key seed codes
    ncodes: list = field(default_factory=list)
    transferable: bool = False

    def __iter__(self):
        """Return an iterator over the dict form of this record (its keys)."""
        as_mapping = asdict(self)
        return iter(as_mapping)
class RemoteKeeper(dbing.LMDBer):
    """
    RemoteKeeper stores data for Salty or Randy Encrypted edge key generation.

    Sub databases (bound in .reopen):
        gbls  - plain values
        prxs  - coring.Cipher values
        nxts  - coring.Cipher values
        mhabs - coring.Prefixer values
        pres  - Prefix records
        sprms - SaltyPrm records
        sits  - PreSit records
        pubs  - PubSet records
    """
    TailDirPath = "keri/rks"
    AltTailDirPath = ".keri/rks"
    TempPrefix = "keri_rks_"
    MaxNamedDBs = 10

    def __init__(self, headDirPath=None, perm=None, reopen=False, **kwa):
        """
        Setup named sub databases.

        Inherited Parameters:
            name is str directory path name differentiator for main database
                When system employs more than one keri database, name allows
                differentiating each instance by name
                default name='main'
            temp is boolean, assign to .temp
                True then open in temporary directory, clear on close
                Otherwise then open persistent directory, do not clear on close
                default temp=False
            headDirPath is optional str head directory pathname for main database
                If not provided use default .HeadDirPath
                default headDirPath=None so uses self.HeadDirPath
            perm is numeric optional os dir permissions mode
                default perm=None, in which case self.Perm is used
            reopen is boolean, IF True then database will be reopened by this init
                default reopen=False

        Notes:

        dupsort=True for sub DB means allow unique (key,pair) duplicates at a key.
        Duplicate means that there is more than one value at a key but not
        redundant copies of a (key,value) pair per key. In other words the pair
        (key,value) must be unique both key and value in combination.
        Attempting to put the same (key,value) pair a second time does
        not add another copy.

        Duplicates are inserted in lexicographic order by value, insertion order.
        """
        # Sub database handles stay unset until .reopen binds them.
        self.gbls = None
        self.prxs = None
        self.nxts = None
        self.mhabs = None
        self.pres = None
        self.sprms = None
        self.sits = None
        self.pubs = None

        # defaults to restricted permissions for non temp
        perm = self.Perm if perm is None else perm

        super().__init__(headDirPath=headDirPath, perm=perm, reopen=reopen, **kwa)

    def reopen(self, **kwa):
        """
        Open sub databases.

        Reopens the main database via the parent class, binds every named
        sub database to this instance and returns the resulting .opened value.
        """
        self.opened = super().reopen(**kwa)

        # Create by opening first time named sub DBs within main DB instance
        # Names end with "." as sub DB name must include a non Base64 character
        # to avoid namespace collisions with Base64 identifier prefixes.
        self.gbls = subing.Suber(db=self, subkey='gbls.')
        self.prxs = subing.CesrSuber(db=self, subkey='prxs.', klas=coring.Cipher)
        self.nxts = subing.CesrSuber(db=self, subkey='nxts.', klas=coring.Cipher)
        self.mhabs = subing.CesrSuber(db=self, subkey='mhabs.', klas=coring.Prefixer)

        # New Prefix
        self.pres = koming.Komer(db=self, subkey='pres.', schema=Prefix)
        # New Salty Parameters
        self.sprms = koming.Komer(db=self, subkey='sprms.', schema=SaltyPrm)
        # Prefix Situation
        self.sits = koming.Komer(db=self, subkey='sits.', schema=PreSit)
        # public key set at pre.ridx
        self.pubs = koming.Komer(db=self, subkey='pubs.', schema=PubSet)

        return self.opened
class RemoteManager: def __init__(self, hby, rb: RemoteKeeper = None): self.hby = hby self.rb = rb if rb is not None else RemoteKeeper(name=hby.name, base=hby.base, temp=hby.temp, reopen=True, clear=False, headDirPath=hby.db.headDirPath) def get(self, algo: Algos = None, pre=None): if pre is not None: if (pp := self.rb.pres.get(pre)) is None: raise ValueError("Attempt to load nonexistent pre={}.".format(pre)) algo = pp.algo match algo: case Algos.salty: return SaltyManager(rb=self.rb) case Algos.randy: return RandyManager(rb=self.rb) case Algos.group: return GroupManager(rb=self.rb, rm=self) case _: return ExternKeeper(rb=self.rb) @property def sxlt(self): return self.rb.gbls.get("sxlt") @sxlt.setter def sxlt(self, sxlt): self.rb.gbls.pin("sxlt", sxlt) def delete_sxlt(self): return self.rb.gbls.rem("sxlt") class SaltyManager: def __init__(self, rb: RemoteKeeper): self.rb = rb def incept(self, pre, *, icodes, ncodes, sxlt, dcode=MtrDex.Blake3_256, pidx=0, kidx=0, stem="", tier=Tiers.low, transferable=False): pp = Prefix( pidx=pidx, algo=Algos.salty ) sp = SaltyPrm(sxlt=sxlt, pidx=pidx, kidx=kidx, stem=stem, tier=tier, icodes=icodes, ncodes=ncodes, dcode=dcode, transferable=transferable ) if not self.rb.pres.put(pre, val=pp): raise ValueError("Already incepted pre={}.".format(pre)) if not self.rb.sprms.put(pre, val=sp): raise ValueError("Already incepted prm for pre={}.".format(pre)) def rotate(self, pre, ncodes, pidx, kidx, stem, sxlt, icodes, tier, transferable, dcode=MtrDex.Blake3_256): if (pp := self.rb.pres.get(pre)) is None or pp.algo != Algos.salty: raise ValueError("Attempt to rotate nonexistent or invalid pre={}.".format(pre)) if (sp := self.rb.sprms.get(pre)) is None: raise ValueError("Attempt to rotate nonexistent or invalid pre={}.".format(pre)) sp = SaltyPrm(pidx=pidx, sxlt=sxlt, kidx=kidx, stem=stem, tier=sp.tier, icodes=sp.icodes, ncodes=ncodes, dcode=dcode, transferable=transferable ) if not self.rb.sprms.pin(pre, val=sp): raise ValueError("Unable to 
rotate salty prms for pre={}.".format(pre)) def params(self, pre): if (pp := self.rb.pres.get(pre)) is None or pp.algo != Algos.salty: raise ValueError("Attempt to load nonexistent or invalid pre={}.".format(pre)) if (pp := self.rb.sprms.get(pre)) is None: raise ValueError("Attempt to load nonexistent pre={}.".format(pre)) prms = dict( salty=asdict(pp) ) return prms class RandyManager: def __init__(self, rb: RemoteKeeper): self.rb = rb def incept(self, pre, verfers, digers, prxs, nxts, transferable): pp = Prefix( algo=Algos.randy ) if not self.rb.pres.put(pre, val=pp): raise ValueError("Already incepted pre={}.".format(pre)) dt = helping.nowIso8601() ps = PreSit( new=PubLot(pubs=[verfer.qb64 for verfer in verfers], dt=dt), nxt=PubLot(pubs=[diger.qb64 for diger in digers], dt=dt)) if not self.rb.sits.put(pre, val=ps): raise ValueError("Already incepted sit for pre={}.".format(pre)) # Secret to encrypt here if len(prxs) != len(verfers): raise ValueError("If encrypted private keys are provided, must match verfers") for idx, prx in enumerate(prxs): cipher = coring.Cipher(qb64=prx) self.rb.prxs.put(keys=verfers[idx].qb64b, val=cipher) if nxts is not None: if len(nxts) != len(digers): raise ValueError("If encrypted private next keys are provided, must match digers") for idx, prx in enumerate(nxts): cipher = coring.Cipher(qb64=prx) self.rb.nxts.put(keys=digers[idx].qb64b, val=cipher) def rotate(self, pre, verfers, digers, prxs, nxts, transferable): if (pp := self.rb.pres.get(pre)) is None or pp.algo != Algos.randy: raise ValueError("Attempt to rotate non-existant or invalid pre={}.".format(pre)) dt = helping.nowIso8601() ps = PreSit( new=PubLot(pubs=[verfer.qb64 for verfer in verfers], dt=dt), nxt=PubLot(pubs=[diger.qb64 for diger in digers], dt=dt)) if not self.rb.sits.pin(pre, val=ps): raise ValueError("Already incepted sit for pre={}.".format(pre)) # Secret to encrypt here if len(prxs) != len(verfers): raise ValueError("If encrypted private keys are provided, must 
match verfers") for idx, prx in enumerate(prxs): cipher = coring.Cipher(qb64=prx) self.rb.prxs.put(keys=verfers[idx].qb64b, val=cipher) if nxts is not None: if len(nxts) != len(digers): raise ValueError("If encrypted private keys are provided, must match verfers") for idx, prx in enumerate(nxts): cipher = coring.Cipher(qb64=prx) self.rb.nxts.put(keys=digers[idx].qb64b, val=cipher) def params(self, pre): if (pp := self.rb.pres.get(pre)) is None or pp.algo != Algos.randy: raise ValueError("Attempt to load nonexistent or invalid pre={}.".format(pre)) prxs = [] if (ps := self.rb.sits.get(pre)) is None: raise ValueError("Attempt to load nonexistent pre={}.".format(pre)) for pub in ps.new.pubs: if (prx := self.rb.prxs.get(keys=pub)) is not None: prxs.append(prx) nxts = [] for pub in ps.nxt.pubs: if (nxt := self.rb.nxts.get(keys=pub)) is not None: nxts.append(nxt) prms = dict( randy=dict( prxs=[prx.qb64 for prx in prxs], nxts=[nxt.qb64 for nxt in nxts], ), ) return prms class GroupManager: def __init__(self, rb: RemoteKeeper, rm: RemoteManager): self.rb = rb self.rm = rm def incept(self, pre, mpre, verfers, digers): pp = Prefix( algo=Algos.group ) if not self.rb.pres.put(pre, val=pp): raise ValueError("Already incepted pre={}.".format(pre)) if not self.rb.mhabs.put(pre, val=coring.Prefixer(qb64=mpre)): raise ValueError("Already incepted pre={}.".format(pre)) dt = helping.nowIso8601() ps = PreSit( new=PubLot(pubs=[verfer.qb64 for verfer in verfers], dt=dt), nxt=PubLot(pubs=[diger.qb64 for diger in digers], dt=dt)) if not self.rb.sits.put(pre, val=ps): raise ValueError("Already incepted sit for pre={}.".format(pre)) def rotate(self, pre, verfers, digers): if (pp := self.rb.pres.get(pre)) is None or pp.algo != Algos.group: raise ValueError(f"Attempt to rotate nonexistant or invalid pre={pre}, algo={pp.algo}.") dt = helping.nowIso8601() ps = PreSit( new=PubLot(pubs=[verfer.qb64 for verfer in verfers], dt=dt), nxt=PubLot(pubs=[diger.qb64 for diger in digers], dt=dt)) if not 
self.rb.sits.pin(pre, val=ps): raise ValueError(f"Error saving sit rotating pre={pre}.") def params(self, pre): if (pp := self.rb.pres.get(pre)) is None or pp.algo != Algos.group: raise ValueError(f"Attempt to load nonexistent or invalid pre={pre}.") if (mpre := self.rb.mhabs.get(pre)) is None: raise ValueError("Attempt to load nonexistent pre={}.".format(pre)) if (ps := self.rb.sits.get(pre)) is None: raise ValueError(f"Attempt to load invalid sit for pre={pre}.") prms = dict( group=dict( mhab=self.rm.get(pre=mpre.qb64).params(mpre.qb64), keys=[verfer for verfer in ps.new.pubs], ndigs=[diger for diger in ps.nxt.pubs] ) ) return prms class ExternKeeper: def __init__(self, rb: RemoteKeeper): self.rb = rb def incept(self, **kwargs): pass