feat: Add functions to the Kv API in order to query ranges of keys and/or values #345 - kv delete

This commit is contained in:
Tadeuchi
2023-03-30 00:41:58 +02:00
parent 7717d87218
commit da60ea200c
8 changed files with 130 additions and 96 deletions

View File

@@ -12,7 +12,7 @@ import { LoggerFactory } from "../../../logging/LoggerFactory";
import { WriteInteractionResponse } from "../../../contract/Contract";
import { DeployPlugin } from 'warp-contracts-plugin-deploy';
describe('Testing the Profit Sharing Token', () => {
describe('Testing the PST kv storage range access', () => {
let contractSrc: string;
let wallet: JWKInterface;
@@ -177,7 +177,7 @@ describe('Testing the Profit Sharing Token', () => {
expect((await pst.currentBalance(aliceWalletAddress)).balance).toEqual(200_000);
expect((await pst.currentBalance(walletAddress)).balance).toEqual(555669 - 655 - 200_000);
expect((await pst.getStorageValues(['check.' + walletAddress + '.' + aliceWalletAddress]))
.cachedValue.get('check.' + walletAddress + '.' + aliceWalletAddress)).toBe(0)
.cachedValue.get('check.' + walletAddress + '.' + aliceWalletAddress)).toBeNull()
});
it('should not be able to write check', async () => {

View File

@@ -20,10 +20,12 @@ export async function handle(state, action) {
throw new ContractError(`Caller balance ${callerBalance} not high enough to write check for ${qty}!`);
}
const allChecks = (await SmartWeave.kv.entries({ gte: 'check.' + caller, lte: 'check.' + caller + '.\xff'}))
.reduce((acc, entry) => acc + parseInt(entry.value), 0)
let sumChecks = 0;
for await (let part of (await SmartWeave.kv.kvMap({ gte: 'check.' + caller, lte: 'check.' + caller + '.\xff'})).values()) {
sumChecks = sumChecks + parseInt(part);
}
if (callerBalance < allChecks + qty) {
if (callerBalance < sumChecks + qty) {
throw new ContractError(`Caller balance ${callerBalance} not high enough to write next check ${qty}!`);
}
@@ -49,7 +51,7 @@ export async function handle(state, action) {
callerBalance = callerBalance + check;
await SmartWeave.kv.put(caller, callerBalance);
await SmartWeave.kv.put('check.' + target + '.' + caller, 0);
await SmartWeave.kv.del('check.' + target + '.' + caller);
return {state};
}
@@ -104,8 +106,10 @@ export async function handle(state, action) {
}
if (input.function === 'minted') {
const sumMinted = (await SmartWeave.kv.entries({ gte: 'mint.', lte: 'mint.\xff'}))
.reduce((acc, entry) => acc + parseInt(entry.value), 0)
let sumMinted = 0;
for await (let part of (await SmartWeave.kv.kvMap({ gte: 'mint.', lte: 'mint.\xff'})).values()) {
sumMinted = sumMinted + parseInt(part);
}
return {result: {minted: sumMinted}};
}

View File

@@ -31,6 +31,12 @@ export interface SortKeyCache<V> {
*/
put(cacheKey: CacheKey, value: V): Promise<void>;
/**
* deletes value in cache under given {@link CacheKey.key} from {@link CacheKey.sortKey}.
* the value will be still available if fetched using a lower sortKey
*/
del(cacheKey: CacheKey): Promise<void>;
/**
* removes all data stored under a specified key
*/
@@ -63,11 +69,14 @@ export interface SortKeyCache<V> {
keys(): Promise<string[]>;
/**
* Return filtered keys, based on range options
* Returns keys for a specified range
*/
keys(sortKey?: string, options?: SortKeyCacheRangeOptions): Promise<string[]>;
entries(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<SortKeyCacheEntry<V>[]>;
/**
* Returns a key value map for a specified range
*/
kvMap(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<Map<string, V>>;
/**
* returns underlying storage (LevelDB, LMDB, sqlite...)
@@ -98,12 +107,6 @@ export class CacheKey {
constructor(readonly key: string, readonly sortKey: string) {}
}
export interface SortKeyCacheEntry<V> {
key: string;
value: V;
}
// tslint:disable-next-line:max-classes-per-file
export class SortKeyCacheResult<V> {
constructor(readonly sortKey: string, readonly cachedValue: V) {}
}

View File

@@ -1,4 +1,4 @@
import { BatchDBOp, CacheKey, SortKeyCache, SortKeyCacheEntry, SortKeyCacheResult } from '../SortKeyCache';
import { BatchDBOp, CacheKey, SortKeyCache, SortKeyCacheResult } from '../SortKeyCache';
import { Level } from 'level';
import { MemoryLevel } from 'memory-level';
import { CacheOptions } from '../../core/WarpFactory';
@@ -19,21 +19,35 @@ import { AbstractChainedBatch } from 'abstract-level/types/abstract-chained-batc
* In order to reduce the cache size, the oldest entries are automatically pruned.
*/
class ClientValueWrapper<V> {
constructor(readonly value: V, readonly tombstone: boolean = false) {}
}
export class LevelDbCache<V> implements SortKeyCache<V> {
private readonly logger = LoggerFactory.INST.create('LevelDbCache');
private readonly subLevelSeparator: string;
private readonly subLevelOptions: AbstractSublevelOptions<string, V>;
private readonly subLevelOptions: AbstractSublevelOptions<string, ClientValueWrapper<V>>;
/**
* not using the Level type, as it is not compatible with MemoryLevel (i.e. has more properties)
* and there doesn't seem to be any public interface/abstract type for all Level implementations
* (the AbstractLevel is not exported from the package...)
*/
private _db: MemoryLevel<string, V>;
private _rollbackBatch: AbstractChainedBatch<MemoryLevel<string, V>, string, V>;
private _db: MemoryLevel<string, ClientValueWrapper<V>>;
/**
* Rollback batch is way of recovering kv storage state from before a failed interaction.
* Currently, all operations performed during active transaction are directly saved to kv storage.
* In case the transaction fails the changes will be reverted using the rollback batch.
*/
private _rollbackBatch: AbstractChainedBatch<
MemoryLevel<string, ClientValueWrapper<V>>,
string,
ClientValueWrapper<V>
>;
// Lazy initialization upon first access
private get db(): MemoryLevel<string, V> {
private get db(): MemoryLevel<string, ClientValueWrapper<V>> {
if (!this._db) {
if (this.cacheOptions.inMemory) {
this._db = new MemoryLevel(this.subLevelOptions);
@@ -43,7 +57,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}
const dbLocation = this.cacheOptions.dbLocation;
this.logger.info(`Using location ${dbLocation}`);
this._db = new Level<string, V>(dbLocation, this.subLevelOptions);
this._db = new Level<string, ClientValueWrapper<V>>(dbLocation, this.subLevelOptions);
}
}
return this._db;
@@ -60,16 +74,13 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async get(cacheKey: CacheKey, returnDeepCopy?: boolean): Promise<SortKeyCacheResult<V> | null> {
this.validateKey(cacheKey.key);
const contractCache = this.db.sublevel<string, V>(cacheKey.key, this.subLevelOptions);
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(cacheKey.key, this.subLevelOptions);
// manually opening to fix https://github.com/Level/level/issues/221
await contractCache.open();
try {
const result = await contractCache.get(cacheKey.sortKey);
return {
sortKey: cacheKey.sortKey,
cachedValue: result
};
const result: ClientValueWrapper<V> = await contractCache.get(cacheKey.sortKey);
const resultValue = result.tombstone ? null : result.value;
return new SortKeyCacheResult<V>(cacheKey.sortKey, resultValue);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
if (e.code == 'LEVEL_NOT_FOUND') {
@@ -81,41 +92,56 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}
async getLast(key: string): Promise<SortKeyCacheResult<V> | null> {
const contractCache = this.db.sublevel<string, V>(key, this.subLevelOptions);
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(key, this.subLevelOptions);
// manually opening to fix https://github.com/Level/level/issues/221
await contractCache.open();
const keys = await contractCache.keys({ reverse: true, limit: 1 }).all();
if (keys.length) {
return {
sortKey: keys[0],
cachedValue: await contractCache.get(keys[0])
};
} else {
return null;
const lastValueWrap = await contractCache.get(keys[0]);
if (!lastValueWrap.tombstone) {
return new SortKeyCacheResult<V>(keys[0], lastValueWrap.value);
}
}
return null;
}
async getLessOrEqual(key: string, sortKey: string): Promise<SortKeyCacheResult<V> | null> {
const contractCache = this.db.sublevel<string, V>(key, this.subLevelOptions);
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(key, this.subLevelOptions);
// manually opening to fix https://github.com/Level/level/issues/221
await contractCache.open();
const keys = await contractCache.keys({ reverse: true, lte: sortKey, limit: 1 }).all();
if (keys.length) {
return {
sortKey: keys[0],
cachedValue: await contractCache.get(keys[0])
};
} else {
return null;
const cachedVal = await contractCache.get(keys[0]);
if (!cachedVal.tombstone) {
return new SortKeyCacheResult<V>(keys[0], cachedVal.value);
}
}
return null;
}
async put(stateCacheKey: CacheKey, value: V): Promise<void> {
await this.setClientValue(stateCacheKey, new ClientValueWrapper(value));
}
/**
* Delete operation under the hood is a write operation with setting tombstone flag to true.
* The idea behind is based on Cassandra Tombstone
* https://www.instaclustr.com/support/documentation/cassandra/using-cassandra/managing-tombstones-in-cassandra/
* There is a couple of benefits to this approach:
* This allows to use kv storage range operations with ease.
* The value will not be accessible only to the next interactions. Interactions reading state for lower sortKey will be able to access it.
* Revert operation for rollback is much easier to implement
*/
async del(cacheKey: CacheKey): Promise<void> {
await this.setClientValue(cacheKey, new ClientValueWrapper(null, true));
}
private async setClientValue(stateCacheKey: CacheKey, valueWrapper: ClientValueWrapper<V>): Promise<void> {
this.validateKey(stateCacheKey.key);
const contractCache = this.db.sublevel<string, V>(stateCacheKey.key, this.subLevelOptions);
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(stateCacheKey.key, this.subLevelOptions);
// manually opening to fix https://github.com/Level/level/issues/221
await contractCache.open();
await contractCache.put(stateCacheKey.sortKey, value);
await contractCache.put(stateCacheKey.sortKey, valueWrapper);
if (!this._rollbackBatch) {
this.begin();
}
@@ -123,7 +149,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}
async delete(key: string): Promise<void> {
const contractCache = this.db.sublevel<string, V>(key, this.subLevelOptions);
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(key, this.subLevelOptions);
await contractCache.open();
await contractCache.clear();
}
@@ -191,16 +217,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}
async keys(sortKey?: string, options?: SortKeyCacheRangeOptions): Promise<string[]> {
const distinctKeys = new Set<string>();
const rangeOptions: RangeOptions<string> = this.levelRangeOptions(options);
const joinedKeys = await this.db.keys(rangeOptions).all();
joinedKeys
.filter((k) => !sortKey || this.extractSortKey(k).localeCompare(sortKey) <= 0)
.map((k) => this.extractOriginalKey(k))
.forEach((k) => distinctKeys.add(k));
return Array.from(distinctKeys);
return Array.from((await this.kvMap(sortKey, options)).keys());
}
validateKey(key: string) {
@@ -217,17 +234,20 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
return joinedKey.split(this.subLevelSeparator)[2];
}
async entries(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<SortKeyCacheEntry<V>[]> {
const keys: string[] = await this.keys(sortKey, options);
async kvMap(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<Map<string, V>> {
const entries: Map<string, V> = new Map();
const allKeys = (await this.db.keys(this.levelRangeOptions(options)).all())
.filter((k) => !sortKey || this.extractSortKey(k).localeCompare(sortKey) <= 0)
.map((k) => this.extractOriginalKey(k));
return Promise.all(
keys.map(async (k): Promise<SortKeyCacheEntry<V>> => {
return {
key: k,
value: (await this.getLessOrEqual(k, sortKey)).cachedValue
};
})
);
for (const k of allKeys) {
const lastValue = await this.getLessOrEqual(k, sortKey);
if (lastValue) {
entries.set(k, lastValue.cachedValue);
}
}
return entries;
}
private levelRangeOptions(options?: SortKeyCacheRangeOptions): RangeOptions<string> | undefined {
@@ -269,7 +289,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
const contracts = await this.keys();
for (let i = 0; i < contracts.length; i++) {
const contractCache = this.db.sublevel<string, V>(contracts[i], this.subLevelOptions);
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(contracts[i], this.subLevelOptions);
// manually opening to fix https://github.com/Level/level/issues/221
await contractCache.open();

View File

@@ -150,10 +150,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
if (sortKey && !this.isRoot() && this.interactionState().has(this.txId())) {
const result = this.interactionState().get(this.txId());
return {
sortKey,
cachedValue: result as EvalStateResult<State>
};
return new SortKeyCacheResult<EvalStateResult<State>>(sortKey, result as EvalStateResult<State>);
}
// TODO: not sure if we should synchronize on a contract instance or contractTxId
@@ -725,10 +722,10 @@ export class HandlerBasedContract<State> implements Contract<State> {
const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx);
if (!this.isRoot() && this.interactionState().has(this.txId())) {
evalStateResult = {
sortKey: interactionTx.sortKey,
cachedValue: this.interactionState().get(this.txId()) as EvalStateResult<State>
};
evalStateResult = new SortKeyCacheResult<EvalStateResult<State>>(
interactionTx.sortKey,
this.interactionState().get(this.txId()) as EvalStateResult<State>
);
} else {
evalStateResult = await this.warp.stateEvaluator.eval<State>(executionContext);
this.interactionState().update(this.txId(), evalStateResult.cachedValue);
@@ -858,10 +855,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
async getStorageValues(keys: string[]): Promise<SortKeyCacheResult<Map<string, unknown>>> {
const lastCached = await this.warp.stateEvaluator.getCache().getLast(this.txId());
if (lastCached == null) {
return {
sortKey: null,
cachedValue: new Map()
};
return new SortKeyCacheResult<Map<string, unknown>>(null, new Map());
}
const storage = this.warp.kvStorageFactory(this.txId());
@@ -872,10 +866,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
const lastValue = await storage.getLessOrEqual(key, lastCached.sortKey);
result.set(key, lastValue == null ? null : lastValue.cachedValue);
}
return {
sortKey: lastCached.sortKey,
cachedValue: result
};
return new SortKeyCacheResult<Map<string, unknown>>(lastCached.sortKey, result);
} finally {
await storage.close();
}
@@ -894,7 +885,6 @@ export class HandlerBasedContract<State> implements Contract<State> {
return result as HandlerBasedContract<unknown>;
}
private async maybeSyncStateWithRemoteSource(
remoteState: SortKeyCacheResult<EvalStateResult<State>>,
upToSortKey: string,

View File

@@ -1,5 +1,5 @@
import { InteractionState } from './InteractionState';
import { CacheKey, SortKeyCache, SortKeyCacheEntry } from '../../cache/SortKeyCache';
import { CacheKey, SortKeyCache } from '../../cache/SortKeyCache';
import { EvalStateResult } from '../../core/modules/StateEvaluator';
import { GQLNodeInterface } from '../../legacy/gqlResult';
import { Warp } from '../../core/Warp';
@@ -27,6 +27,12 @@ export class ContractInteractionState implements InteractionState {
return null;
}
async delKV(contractTxId: string, cacheKey: CacheKey): Promise<void> {
if (this._kv.has(contractTxId)) {
await this._kv.get(contractTxId).del(cacheKey);
}
}
getKvKeys(contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions): Promise<string[]> {
const storage = this._warp.kvStorageFactory(contractTxId);
return storage.keys(sortKey, options);
@@ -36,9 +42,9 @@ export class ContractInteractionState implements InteractionState {
contractTxId: string,
sortKey?: string,
options?: SortKeyCacheRangeOptions
): Promise<SortKeyCacheEntry<unknown>[]> {
): Promise<Map<string, unknown>> {
const storage = this._warp.kvStorageFactory(contractTxId);
return storage.entries(sortKey, options);
return storage.kvMap(sortKey, options);
}
async commit(interaction: GQLNodeInterface): Promise<void> {

View File

@@ -1,4 +1,4 @@
import { CacheKey, SortKeyCacheEntry } from '../../cache/SortKeyCache';
import { CacheKey } from '../../cache/SortKeyCache';
import { EvalStateResult } from '../../core/modules/StateEvaluator';
import { GQLNodeInterface } from '../../legacy/gqlResult';
import { SortKeyCacheRangeOptions } from '../../cache/SortKeyCacheRangeOptions';
@@ -46,11 +46,9 @@ export interface InteractionState {
getKV(contractTxId: string, cacheKey: CacheKey): Promise<unknown>;
delKV(contractTxId: string, cacheKey: CacheKey): Promise<void>;
getKvKeys(contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions): Promise<string[]>;
getKvRange(
contractTxId: string,
sortKey?: string,
options?: SortKeyCacheRangeOptions
): Promise<SortKeyCacheEntry<unknown>[]>;
getKvRange(contractTxId: string, sortKey?: string, options?: SortKeyCacheRangeOptions): Promise<Map<string, unknown>>;
}

View File

@@ -2,7 +2,7 @@
import Arweave from 'arweave';
import { EvaluationOptions } from '../core/modules/StateEvaluator';
import { GQLNodeInterface, GQLTagInterface, VrfData } from './gqlResult';
import { BatchDBOp, CacheKey, PutBatch, SortKeyCache, SortKeyCacheEntry } from '../cache/SortKeyCache';
import { CacheKey, SortKeyCache } from '../cache/SortKeyCache';
import { SortKeyCacheRangeOptions } from '../cache/SortKeyCacheRangeOptions';
import {InteractionState} from "../contract/states/InteractionState";
@@ -283,14 +283,27 @@ export class KV {
return result?.cachedValue || null;
}
async del(key: string): Promise<void> {
this.checkStorageAvailable();
const sortKey = this._transaction.sortKey;
// then we're checking if the values exists in the interactionState
const interactionStateValue = await this._interactionState.delKV(this._contractTxId, new CacheKey(key, this._transaction.sortKey));
if (interactionStateValue != null) {
return interactionStateValue;
}
await this._storage.del(new CacheKey(key, this._transaction.sortKey));
}
async keys(options?: SortKeyCacheRangeOptions): Promise<string[]> {
const sortKey = this._transaction.sortKey;
return await this._storage.keys(sortKey, options)
}
async entries<V>(options?: SortKeyCacheRangeOptions): Promise<SortKeyCacheEntry<V>[]> {
async kvMap<V>(options?: SortKeyCacheRangeOptions): Promise<Map<string, V>> {
const sortKey = this._transaction.sortKey;
return this._storage.entries(sortKey, options)
return this._storage.kvMap(sortKey, options)
}
async commit(): Promise<void> {