diff --git a/src/cache/SortKeyCache.ts b/src/cache/SortKeyCache.ts index 6d41cae..6949d0d 100644 --- a/src/cache/SortKeyCache.ts +++ b/src/cache/SortKeyCache.ts @@ -51,7 +51,7 @@ export interface SortKeyCache { close(): Promise; - begin(): void; + begin(): Promise; rollback(): void; diff --git a/src/cache/impl/LevelDbCache.ts b/src/cache/impl/LevelDbCache.ts index 39676d9..541531a 100644 --- a/src/cache/impl/LevelDbCache.ts +++ b/src/cache/impl/LevelDbCache.ts @@ -20,10 +20,12 @@ import { AbstractKeyIteratorOptions } from 'abstract-level/types/abstract-iterat */ class ClientValueWrapper { - constructor(readonly value: V, readonly tombstone: boolean = false) {} + constructor(readonly value: V, readonly tomb: boolean = false) {} } export class LevelDbCache implements SortKeyCache { + private readonly ongoingTransactionMark = '$$warp-internal-transaction$$'; + private readonly logger = LoggerFactory.INST.create('LevelDbCache'); private readonly subLevelSeparator: string; private readonly subLevelOptions: AbstractSublevelOptions>; @@ -79,7 +81,12 @@ export class LevelDbCache implements SortKeyCache { await contractCache.open(); try { const result: ClientValueWrapper = await contractCache.get(cacheKey.sortKey); - const resultValue = result.tombstone ? null : result.value; + let resultValue: V; + if (result.tomb === undefined && result.value === undefined) { + resultValue = result as V; + } else { + resultValue = result.tomb ? null : result.value; + } return new SortKeyCacheResult(cacheKey.sortKey, resultValue); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (e: any) { @@ -98,7 +105,10 @@ export class LevelDbCache implements SortKeyCache { const keys = await contractCache.keys({ reverse: true, limit: 1 }).all(); if (keys.length) { const lastValueWrap = await contractCache.get(keys[0]); - if (!lastValueWrap.tombstone) { + if (lastValueWrap.tomb === undefined && lastValueWrap.value === undefined) { + return new SortKeyCacheResult(keys[0], lastValueWrap as V); + } + if (!lastValueWrap.tomb) { return new SortKeyCacheResult(keys[0], lastValueWrap.value); } } @@ -112,7 +122,7 @@ export class LevelDbCache implements SortKeyCache { const keys = await contractCache.keys({ reverse: true, lte: sortKey, limit: 1 }).all(); if (keys.length) { const cachedVal = await contractCache.get(keys[0]); - if (!cachedVal.tombstone) { + if (!cachedVal.tomb) { return new SortKeyCacheResult(keys[0], cachedVal.value); } } @@ -124,7 +134,7 @@ export class LevelDbCache implements SortKeyCache { } /** - * Delete operation under the hood is a write operation with setting tombstone flag to true. + * Delete operation under the hood is a write operation with setting tomb flag to true. * The idea behind is based on Cassandra Tombstone * https://www.instaclustr.com/support/documentation/cassandra/using-cassandra/managing-tombstones-in-cassandra/ * There is a couple of benefits to this approach: @@ -142,10 +152,9 @@ export class LevelDbCache implements SortKeyCache { // manually opening to fix https://github.com/Level/level/issues/221 await contractCache.open(); await contractCache.put(stateCacheKey.sortKey, valueWrapper); - if (!this._rollbackBatch) { - this.begin(); + if (this._rollbackBatch) { + this._rollbackBatch.del(stateCacheKey.sortKey, { sublevel: contractCache }); } - this._rollbackBatch.del(stateCacheKey.sortKey, { sublevel: contractCache }); } async delete(key: string): Promise { @@ -166,7 +175,6 @@ export class LevelDbCache implements SortKeyCache { async open(): Promise { await this.db.open(); - await this.begin(); } async close(): Promise { @@ -175,20 +183,53 @@ export class LevelDbCache implements SortKeyCache { } } - begin() { - this._rollbackBatch = this.db.batch(); + async begin(): Promise { + await this.initRollbackBatch(); } async rollback() { - if (this._rollbackBatch && this._rollbackBatch.length > 0) { + if (this._rollbackBatch) { + this._rollbackBatch.del(this.ongoingTransactionMark); await this._rollbackBatch.write(); + await this._rollbackBatch.close(); + } + this._rollbackBatch = null; + } + + private async initRollbackBatch(): Promise< + AbstractChainedBatch>, string, ClientValueWrapper> + > { + if (this._rollbackBatch == null) { + await this.checkPreviousTransactionFinished(); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + await this.db.put(this.ongoingTransactionMark, 'ongoing'); + + this._rollbackBatch = this.db.batch(); + } + return this._rollbackBatch; + } + + private async checkPreviousTransactionFinished() { + let transactionMarkValue; + + try { + transactionMarkValue = await this.db.get(this.ongoingTransactionMark); + // eslint-disable-next-line no-empty + } catch (error) {} + + if (transactionMarkValue == 'ongoing') { + throw new Error(`Database seems to be in inconsistent state. The previous transaction has not finished.`); } } async commit() { if (this._rollbackBatch) { - await this._rollbackBatch.clear().close(); + await this._rollbackBatch.clear(); + await this.db.del(this.ongoingTransactionMark); + await this._rollbackBatch.close(); } + this._rollbackBatch = null; } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -207,7 +248,7 @@ export class LevelDbCache implements SortKeyCache { for (const joinedKey of keys) { // default joined key format used by sub-levels: // - const sortKey = joinedKey.substring(45); + const sortKey = joinedKey.split(this.subLevelSeparator)[1]; if (sortKey.localeCompare(lastSortKey) > 0) { lastSortKey = sortKey; } @@ -221,6 +262,9 @@ export class LevelDbCache implements SortKeyCache { } validateKey(key: string) { + if (key.includes(this.ongoingTransactionMark)) { + throw new Error(`Validation error: Key ${key} for internal use only`); + } if (key.includes(this.subLevelSeparator)) { throw new Error(`Validation error: key ${key} contains db separator ${this.subLevelSeparator}`); } diff --git a/src/contract/states/ContractInteractionState.ts b/src/contract/states/ContractInteractionState.ts index 0349ffe..aa6a955 100644 --- a/src/contract/states/ContractInteractionState.ts +++ b/src/contract/states/ContractInteractionState.ts @@ -49,7 +49,8 @@ export class ContractInteractionState implements InteractionState { async commit(interaction: GQLNodeInterface): Promise { if (interaction.dry) { - return await this.rollbackKVs(); + await this.rollbackKVs(); + return this.reset(); } try { await this.doStoreJson(this._json, interaction); diff --git a/src/core/modules/impl/handler/JsHandlerApi.ts b/src/core/modules/impl/handler/JsHandlerApi.ts index a48ffea..03ab44c 100644 --- a/src/core/modules/impl/handler/JsHandlerApi.ts +++ b/src/core/modules/impl/handler/JsHandlerApi.ts @@ -134,6 +134,7 @@ export class JsHandlerApi extends AbstractContractHandler { try { await this.swGlobal.kv.open(); + await this.swGlobal.kv.begin(); const handlerResult = await Promise.race([timeoutPromise, this.contractFunction(stateClone, interaction)]); diff --git a/src/core/modules/impl/handler/WasmHandlerApi.ts b/src/core/modules/impl/handler/WasmHandlerApi.ts index d31ace0..b9ba672 100644 --- a/src/core/modules/impl/handler/WasmHandlerApi.ts +++ b/src/core/modules/impl/handler/WasmHandlerApi.ts @@ -34,8 +34,16 @@ export class WasmHandlerApi extends AbstractContractHandler { this.assignWrite(executionContext); await this.swGlobal.kv.open(); + await this.swGlobal.kv.begin(); const handlerResult = await this.doHandle(interaction); - await this.swGlobal.kv.commit(); + + if (interactionData.interaction.interactionType === 'view') { + // view calls are not allowed to perform any KV modifications + await this.swGlobal.kv.rollback(); + } else { + await this.swGlobal.kv.commit(); + } + return { type: 'ok', result: handlerResult, @@ -62,10 +70,6 @@ export class WasmHandlerApi extends AbstractContractHandler { }; } } finally { - if (interactionData.interaction.interactionType === 'view') { - // view calls are not allowed to perform any KV modifications - await this.swGlobal.kv.rollback(); - } await this.swGlobal.kv.close(); } } diff --git a/src/legacy/smartweave-global.ts b/src/legacy/smartweave-global.ts index 5d32766..d35c099 100644 --- a/src/legacy/smartweave-global.ts +++ b/src/legacy/smartweave-global.ts @@ -313,6 +313,12 @@ export class KV { return this._storage.kvMap(sortKey, options); } + async begin() { + if (this._storage) { + return this._storage.begin(); + } + } + async commit(): Promise { if (this._storage) { if (this._transaction.dryRun) {