From da60ea200ca7834c7c32f5a6c8667e6ea63be24b Mon Sep 17 00:00:00 2001 From: Tadeuchi Date: Thu, 30 Mar 2023 00:41:58 +0200 Subject: [PATCH] feat: Add functions to the Kv API in order to query ranges of keys and/or values #345 - kv delete --- .../basic/pst-kv-check-iteration.test.ts | 4 +- .../integration/data/kv-storage-range.js | 16 ++- src/cache/SortKeyCache.ts | 19 +-- src/cache/impl/LevelDbCache.ts | 122 ++++++++++-------- src/contract/HandlerBasedContract.ts | 24 +--- .../states/ContractInteractionState.ts | 12 +- src/contract/states/InteractionState.ts | 10 +- src/legacy/smartweave-global.ts | 19 ++- 8 files changed, 130 insertions(+), 96 deletions(-) diff --git a/src/__tests__/integration/basic/pst-kv-check-iteration.test.ts b/src/__tests__/integration/basic/pst-kv-check-iteration.test.ts index 98136b4..4534540 100644 --- a/src/__tests__/integration/basic/pst-kv-check-iteration.test.ts +++ b/src/__tests__/integration/basic/pst-kv-check-iteration.test.ts @@ -12,7 +12,7 @@ import { LoggerFactory } from "../../../logging/LoggerFactory"; import { WriteInteractionResponse } from "../../../contract/Contract"; import { DeployPlugin } from 'warp-contracts-plugin-deploy'; -describe('Testing the Profit Sharing Token', () => { +describe('Testing the PST kv storage range access', () => { let contractSrc: string; let wallet: JWKInterface; @@ -177,7 +177,7 @@ describe('Testing the Profit Sharing Token', () => { expect((await pst.currentBalance(aliceWalletAddress)).balance).toEqual(200_000); expect((await pst.currentBalance(walletAddress)).balance).toEqual(555669 - 655 - 200_000); expect((await pst.getStorageValues(['check.' + walletAddress + '.' + aliceWalletAddress])) - .cachedValue.get('check.' + walletAddress + '.' + aliceWalletAddress)).toBe(0) + .cachedValue.get('check.' + walletAddress + '.' + aliceWalletAddress)).toBeNull() }); it('should not be able to write check', async () => { diff --git a/src/__tests__/integration/data/kv-storage-range.js b/src/__tests__/integration/data/kv-storage-range.js index bdec124..4919272 100644 --- a/src/__tests__/integration/data/kv-storage-range.js +++ b/src/__tests__/integration/data/kv-storage-range.js @@ -20,10 +20,12 @@ export async function handle(state, action) { throw new ContractError(`Caller balance ${callerBalance} not high enough to write check for ${qty}!`); } - const allChecks = (await SmartWeave.kv.entries({ gte: 'check.' + caller, lte: 'check.' + caller + '.\xff'})) - .reduce((acc, entry) => acc + parseInt(entry.value), 0) + let sumChecks = 0; + for await (let part of (await SmartWeave.kv.kvMap({ gte: 'check.' + caller, lte: 'check.' + caller + '.\xff'})).values()) { + sumChecks = sumChecks + parseInt(part); + } - if (callerBalance < allChecks + qty) { + if (callerBalance < sumChecks + qty) { throw new ContractError(`Caller balance ${callerBalance} not high enough to write next check ${qty}!`); } @@ -49,7 +51,7 @@ export async function handle(state, action) { callerBalance = callerBalance + check; await SmartWeave.kv.put(caller, callerBalance); - await SmartWeave.kv.put('check.' + target + '.' + caller, 0); + await SmartWeave.kv.del('check.' + target + '.' + caller); return {state}; } @@ -104,8 +106,10 @@ export async function handle(state, action) { } if (input.function === 'minted') { - const sumMinted = (await SmartWeave.kv.entries({ gte: 'mint.', lte: 'mint.\xff'})) - .reduce((acc, entry) => acc + parseInt(entry.value), 0) + let sumMinted = 0; + for await (let part of (await SmartWeave.kv.kvMap({ gte: 'mint.', lte: 'mint.\xff'})).values()) { + sumMinted = sumMinted + parseInt(part); + } return {result: {minted: sumMinted}}; } diff --git a/src/cache/SortKeyCache.ts b/src/cache/SortKeyCache.ts index e758371..369323f 100644 --- a/src/cache/SortKeyCache.ts +++ b/src/cache/SortKeyCache.ts @@ -31,6 +31,12 @@ export interface SortKeyCache { */ put(cacheKey: CacheKey, value: V): Promise; + /** + * deletes value in cache under given {@link CacheKey.key} from {@link CacheKey.sortKey}. + * the value will be still available if fetched using a lower sortKey + */ + del(cacheKey: CacheKey): Promise; + /** * removes all data stored under a specified key */ @@ -63,11 +69,14 @@ export interface SortKeyCache { keys(): Promise; /** - * Return filtered keys, based on range options + * Returns keys for a specified range */ keys(sortKey?: string, options?: SortKeyCacheRangeOptions): Promise; - entries(sortKey: string, options?: SortKeyCacheRangeOptions): Promise[]>; + /** + * Returns a key value map for a specified range + */ + kvMap(sortKey: string, options?: SortKeyCacheRangeOptions): Promise>; /** * returns underlying storage (LevelDB, LMDB, sqlite...) @@ -98,12 +107,6 @@ export class CacheKey { constructor(readonly key: string, readonly sortKey: string) {} } -export interface SortKeyCacheEntry { - key: string; - value: V; -} - -// tslint:disable-next-line:max-classes-per-file export class SortKeyCacheResult { constructor(readonly sortKey: string, readonly cachedValue: V) {} } diff --git a/src/cache/impl/LevelDbCache.ts b/src/cache/impl/LevelDbCache.ts index 2a318d7..ba7ab79 100644 --- a/src/cache/impl/LevelDbCache.ts +++ b/src/cache/impl/LevelDbCache.ts @@ -1,4 +1,4 @@ -import { BatchDBOp, CacheKey, SortKeyCache, SortKeyCacheEntry, SortKeyCacheResult } from '../SortKeyCache'; +import { BatchDBOp, CacheKey, SortKeyCache, SortKeyCacheResult } from '../SortKeyCache'; import { Level } from 'level'; import { MemoryLevel } from 'memory-level'; import { CacheOptions } from '../../core/WarpFactory'; @@ -19,21 +19,35 @@ import { AbstractChainedBatch } from 'abstract-level/types/abstract-chained-batc * In order to reduce the cache size, the oldest entries are automatically pruned. */ +class ClientValueWrapper { + constructor(readonly value: V, readonly tombstone: boolean = false) {} +} + export class LevelDbCache implements SortKeyCache { private readonly logger = LoggerFactory.INST.create('LevelDbCache'); private readonly subLevelSeparator: string; - private readonly subLevelOptions: AbstractSublevelOptions; + private readonly subLevelOptions: AbstractSublevelOptions>; /** * not using the Level type, as it is not compatible with MemoryLevel (i.e. has more properties) * and there doesn't seem to be any public interface/abstract type for all Level implementations * (the AbstractLevel is not exported from the package...) */ - private _db: MemoryLevel; - private _rollbackBatch: AbstractChainedBatch, string, V>; + private _db: MemoryLevel>; + + /** + * Rollback batch is way of recovering kv storage state from before a failed interaction. + * Currently, all operations performed during active transaction are directly saved to kv storage. + * In case the transaction fails the changes will be reverted using the rollback batch. + */ + private _rollbackBatch: AbstractChainedBatch< + MemoryLevel>, + string, + ClientValueWrapper + >; // Lazy initialization upon first access - private get db(): MemoryLevel { + private get db(): MemoryLevel> { if (!this._db) { if (this.cacheOptions.inMemory) { this._db = new MemoryLevel(this.subLevelOptions); @@ -43,7 +57,7 @@ export class LevelDbCache implements SortKeyCache { } const dbLocation = this.cacheOptions.dbLocation; this.logger.info(`Using location ${dbLocation}`); - this._db = new Level(dbLocation, this.subLevelOptions); + this._db = new Level>(dbLocation, this.subLevelOptions); } } return this._db; @@ -60,16 +74,13 @@ export class LevelDbCache implements SortKeyCache { // eslint-disable-next-line @typescript-eslint/no-unused-vars async get(cacheKey: CacheKey, returnDeepCopy?: boolean): Promise | null> { this.validateKey(cacheKey.key); - const contractCache = this.db.sublevel(cacheKey.key, this.subLevelOptions); + const contractCache = this.db.sublevel>(cacheKey.key, this.subLevelOptions); // manually opening to fix https://github.com/Level/level/issues/221 await contractCache.open(); try { - const result = await contractCache.get(cacheKey.sortKey); - - return { - sortKey: cacheKey.sortKey, - cachedValue: result - }; + const result: ClientValueWrapper = await contractCache.get(cacheKey.sortKey); + const resultValue = result.tombstone ? null : result.value; + return new SortKeyCacheResult(cacheKey.sortKey, resultValue); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (e: any) { if (e.code == 'LEVEL_NOT_FOUND') { @@ -81,41 +92,56 @@ export class LevelDbCache implements SortKeyCache { } async getLast(key: string): Promise | null> { - const contractCache = this.db.sublevel(key, this.subLevelOptions); + const contractCache = this.db.sublevel>(key, this.subLevelOptions); // manually opening to fix https://github.com/Level/level/issues/221 await contractCache.open(); const keys = await contractCache.keys({ reverse: true, limit: 1 }).all(); if (keys.length) { - return { - sortKey: keys[0], - cachedValue: await contractCache.get(keys[0]) - }; - } else { - return null; + const lastValueWrap = await contractCache.get(keys[0]); + if (!lastValueWrap.tombstone) { + return new SortKeyCacheResult(keys[0], lastValueWrap.value); + } } + return null; } async getLessOrEqual(key: string, sortKey: string): Promise | null> { - const contractCache = this.db.sublevel(key, this.subLevelOptions); + const contractCache = this.db.sublevel>(key, this.subLevelOptions); // manually opening to fix https://github.com/Level/level/issues/221 await contractCache.open(); const keys = await contractCache.keys({ reverse: true, lte: sortKey, limit: 1 }).all(); if (keys.length) { - return { - sortKey: keys[0], - cachedValue: await contractCache.get(keys[0]) - }; - } else { - return null; + const cachedVal = await contractCache.get(keys[0]); + if (!cachedVal.tombstone) { + return new SortKeyCacheResult(keys[0], cachedVal.value); + } } + return null; } async put(stateCacheKey: CacheKey, value: V): Promise { + await this.setClientValue(stateCacheKey, new ClientValueWrapper(value)); + } + + /** + * Delete operation under the hood is a write operation with setting tombstone flag to true. + * The idea behind is based on Cassandra Tombstone + * https://www.instaclustr.com/support/documentation/cassandra/using-cassandra/managing-tombstones-in-cassandra/ + * There is a couple of benefits to this approach: + * This allows to use kv storage range operations with ease. + * The value will not be accessible only to the next interactions. Interactions reading state for lower sortKey will be able to access it. + * Revert operation for rollback is much easier to implement + */ + async del(cacheKey: CacheKey): Promise { + await this.setClientValue(cacheKey, new ClientValueWrapper(null, true)); + } + + private async setClientValue(stateCacheKey: CacheKey, valueWrapper: ClientValueWrapper): Promise { this.validateKey(stateCacheKey.key); - const contractCache = this.db.sublevel(stateCacheKey.key, this.subLevelOptions); + const contractCache = this.db.sublevel>(stateCacheKey.key, this.subLevelOptions); // manually opening to fix https://github.com/Level/level/issues/221 await contractCache.open(); - await contractCache.put(stateCacheKey.sortKey, value); + await contractCache.put(stateCacheKey.sortKey, valueWrapper); if (!this._rollbackBatch) { this.begin(); } @@ -123,7 +149,7 @@ export class LevelDbCache implements SortKeyCache { } async delete(key: string): Promise { - const contractCache = this.db.sublevel(key, this.subLevelOptions); + const contractCache = this.db.sublevel>(key, this.subLevelOptions); await contractCache.open(); await contractCache.clear(); } @@ -191,16 +217,7 @@ export class LevelDbCache implements SortKeyCache { } async keys(sortKey?: string, options?: SortKeyCacheRangeOptions): Promise { - const distinctKeys = new Set(); - const rangeOptions: RangeOptions = this.levelRangeOptions(options); - const joinedKeys = await this.db.keys(rangeOptions).all(); - - joinedKeys - .filter((k) => !sortKey || this.extractSortKey(k).localeCompare(sortKey) <= 0) - .map((k) => this.extractOriginalKey(k)) - .forEach((k) => distinctKeys.add(k)); - - return Array.from(distinctKeys); + return Array.from((await this.kvMap(sortKey, options)).keys()); } validateKey(key: string) { @@ -217,17 +234,20 @@ export class LevelDbCache implements SortKeyCache { return joinedKey.split(this.subLevelSeparator)[2]; } - async entries(sortKey: string, options?: SortKeyCacheRangeOptions): Promise[]> { - const keys: string[] = await this.keys(sortKey, options); + async kvMap(sortKey: string, options?: SortKeyCacheRangeOptions): Promise> { + const entries: Map = new Map(); + const allKeys = (await this.db.keys(this.levelRangeOptions(options)).all()) + .filter((k) => !sortKey || this.extractSortKey(k).localeCompare(sortKey) <= 0) + .map((k) => this.extractOriginalKey(k)); - return Promise.all( - keys.map(async (k): Promise> => { - return { - key: k, - value: (await this.getLessOrEqual(k, sortKey)).cachedValue - }; - }) - ); + for (const k of allKeys) { + const lastValue = await this.getLessOrEqual(k, sortKey); + if (lastValue) { + entries.set(k, lastValue.cachedValue); + } + } + + return entries; } private levelRangeOptions(options?: SortKeyCacheRangeOptions): RangeOptions | undefined { @@ -269,7 +289,7 @@ export class LevelDbCache implements SortKeyCache { const contracts = await this.keys(); for (let i = 0; i < contracts.length; i++) { - const contractCache = this.db.sublevel(contracts[i], this.subLevelOptions); + const contractCache = this.db.sublevel>(contracts[i], this.subLevelOptions); // manually opening to fix https://github.com/Level/level/issues/221 await contractCache.open(); diff --git a/src/contract/HandlerBasedContract.ts b/src/contract/HandlerBasedContract.ts index 888c52a..f9d264e 100644 --- a/src/contract/HandlerBasedContract.ts +++ b/src/contract/HandlerBasedContract.ts @@ -150,10 +150,7 @@ export class HandlerBasedContract implements Contract { if (sortKey && !this.isRoot() && this.interactionState().has(this.txId())) { const result = this.interactionState().get(this.txId()); - return { - sortKey, - cachedValue: result as EvalStateResult - }; + return new SortKeyCacheResult>(sortKey, result as EvalStateResult); } // TODO: not sure if we should synchronize on a contract instance or contractTxId @@ -725,10 +722,10 @@ export class HandlerBasedContract implements Contract { const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx); if (!this.isRoot() && this.interactionState().has(this.txId())) { - evalStateResult = { - sortKey: interactionTx.sortKey, - cachedValue: this.interactionState().get(this.txId()) as EvalStateResult - }; + evalStateResult = new SortKeyCacheResult>( + interactionTx.sortKey, + this.interactionState().get(this.txId()) as EvalStateResult + ); } else { evalStateResult = await this.warp.stateEvaluator.eval(executionContext); this.interactionState().update(this.txId(), evalStateResult.cachedValue); @@ -858,10 +855,7 @@ export class HandlerBasedContract implements Contract { async getStorageValues(keys: string[]): Promise>> { const lastCached = await this.warp.stateEvaluator.getCache().getLast(this.txId()); if (lastCached == null) { - return { - sortKey: null, - cachedValue: new Map() - }; + return new SortKeyCacheResult>(null, new Map()); } const storage = this.warp.kvStorageFactory(this.txId()); @@ -872,10 +866,7 @@ export class HandlerBasedContract implements Contract { const lastValue = await storage.getLessOrEqual(key, lastCached.sortKey); result.set(key, lastValue == null ? null : lastValue.cachedValue); } - return { - sortKey: lastCached.sortKey, - cachedValue: result - }; + return new SortKeyCacheResult>(lastCached.sortKey, result); } finally { await storage.close(); } @@ -894,7 +885,6 @@ export class HandlerBasedContract implements Contract { return result as HandlerBasedContract; } - private async maybeSyncStateWithRemoteSource( remoteState: SortKeyCacheResult>, upToSortKey: string, diff --git a/src/contract/states/ContractInteractionState.ts b/src/contract/states/ContractInteractionState.ts index 21aee11..0349ffe 100644 --- a/src/contract/states/ContractInteractionState.ts +++ b/src/contract/states/ContractInteractionState.ts @@ -1,5 +1,5 @@ import { InteractionState } from './InteractionState'; -import { CacheKey, SortKeyCache, SortKeyCacheEntry } from '../../cache/SortKeyCache'; +import { CacheKey, SortKeyCache } from '../../cache/SortKeyCache'; import { EvalStateResult } from '../../core/modules/StateEvaluator'; import { GQLNodeInterface } from '../../legacy/gqlResult'; import { Warp } from '../../core/Warp'; @@ -27,6 +27,12 @@ export class ContractInteractionState implements InteractionState { return null; } + async delKV(contractTxId: string, cacheKey: CacheKey): Promise { + if (this._kv.has(contractTxId)) { + await this._kv.get(contractTxId).del(cacheKey); + } + } + getKvKeys(contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions): Promise { const storage = this._warp.kvStorageFactory(contractTxId); return storage.keys(sortKey, options); @@ -36,9 +42,9 @@ export class ContractInteractionState implements InteractionState { contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions - ): Promise[]> { + ): Promise> { const storage = this._warp.kvStorageFactory(contractTxId); - return storage.entries(sortKey, options); + return storage.kvMap(sortKey, options); } async commit(interaction: GQLNodeInterface): Promise { diff --git a/src/contract/states/InteractionState.ts b/src/contract/states/InteractionState.ts index cb19848..a912fde 100644 --- a/src/contract/states/InteractionState.ts +++ b/src/contract/states/InteractionState.ts @@ -1,4 +1,4 @@ -import { CacheKey, SortKeyCacheEntry } from '../../cache/SortKeyCache'; +import { CacheKey } from '../../cache/SortKeyCache'; import { EvalStateResult } from '../../core/modules/StateEvaluator'; import { GQLNodeInterface } from '../../legacy/gqlResult'; import { SortKeyCacheRangeOptions } from '../../cache/SortKeyCacheRangeOptions'; @@ -46,11 +46,9 @@ export interface InteractionState { getKV(contractTxId: string, cacheKey: CacheKey): Promise; + delKV(contractTxId: string, cacheKey: CacheKey): Promise; + getKvKeys(contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions): Promise; - getKvRange( - contractTxId: string, - sortKey?: string, - options?: SortKeyCacheRangeOptions - ): Promise[]>; + getKvRange(contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions): Promise>; } diff --git a/src/legacy/smartweave-global.ts b/src/legacy/smartweave-global.ts index 60a01db..7579806 100644 --- a/src/legacy/smartweave-global.ts +++ b/src/legacy/smartweave-global.ts @@ -2,7 +2,7 @@ import Arweave from 'arweave'; import { EvaluationOptions } from '../core/modules/StateEvaluator'; import { GQLNodeInterface, GQLTagInterface, VrfData } from './gqlResult'; -import { BatchDBOp, CacheKey, PutBatch, SortKeyCache, SortKeyCacheEntry } from '../cache/SortKeyCache'; +import { CacheKey, SortKeyCache } from '../cache/SortKeyCache'; import { SortKeyCacheRangeOptions } from '../cache/SortKeyCacheRangeOptions'; import {InteractionState} from "../contract/states/InteractionState"; @@ -283,14 +283,27 @@ export class KV { return result?.cachedValue || null; } + async del(key: string): Promise { + this.checkStorageAvailable(); + const sortKey = this._transaction.sortKey; + + // then we're checking if the values exists in the interactionState + const interactionStateValue = await this._interactionState.delKV(this._contractTxId, new CacheKey(key, this._transaction.sortKey)); + if (interactionStateValue != null) { + return interactionStateValue; + } + + await this._storage.del(new CacheKey(key, this._transaction.sortKey)); + } + async keys(options?: SortKeyCacheRangeOptions): Promise { const sortKey = this._transaction.sortKey; return await this._storage.keys(sortKey, options) } - async entries(options?: SortKeyCacheRangeOptions): Promise[]> { + async kvMap(options?: SortKeyCacheRangeOptions): Promise> { const sortKey = this._transaction.sortKey; - return this._storage.entries(sortKey, options) + return this._storage.kvMap(sortKey, options) } async commit(): Promise {