Source code for keria.db.basing

# -*- encoding: utf-8 -*-
"""
KERIA
keria.db.basing module

"""
from dataclasses import dataclass
from ordered_set import OrderedSet as oset

from keri.core import coring
from keri.db import dbing, subing, koming

# JSON Schema property types whose values get a dynamic search index
# (checked against each payload property's "type" in Seeker.generateIndexes)
SCALAR_TYPES = ("string", "number")

# Pathers locating the statically indexed fields inside a credential SAD
ISSUER_FIELD = coring.Pather(path=['i'])  # top level 'i' field
ISSUEE_FIELD = coring.Pather(path=['a', 'i'])  # 'i' field nested in the 'a' block
SCHEMA_FIELD = coring.Pather(path=['s'])  # top level 's' field
REGISTRY_FIELD = coring.Pather(path=['ri'])  # top level 'ri' field


@dataclass
class IndexRecord:
    """
    Persisted description of one search index.

    Records of this type are stored in the Seeker `dynIdx` sub-database so that
    the index sub-databases can be recreated when the database is reopened.

    """
    subkey: str  # name of the named sub-database that holds the index entries
    paths: list  # qb64 Pather values resolved, in order, to build an index key
class AgencyBaser(dbing.LMDBer):
    """
    Agency database for tracking Agent tenants in this KERIA instance.

    Attributes:
        agnt (subing.CesrSuber | None): keyed by qb64 controller AID, maps to
            the Prefixer of the AID of its agent. None until reopen() runs.
        ctrl (subing.CesrSuber | None): keyed by qb64 agent AID, maps to the
            Prefixer of the AID of its controller. None until reopen() runs.
        aids (subing.CesrSuber | None): keyed by qb64 AID, maps to the Prefixer
            of the AID of its Agent. None until reopen() runs.

    """
    TailDirPath = "keri/adb"
    AltTailDirPath = ".keri/adb"
    TempPrefix = "keri_adb_"
    MaxNamedDBs = 10

    def __init__(self, headDirPath=None, perm=None, reopen=False, **kwa):
        """
        Setup named sub databases.

        Parameters:
            headDirPath is optional str head directory pathname for main database
                If not provided use default .HeadDirpath
                default headDirPath=None so uses self.HeadDirPath
            perm is numeric optional os dir permissions mode
                default perm=None, in which case self.Perm is used
            reopen is boolean, IF True then database will be reopened by this init
                default reopen=False

        Inherited Parameters (passed through **kwa):
            name is str directory path name differentiator for main database
                When system employs more than one keri database, name allows
                differentiating each instance by name
                default name='main'
            temp is boolean, assign to .temp
                True then open in temporary directory, clear on close
                Otherwise then open persistent directory, do not clear on close
                default temp=False

        """
        if perm is None:
            perm = self.Perm  # defaults to restricted permissions for non temp

        # Sub-database handles are created in reopen(), which the parent
        # constructor may invoke, so they must exist before super().__init__
        self.agnt = None
        self.ctrl = None
        self.aids = None

        super().__init__(headDirPath=headDirPath, perm=perm, reopen=reopen, **kwa)

    def reopen(self, **kwa):
        """
        Open the main database and (re)create the named sub databases.

        Parameters:
            **kwa: passed unchanged to the parent reopen()

        Returns:
            the opened status reported by the parent reopen(), also stored
            in self.opened

        """
        self.opened = super().reopen(**kwa)

        # Create by opening first time named sub DBs within main DB instance
        # Names end with "." as sub DB name must include a non Base64 character
        # to avoid namespace collisions with Base64 identifier prefixes.

        # Sub-database keyed by qb64 controller AID mapping to the Prefixer object of the AID of an agent
        self.agnt = subing.CesrSuber(db=self, subkey='agnt.', klas=coring.Prefixer)

        # Sub-database keyed by qb64 agent AID mapping to the Prefixer object of the AID of an controller
        self.ctrl = subing.CesrSuber(db=self, subkey='ctrl.', klas=coring.Prefixer)

        # Sub-database keyed by qb64 AID mapping to the Prefixer object of the AID of its Agent
        self.aids = subing.CesrSuber(db=self, subkey='aids.', klas=coring.Prefixer)

        # Propagate the parent's result instead of silently returning None
        return self.opened
class Seeker(dbing.LMDBer):
    """
    Seeker indexes all credentials in the KERIpy `saved` Creder database.

    Static indexes are created for issued by/schema and issued to/schema and dynamic indexes are
    created for all top level scalar valued fields in the credential payload ('a' field).  The Seeker
    uses the schema of each credential to determine the payload fields to index by.

    Attributes:
        db: database exposing the `schema` sub-database used to look up schemas
        reger: registry database exposing the `saved` and `creds` sub-databases
        indexes (dict): index name -> CesrDupSuber holding that index
        schIdx (subing.IoSetSuber | None): schema SAID -> names of the indexes to
            populate for credentials of that schema. None until reopen() runs.
        dynIdx (koming.Komer | None): index name -> IndexRecord, persisted so the
            indexes can be recreated at load. None until reopen() runs.

    """

    TailDirPath = "keri/seekdb"
    AltTailDirPath = ".keri/seekdb"
    TempPrefix = "keri_seekdb_"
    MaxNamedDBs = 500

    def __init__(self, db, reger, headDirPath=None, perm=None, reopen=False, **kwa):
        """
        Setup named sub databases.

        Parameters:
            db: database providing `schema` lookups
            reger: registry database providing `saved` and `creds` lookups
            headDirPath is optional str head directory pathname for main database
                default headDirPath=None so uses self.HeadDirPath
            perm is numeric optional os dir permissions mode
                default perm=None so do not set mode
            reopen is boolean, IF True then database will be reopened by this init
                default reopen=False

        Inherited Parameters (passed through **kwa):
            name is str directory path name differentiator for main database
                default name='main'
            temp is boolean, assign to .temp
                True then open in temporary directory, clear on close
                Otherwise then open persistent directory, do not clear on close
                default temp=False

        """
        self.db = db
        self.reger = reger
        self.indexes = dict()

        # Created in reopen(), which the parent constructor may invoke
        self.schIdx = None
        self.dynIdx = None

        super().__init__(headDirPath=headDirPath, perm=perm, reopen=reopen, **kwa)

    def reopen(self, **kwa):
        """
        Open the main database, reload persisted indexes and ensure the
        static indexes exist.

        Returns:
            whatever the parent reopen() returns (its opened status)

        """
        opened = super().reopen(**kwa)

        # List of indexs for a given schema
        self.schIdx = subing.IoSetSuber(db=self, subkey="schIdx.")

        # List of dynamically created indexes to be recreated at load
        self.dynIdx = koming.Komer(db=self,
                                   subkey='dynIdx.',
                                   schema=IndexRecord, )

        # Keys come back split into parts, so rebuild the dotted index name
        for name, idx in self.dynIdx.getItemIter():
            key = ".".join(name)
            self.indexes[key] = subing.CesrDupSuber(db=self, subkey=idx.subkey, klas=coring.Saider)

        # Create persistent Indexes if they don't already exist
        self.createIndex(SCHEMA_FIELD.qb64)
        self.createIndex(REGISTRY_FIELD.qb64)

        # Index of credentials by issuer/issuee.
        for field in (ISSUER_FIELD, ISSUEE_FIELD):
            self.createIndex(field.qb64)
            subkey = f"{field.qb64}.{SCHEMA_FIELD.qb64}"
            self.createIndex(subkey)

        return opened

    @property
    def table(self):
        """ Sub-database that lists every indexed credential (reger.saved) """
        return self.reger.saved

    def value(self, said):
        """
        Returns the SAD of the saved credential with the given SAID.

        No guard is applied here: the lookups are expected to succeed for
        SAIDs that came out of this Seeker's table or indexes.

        """
        saider = self.reger.saved.get(keys=(said,))
        creder = self.reger.creds.get(keys=(saider.qb64,))
        return creder.sad

    def saidIter(self):
        """ Returns iterator over the (keys, saider) items of reger.saved """
        return self.reger.saved.getItemIter()

    def _ensureIndex(self, subkey, paths):
        """
        Make sure the index named subkey has both an open sub-database handle
        in self.indexes and a persisted IndexRecord listing one path per field.

        Parameters:
            subkey (str): dotted index name, one qb64 Pather per field
            paths (list): qb64 Pather values, one per field, in key order

        """
        if subkey not in self.indexes:
            self.indexes[subkey] = subing.CesrDupSuber(db=self, subkey=subkey, klas=coring.Saider)

        record = self.dynIdx.get(keys=(subkey,))
        # Also rewrites records persisted with a composite name stored as a single path
        if record is None or list(record.paths) != list(paths):
            self.dynIdx.pin(keys=(subkey,), val=IndexRecord(subkey=subkey, paths=list(paths)))

    def createIndex(self, key):
        """
        Create the index named key if it does not already exist.

        Parameters:
            key (str): index name; composite indexes join the qb64 of each
                field Pather with "."

        """
        # One path per field, so that index() builds composite keys the same
        # way the Cursor builds composite lookups (values joined in field order)
        self._ensureIndex(key, key.split("."))

    def index(self, said):
        """
        Add the saved credential with the given SAID to every index that
        applies to its schema.

        Parameters:
            said (str): qb64 SAID of the credential

        Raises:
            ValueError: if said is not in reger.saved, or if its schema can
                not be used to generate indexes

        """
        if (saider := self.reger.saved.get(keys=(said,))) is None:
            raise ValueError(f"{said} is not a verified credential")

        creder = self.reger.creds.get(keys=(saider.qb64,))
        saider = coring.Saider(qb64b=creder.saidb)

        # Load schema index and if not indexed in schIdx, index it.
        schemaSaid = creder.schema
        if not (indexes := self.schIdx.get(keys=(schemaSaid,))):
            indexes = self.generateIndexes(schemaSaid)

        for index in indexes:
            db = self.indexes[index]
            idx = self.dynIdx.get(keys=(index,))
            values = [coring.Pather(qb64=path).resolve(creder.sad) for path in idx.paths]

            # "number" typed fields resolve to int/float, so stringify before joining
            value = "".join(str(val) for val in values)
            db.add(keys=(value,), val=saider)

    def generateIndexes(self, said):
        """
        Parse schema of said, create schIdx entry keyed to said of schema and
        the subkey indexes in self.indexes

        Parameters:
            said (str): qb64 SAID of the schema

        Returns:
            list: names of all indexes registered for the schema

        Raises:
            ValueError: if the schema is unknown or its 'a' block does not
                describe an object with properties

        """
        if (schemer := self.db.schema.get(keys=(said,))) is None:
            raise ValueError(f"{said} is not a valid schema SAID to index")

        properties = schemer.sed["properties"]
        payload = properties['a']
        if "oneOf" in payload:
            # Use the first alternative that describes an object payload
            for alternative in payload["oneOf"]:
                if isinstance(alternative, dict) and alternative.get("type") == "object":
                    payload = alternative
                    break

        if not isinstance(payload, dict) or "properties" not in payload:
            raise ValueError(f"schema with SAID={said} is not valid, no payload of values")

        properties = payload["properties"]
        # No 'i' in the payload means credentials of this schema have no issuee
        attestation = 'i' not in properties

        # Assign single field Schema and ISSUER index and ISSUER/SCHEMA:
        self.schIdx.add(keys=(said,), val=SCHEMA_FIELD.qb64b)
        self.schIdx.add(keys=(said,), val=ISSUER_FIELD.qb64b)
        self.schIdx.add(keys=(said,), val=REGISTRY_FIELD.qb64b)
        subkey = f"{ISSUER_FIELD.qb64}.{SCHEMA_FIELD.qb64}"
        self.schIdx.add(keys=(said,), val=subkey.encode("UTF-8"))

        # Assign single field ISSUEE index and ISSUEE/SCHEMA if needed
        if not attestation:
            self.schIdx.add(keys=(said,), val=ISSUEE_FIELD.qb64b)
            subkey = f"{ISSUEE_FIELD.qb64}.{SCHEMA_FIELD.qb64}"
            self.schIdx.add(keys=(said,), val=subkey.encode("UTF-8"))

        # Issuee based composite indexes can not be resolved for attestations
        parties = (ISSUER_FIELD,) if attestation else (ISSUER_FIELD, ISSUEE_FIELD)

        for p, val in properties.items():
            # Only scalar valued properties are indexed; skip anything without a usable type
            if not isinstance(val, dict) or val.get("type") not in SCALAR_TYPES:
                continue

            pather = coring.Pather(path=['a', p])

            # field alone
            self._ensureIndex(pather.qb64, [pather.qb64])
            self.schIdx.add(keys=(said,), val=pather.qb64b)

            # schema + field
            subkey = f"{SCHEMA_FIELD.qb64}.{pather.qb64}"
            self._ensureIndex(subkey, [SCHEMA_FIELD.qb64, pather.qb64])
            self.schIdx.add(keys=(said,), val=subkey)

            for field in parties:
                # issuer|issuee + field
                subkey = f"{field.qb64}.{pather.qb64}"
                self._ensureIndex(subkey, [field.qb64, pather.qb64])
                self.schIdx.add(keys=(said,), val=subkey)

                # issuer|issuee + schema + field
                subkey = f"{field.qb64}.{SCHEMA_FIELD.qb64}.{pather.qb64}"
                self._ensureIndex(subkey, [field.qb64, SCHEMA_FIELD.qb64, pather.qb64])
                self.schIdx.add(keys=(said,), val=subkey)

        return list(self.schIdx.get(keys=(said,)))

    def find(self, filtr, sort=None, skip=None, limit=None):
        """ Returns a Cursor over the SAIDs of credentials matching filtr """
        return Cursor(seeker=self, filtr=filtr, sort=sort, skip=skip, limit=limit)
class ExnSeeker(dbing.LMDBer):
    """
    ExnSeeker indexes the exn messages held in the `exns` sub-database of db.

    A single field index is created for each of the route, sender, recipient,
    date and (IPEX) schema fields, plus a two field index for every ordered
    pair of those fields.

    Attributes:
        db: database exposing the `exns` sub-database
        indexes (dict): index name -> CesrDupSuber holding that index

    """

    TailDirPath = "keri/exndb"
    AltTailDirPath = ".keri/exndb"
    TempPrefix = "keri_exndb_"
    MaxNamedDBs = 36

    DATE_FIELD = coring.Pather(path=['dt'])
    SENDER_FIELD = coring.Pather(path=['i'])
    RECIPIENT_FIELD = coring.Pather(path=['a', 'i'])
    ROUTE_FIELD = coring.Pather(path=['r'])

    # Special field for IPEX messages... consider moving to IpexSeeker if needed
    SCHEMA = coring.Pather(path=['e', 'acdc', 's'])

    def __init__(self, db, headDirPath=None, perm=None, reopen=False, **kwa):
        """
        Setup named sub databases.

        Parameters:
            db: database providing the `exns` sub-database
            headDirPath is optional str head directory pathname for main database
                default headDirPath=None so uses self.HeadDirPath
            perm is numeric optional os dir permissions mode
                default perm=None so do not set mode
            reopen is boolean, IF True then database will be reopened by this init
                default reopen=False

        Inherited Parameters (passed through **kwa):
            name is str directory path name differentiator for main database
                default name='main'
            temp is boolean, assign to .temp
                True then open in temporary directory, clear on close
                Otherwise then open persistent directory, do not clear on close
                default temp=False

        """
        self.db = db
        self.indexes = dict()

        super().__init__(headDirPath=headDirPath, perm=perm, reopen=reopen, **kwa)

    def reopen(self, **kwa):
        """
        Open the main database and create the single and two field indexes.

        Returns:
            whatever the parent reopen() returns (its opened status)

        """
        opened = super().reopen(**kwa)

        fields = (self.ROUTE_FIELD, self.SENDER_FIELD, self.RECIPIENT_FIELD, self.DATE_FIELD, self.SCHEMA)

        # One index per field, then one per ordered pair of distinct fields
        for field in fields:
            self.createIndex(field.qb64)
            for subfield in fields:
                if field == subfield:
                    continue
                subkey = f"{field.qb64}.{subfield.qb64}"
                self.createIndex(subkey)

        return opened

    @property
    def table(self):
        """ Sub-database that lists every indexed exn message (db.exns) """
        return self.db.exns

    def value(self, said):
        """ Returns the ked of the exn message with the given SAID """
        serder = self.db.exns.get(keys=(said,))
        return serder.ked

    def saidIter(self):
        """ Yields the SAID of every exn message in db.exns """
        for (said,), _ in self.db.exns.getItemIter():
            yield said

    def createIndex(self, key):
        """
        Open the index sub-database named key and register it in self.indexes.

        Parameters:
            key (str): index name, the qb64 of each field Pather joined with "."

        """
        self.indexes[key] = subing.CesrDupSuber(db=self, subkey=key, klas=coring.Saider)

    def index(self, said):
        """
        Add the exn message with the given SAID to every index for which at
        least one of the index fields resolves in the message.

        Parameters:
            said (str): qb64 SAID of the exn message

        Raises:
            ValueError: if said is not in db.exns

        """
        if (serder := self.db.exns.get(keys=(said,))) is None:
            raise ValueError(f"{said} is not a valid exn")

        saider = coring.Saider(qb64b=serder.saidb)

        for index, db in self.indexes.items():
            pathers = [coring.Pather(qb64=path) for path in index.split(".")]

            values = []
            for pather in pathers:
                try:
                    values.append(pather.resolve(serder.ked))
                except KeyError:
                    # Field absent from this message: it contributes nothing to
                    # the key, so a composite key may hold only some of its fields
                    pass

            value = "".join(values)
            if not value:
                # None of the index fields are present, nothing to index
                continue

            db.add(keys=(value,), val=saider)

    def find(self, filtr, sort=None, skip=None, limit=None):
        """ Returns a Cursor over the SAIDs of exn messages matching filtr """
        return Cursor(seeker=self, filtr=filtr, sort=sort, skip=skip, limit=limit)
class Cursor:
    """
    Lazily evaluated, iterable result of a seeker query.

    The query runs on the first call to next(); iteration then yields the
    matching SAIDs after sort, skip and limit have been applied.

    Attributes:
        filtr (dict): field -> value or field -> {operator: value}
        operators (list): executable operators built from filtr
        names (list): index name (Pather qb64) of each operator
        values (list): comparison value of each operator
        indexable (bool): True when every operator is an equality test, which
            is what allows a composite index lookup
        seeker: Seeker or ExnSeeker supplying `indexes`, `table` and `value()`
        saids (list | None): query results, None until the query has run
        cur (int | None): position of the next SAID to yield

    """

    def __init__(self, seeker, filtr=None, sort=None, skip=None, limit=None):
        """
        Parameters:
            seeker: Seeker or ExnSeeker to query
            filtr (dict | None): filter specification, None means no filter
            sort (list | None): field paths to order the results by
            skip (int | None): number of leading results to drop, default 0
            limit (int | None): maximum number of results, default 25

        """
        # The declared default of None must behave as "no filter"
        self.filtr = filtr if filtr is not None else {}
        self.operators = operators(self.filtr)
        self.names = [op.name for op in self.operators]
        self.indexable = all(isinstance(op, Eq) for op in self.operators)
        self.values = [op.value for op in self.operators]
        self.seeker = seeker
        self._sort = sort
        self._skip = skip if skip is not None else 0
        self._limit = limit if limit is not None else 25

        self.cur = None
        self.saids = None

    def __iter__(self):
        return self

    def __next__(self):
        if self.saids is None:
            self._query()

        if self.cur >= len(self.saids):
            raise StopIteration

        said = self.saids[self.cur]
        self.cur += 1
        return said

    def sort(self, sort):
        """ Set the sort fields, returns self for chaining """
        self._sort = sort
        return self

    def skip(self, skip):
        """ Set the number of results to skip, returns self for chaining """
        self._skip = skip
        return self

    def limit(self, limit):
        """ Set the maximum number of results, returns self for chaining """
        self._limit = limit
        return self

    def _query(self):
        """ Run the query, cheapest strategy first, and store the ordered results """
        self.cur = 0
        if len(self.filtr) == 0:
            saids = [said for (said,), _ in self.seeker.table.getItemIter()]
        elif (saids := self.indexSearch()) is None:
            saids = self.indexScan()
            if saids is None:
                saids = self.fullTableScan()

        self.saids = self.order(saids)

    def indexSearch(self):
        """
        Answer the whole filter from one index.

        Returns:
            list of SAIDs, or None when no single index covers the filter

        """
        indexes = self.seeker.indexes

        if len(self.operators) == 1 and self.operators[0].name in indexes:
            op = self.operators[0]
            return op.index(indexes[op.name])

        # Composite indexes are exact match only, so every operator must be Eq
        index = ".".join(self.names)
        if not (self.indexable and index in indexes):
            return None

        # Index keys are text, so stringify values that are not already str
        key = "".join(str(value) for value in self.values)
        return [saider.qb64 for saider in indexes[index].getIter(keys=(key,))]

    def indexScan(self):
        """
        Intersect the results of every operator that has its own index, then
        test the remaining operators against the surviving records.

        Returns:
            list of SAIDs

        """
        use = []
        scan = []
        for op in self.operators:
            if op.name in self.seeker.indexes:
                use.append(op)
            else:
                scan.append(op)

        if len(use) == 0:
            return self.fullTableScan()

        saids = oset(use[0].index(self.seeker.indexes[use[0].name]))
        if len(saids) == 0:
            return list()

        for op in use[1:]:
            saids &= oset(op.index(self.seeker.indexes[op.name]))

        if len(scan) == 0:
            return list(saids)

        return self.tableScan(list(saids), scan)

    def fullTableScan(self):
        """ Test every record of the seeker's table against all operators """
        # Read the SAIDs from the table keys, as _query does; this works for
        # every seeker regardless of what its saidIter() yields
        saids = [said for (said,), _ in self.seeker.table.getItemIter()]
        return self.tableScan(saids, ops=self.operators)

    def tableScan(self, saids, ops):
        """
        Returns the SAIDs whose record satisfies every operator in ops.

        Parameters:
            saids (list): candidate SAIDs
            ops (list): callables taking the record and returning a bool

        An empty ops list matches nothing, so a filter made only of
        unsupported operators yields no results.

        """
        if not ops:
            return []

        res = []
        for said in saids:
            val = self.seeker.value(said)
            # All operators must hold, matching the intersection used in indexScan
            if all(op(val) for op in ops):
                res.append(said)

        return res

    def order(self, saids):
        """ Apply sort (when requested and possible), skip and limit to saids """
        if not self._sort:
            return self.slice(saids)

        if (res := self.indexOrder(saids)) is not None:
            return res

        return self.tableScanOrder(saids)

    def indexOrder(self, saids):
        """
        Order saids by walking the index matching the sort fields.

        Returns:
            list of SAIDs after skip and limit, or None when there is no
            index for the requested sort fields

        """
        index = ".".join([coring.Pather(bext=s).qb64 for s in self._sort])
        if index not in self.seeker.indexes:
            return None

        idx = self.seeker.indexes[index]
        wanted = set(saids)  # constant time membership while walking the index
        matches = (saider.qb64 for _, saider in idx.getItemIter() if saider.qb64 in wanted)

        # Run off the values before start; running out means nothing is left
        missing = object()
        for _ in range(self._skip):
            if next(matches, missing) is missing:
                return []

        # Load values in order until we get enough (based on limit)
        res = []
        for said in matches:
            res.append(said)
            if len(res) == self._limit:
                break

        return res

    def slice(self, saids):
        """ Apply skip and limit to saids without reordering """
        if self._skip >= len(saids):
            return []

        end = self._skip + self._limit
        return saids[self._skip:end]

    def tableScanOrder(self, saids):
        """
        Should we bother implementing table scan sort order

        We have single field indexes for all fields in credentials so this situation will only occur
        if multiple fields are selected for which we don't have a multi-column index.  In that case,
        perhaps we raise an exception.  For now, we'll just slice the results to honor skip and limit.

        """
        return self.slice(saids)
def operators(filtr):
    """
    Executable operator factory method

    Turns a filter dict into the list of executable operators to apply to a
    search.  A plain value is an equality test on the field; a dict value maps
    operator names to operands, e.g. {"-a-i": {"$begins": "984"}}.  Operator
    names other than "$eq" and "$begins" produce no operator.

    Parameters:
        filtr (dict): field -> value or field -> {operator name: operand}

    Returns:
        list: Eq and Begins instances, in filter order

    """
    built = []
    for field, spec in filtr.items():
        # Bare value is shorthand for equality
        if not isinstance(spec, dict):
            built.append(Eq(field=field, value=spec))
            continue

        for opname, operand in spec.items():
            if opname == "$eq":
                built.append(Eq(field=field, value=operand))
            elif opname == "$begins":
                built.append(Begins(field=field, value=operand))

    return built
class Eq:
    """
    Equality operator: matches records whose field equals value.

    Attributes:
        field (str): path text of the field, as given in the filter
        pather (coring.Pather): Pather built from field
        value: value the field is compared against

    """

    def __init__(self, field, value):
        self.field = field
        self.pather = coring.Pather(bext=self.field)
        self.value = value

    def __call__(self, *args, **kwargs):
        """
        Returns True when the field resolved from the record equals value.

        Parameters:
            args: exactly one positional argument, the record to test

        Raises:
            ValueError: if not called with exactly one positional argument

        """
        if len(args) != 1:
            raise ValueError(f"invalid argument length={len(args)} for equals operator, must be 1")

        val = self.pather.resolve(args[0])
        return val == self.value

    @property
    def name(self) -> str:
        """ Index name of the field, the qb64 of its Pather """
        return self.pather.qb64

    def index(self, idx):
        """
        Returns the qb64 of every entry stored in idx under value.

        Parameters:
            idx: index sub-database providing getIter(keys=...)

        """
        # Index keys are text, so stringify values that are not already str
        return [val.qb64 for val in idx.getIter(keys=(str(self.value),))]


class Begins:
    """
    Prefix operator: matches records whose string field starts with value.

    Attributes:
        field (str): path text of the field, as given in the filter
        pather (coring.Pather): Pather built from field
        value (str): prefix the field must start with

    """

    def __init__(self, field, value):
        """
        Raises:
            ValueError: if value is not a str

        """
        self.field = field
        self.pather = coring.Pather(bext=self.field)

        if not isinstance(value, str):
            raise ValueError(f"invalid type={type(value)} for begins, must be `str`")

        self.value = value

    def __call__(self, *args, **kwargs):
        """
        Returns True when the field resolved from the record starts with value.

        Parameters:
            args: exactly one positional argument, the record to test

        Raises:
            ValueError: if not called with exactly one positional argument,
                or if the resolved field is not a str

        """
        if len(args) != 1:
            raise ValueError(f"invalid argument length={len(args)} for begins operator, must be 1")

        val = self.pather.resolve(args[0])
        if not isinstance(val, str):
            # Report the type of the offending field value, not of the whole record
            raise ValueError(f"invalid type={type(val)} for begins, must be `str`")

        return val.startswith(self.value)

    def index(self, idx):
        """
        Returns the qb64 of every entry yielded by idx.getItemIter for value.

        Parameters:
            idx: index sub-database providing getItemIter(keys=...)

        """
        return [val.qb64 for _, val in idx.getItemIter(keys=(self.value,))]

    @property
    def name(self) -> str:
        """ Index name of the field, the qb64 of its Pather """
        return self.pather.qb64