feat: Add functions to the Kv API in order to query ranges of keys and/or values #345 - check previous transaction finished
This commit is contained in:
2
src/cache/SortKeyCache.ts
vendored
2
src/cache/SortKeyCache.ts
vendored
@@ -51,7 +51,7 @@ export interface SortKeyCache<V> {
|
||||
|
||||
close(): Promise<void>;
|
||||
|
||||
begin(): void;
|
||||
begin(): Promise<void>;
|
||||
|
||||
rollback(): void;
|
||||
|
||||
|
||||
72
src/cache/impl/LevelDbCache.ts
vendored
72
src/cache/impl/LevelDbCache.ts
vendored
@@ -20,10 +20,12 @@ import { AbstractKeyIteratorOptions } from 'abstract-level/types/abstract-iterat
|
||||
*/
|
||||
|
||||
class ClientValueWrapper<V> {
|
||||
constructor(readonly value: V, readonly tombstone: boolean = false) {}
|
||||
constructor(readonly value: V, readonly tomb: boolean = false) {}
|
||||
}
|
||||
|
||||
export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
private readonly ongoingTransactionMark = '$$warp-internal-transaction$$';
|
||||
|
||||
private readonly logger = LoggerFactory.INST.create('LevelDbCache');
|
||||
private readonly subLevelSeparator: string;
|
||||
private readonly subLevelOptions: AbstractSublevelOptions<string, ClientValueWrapper<V>>;
|
||||
@@ -79,7 +81,12 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
await contractCache.open();
|
||||
try {
|
||||
const result: ClientValueWrapper<V> = await contractCache.get(cacheKey.sortKey);
|
||||
const resultValue = result.tombstone ? null : result.value;
|
||||
let resultValue: V;
|
||||
if (result.tomb === undefined && result.value === undefined) {
|
||||
resultValue = result as V;
|
||||
} else {
|
||||
resultValue = result.tomb ? null : result.value;
|
||||
}
|
||||
return new SortKeyCacheResult<V>(cacheKey.sortKey, resultValue);
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} catch (e: any) {
|
||||
@@ -98,7 +105,10 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
const keys = await contractCache.keys({ reverse: true, limit: 1 }).all();
|
||||
if (keys.length) {
|
||||
const lastValueWrap = await contractCache.get(keys[0]);
|
||||
if (!lastValueWrap.tombstone) {
|
||||
if (lastValueWrap.tomb === undefined && lastValueWrap.value === undefined) {
|
||||
return new SortKeyCacheResult<V>(keys[0], lastValueWrap as V);
|
||||
}
|
||||
if (!lastValueWrap.tomb) {
|
||||
return new SortKeyCacheResult<V>(keys[0], lastValueWrap.value);
|
||||
}
|
||||
}
|
||||
@@ -112,7 +122,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
const keys = await contractCache.keys({ reverse: true, lte: sortKey, limit: 1 }).all();
|
||||
if (keys.length) {
|
||||
const cachedVal = await contractCache.get(keys[0]);
|
||||
if (!cachedVal.tombstone) {
|
||||
if (!cachedVal.tomb) {
|
||||
return new SortKeyCacheResult<V>(keys[0], cachedVal.value);
|
||||
}
|
||||
}
|
||||
@@ -124,7 +134,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete operation under the hood is a write operation with setting tombstone flag to true.
|
||||
* Delete operation under the hood is a write operation with setting tomb flag to true.
|
||||
* The idea behind is based on Cassandra Tombstone
|
||||
* https://www.instaclustr.com/support/documentation/cassandra/using-cassandra/managing-tombstones-in-cassandra/
|
||||
* There is a couple of benefits to this approach:
|
||||
@@ -142,11 +152,10 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
// manually opening to fix https://github.com/Level/level/issues/221
|
||||
await contractCache.open();
|
||||
await contractCache.put(stateCacheKey.sortKey, valueWrapper);
|
||||
if (!this._rollbackBatch) {
|
||||
this.begin();
|
||||
}
|
||||
if (this._rollbackBatch) {
|
||||
this._rollbackBatch.del(stateCacheKey.sortKey, { sublevel: contractCache });
|
||||
}
|
||||
}
|
||||
|
||||
async delete(key: string): Promise<void> {
|
||||
const contractCache = this.db.sublevel<string, ClientValueWrapper<V>>(key, this.subLevelOptions);
|
||||
@@ -166,7 +175,6 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
|
||||
async open(): Promise<void> {
|
||||
await this.db.open();
|
||||
await this.begin();
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
@@ -175,20 +183,53 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
}
|
||||
}
|
||||
|
||||
begin() {
|
||||
this._rollbackBatch = this.db.batch();
|
||||
async begin(): Promise<void> {
|
||||
await this.initRollbackBatch();
|
||||
}
|
||||
|
||||
async rollback() {
|
||||
if (this._rollbackBatch && this._rollbackBatch.length > 0) {
|
||||
if (this._rollbackBatch) {
|
||||
this._rollbackBatch.del(this.ongoingTransactionMark);
|
||||
await this._rollbackBatch.write();
|
||||
await this._rollbackBatch.close();
|
||||
}
|
||||
this._rollbackBatch = null;
|
||||
}
|
||||
|
||||
private async initRollbackBatch(): Promise<
|
||||
AbstractChainedBatch<MemoryLevel<string, ClientValueWrapper<V>>, string, ClientValueWrapper<V>>
|
||||
> {
|
||||
if (this._rollbackBatch == null) {
|
||||
await this.checkPreviousTransactionFinished();
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-ignore
|
||||
await this.db.put(this.ongoingTransactionMark, 'ongoing');
|
||||
|
||||
this._rollbackBatch = this.db.batch();
|
||||
}
|
||||
return this._rollbackBatch;
|
||||
}
|
||||
|
||||
private async checkPreviousTransactionFinished() {
|
||||
let transactionMarkValue;
|
||||
|
||||
try {
|
||||
transactionMarkValue = await this.db.get(this.ongoingTransactionMark);
|
||||
// eslint-disable-next-line no-empty
|
||||
} catch (error) {}
|
||||
|
||||
if (transactionMarkValue == 'ongoing') {
|
||||
throw new Error(`Database seems to be in inconsistent state. The previous transaction has not finished.`);
|
||||
}
|
||||
}
|
||||
|
||||
async commit() {
|
||||
if (this._rollbackBatch) {
|
||||
await this._rollbackBatch.clear().close();
|
||||
await this._rollbackBatch.clear();
|
||||
await this.db.del(this.ongoingTransactionMark);
|
||||
await this._rollbackBatch.close();
|
||||
}
|
||||
this._rollbackBatch = null;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
@@ -207,7 +248,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
for (const joinedKey of keys) {
|
||||
// default joined key format used by sub-levels:
|
||||
// <separator><contract_tx_id (43 chars)><separator><sort_key>
|
||||
const sortKey = joinedKey.substring(45);
|
||||
const sortKey = joinedKey.split(this.subLevelSeparator)[1];
|
||||
if (sortKey.localeCompare(lastSortKey) > 0) {
|
||||
lastSortKey = sortKey;
|
||||
}
|
||||
@@ -221,6 +262,9 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
|
||||
}
|
||||
|
||||
validateKey(key: string) {
|
||||
if (key.includes(this.ongoingTransactionMark)) {
|
||||
throw new Error(`Validation error: Key ${key} for internal use only`);
|
||||
}
|
||||
if (key.includes(this.subLevelSeparator)) {
|
||||
throw new Error(`Validation error: key ${key} contains db separator ${this.subLevelSeparator}`);
|
||||
}
|
||||
|
||||
@@ -49,7 +49,8 @@ export class ContractInteractionState implements InteractionState {
|
||||
|
||||
async commit(interaction: GQLNodeInterface): Promise<void> {
|
||||
if (interaction.dry) {
|
||||
return await this.rollbackKVs();
|
||||
await this.rollbackKVs();
|
||||
return this.reset();
|
||||
}
|
||||
try {
|
||||
await this.doStoreJson(this._json, interaction);
|
||||
|
||||
@@ -134,6 +134,7 @@ export class JsHandlerApi<State> extends AbstractContractHandler<State> {
|
||||
|
||||
try {
|
||||
await this.swGlobal.kv.open();
|
||||
await this.swGlobal.kv.begin();
|
||||
|
||||
const handlerResult = await Promise.race([timeoutPromise, this.contractFunction(stateClone, interaction)]);
|
||||
|
||||
|
||||
@@ -34,8 +34,16 @@ export class WasmHandlerApi<State> extends AbstractContractHandler<State> {
|
||||
this.assignWrite(executionContext);
|
||||
|
||||
await this.swGlobal.kv.open();
|
||||
await this.swGlobal.kv.begin();
|
||||
const handlerResult = await this.doHandle(interaction);
|
||||
|
||||
if (interactionData.interaction.interactionType === 'view') {
|
||||
// view calls are not allowed to perform any KV modifications
|
||||
await this.swGlobal.kv.rollback();
|
||||
} else {
|
||||
await this.swGlobal.kv.commit();
|
||||
}
|
||||
|
||||
return {
|
||||
type: 'ok',
|
||||
result: handlerResult,
|
||||
@@ -62,10 +70,6 @@ export class WasmHandlerApi<State> extends AbstractContractHandler<State> {
|
||||
};
|
||||
}
|
||||
} finally {
|
||||
if (interactionData.interaction.interactionType === 'view') {
|
||||
// view calls are not allowed to perform any KV modifications
|
||||
await this.swGlobal.kv.rollback();
|
||||
}
|
||||
await this.swGlobal.kv.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -313,6 +313,12 @@ export class KV {
|
||||
return this._storage.kvMap(sortKey, options);
|
||||
}
|
||||
|
||||
async begin() {
|
||||
if (this._storage) {
|
||||
return this._storage.begin();
|
||||
}
|
||||
}
|
||||
|
||||
async commit(): Promise<void> {
|
||||
if (this._storage) {
|
||||
if (this._transaction.dryRun) {
|
||||
|
||||
Reference in New Issue
Block a user