feat: uncommitted state for internal writes
This commit is contained in:
@@ -22,14 +22,7 @@ import { LoggerFactory } from '../logging/LoggerFactory';
|
||||
import { Evolve } from '../plugins/Evolve';
|
||||
import { ArweaveWrapper } from '../utils/ArweaveWrapper';
|
||||
import { sleep } from '../utils/utils';
|
||||
import {
|
||||
BenchmarkStats,
|
||||
Contract,
|
||||
CurrentTx,
|
||||
InnerCallData,
|
||||
WriteInteractionOptions,
|
||||
WriteInteractionResponse
|
||||
} from './Contract';
|
||||
import { BenchmarkStats, Contract, InnerCallData, WriteInteractionOptions, WriteInteractionResponse } from './Contract';
|
||||
import { ArTransfer, ArWallet, emptyTransfer, Tags } from './deploy/CreateContract';
|
||||
import { InnerWritesEvaluator } from './InnerWritesEvaluator';
|
||||
import { generateMockVrf } from '../utils/vrf';
|
||||
@@ -37,6 +30,7 @@ import { Signature, CustomSignature } from './Signature';
|
||||
import { ContractDefinition } from '../core/ContractDefinition';
|
||||
import { EvaluationOptionsEvaluator } from './EvaluationOptionsEvaluator';
|
||||
import { WarpFetchWrapper } from '../core/WarpFetchWrapper';
|
||||
import { Mutex } from 'async-mutex';
|
||||
|
||||
/**
|
||||
* An implementation of {@link Contract} that is backwards compatible with current style
|
||||
@@ -66,6 +60,8 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
|
||||
private _uncommittedStates = new Map<string, EvalStateResult<unknown>>();
|
||||
|
||||
private readonly mutex = new Mutex();
|
||||
|
||||
constructor(
|
||||
private readonly _contractTxId: string,
|
||||
protected readonly warp: Warp,
|
||||
@@ -129,52 +125,71 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
|
||||
async readState(
|
||||
sortKeyOrBlockHeight?: string | number,
|
||||
caller?: string,
|
||||
interactions?: GQLNodeInterface[]
|
||||
): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
|
||||
this.logger.info('Read state for', {
|
||||
contractTxId: this._contractTxId,
|
||||
sortKeyOrBlockHeight
|
||||
});
|
||||
const initBenchmark = Benchmark.measure();
|
||||
this.maybeResetRootContract();
|
||||
if (!this.isRoot() && sortKeyOrBlockHeight == null) {
|
||||
throw new Error('SortKey MUST be always set for non-root contract calls');
|
||||
}
|
||||
|
||||
const { stateEvaluator } = this.warp;
|
||||
|
||||
const sortKey =
|
||||
typeof sortKeyOrBlockHeight == 'number'
|
||||
? this._sorter.generateLastSortKey(sortKeyOrBlockHeight)
|
||||
: sortKeyOrBlockHeight;
|
||||
|
||||
const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions);
|
||||
this.logger.info('Execution Context', {
|
||||
srcTxId: executionContext.contractDefinition?.srcTxId,
|
||||
missingInteractions: executionContext.sortedInteractions?.length,
|
||||
cachedSortKey: executionContext.cachedState?.sortKey
|
||||
});
|
||||
initBenchmark.stop();
|
||||
if (sortKey && !this.isRoot() && this.hasUncommittedState(this.txId())) {
|
||||
const result = this.getUncommittedState(this.txId());
|
||||
return {
|
||||
sortKey,
|
||||
cachedValue: result as EvalStateResult<State>
|
||||
};
|
||||
}
|
||||
|
||||
const stateBenchmark = Benchmark.measure();
|
||||
const result = await stateEvaluator.eval(executionContext);
|
||||
stateBenchmark.stop();
|
||||
// TODO: not sure if we should synchronize on a contract instance or contractTxId
|
||||
// in the latter case, the warp instance should keep a map contractTxId -> mutex
|
||||
const releaseMutex = await this.mutex.acquire();
|
||||
try {
|
||||
const initBenchmark = Benchmark.measure();
|
||||
this.maybeResetRootContract();
|
||||
|
||||
const total = (initBenchmark.elapsed(true) as number) + (stateBenchmark.elapsed(true) as number);
|
||||
const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions);
|
||||
this.logger.info('Execution Context', {
|
||||
srcTxId: executionContext.contractDefinition?.srcTxId,
|
||||
missingInteractions: executionContext.sortedInteractions?.length,
|
||||
cachedSortKey: executionContext.cachedState?.sortKey
|
||||
});
|
||||
initBenchmark.stop();
|
||||
|
||||
this._benchmarkStats = {
|
||||
gatewayCommunication: initBenchmark.elapsed(true) as number,
|
||||
stateEvaluation: stateBenchmark.elapsed(true) as number,
|
||||
total
|
||||
};
|
||||
const stateBenchmark = Benchmark.measure();
|
||||
const result = await stateEvaluator.eval(executionContext);
|
||||
stateBenchmark.stop();
|
||||
|
||||
this.logger.info('Benchmark', {
|
||||
'Gateway communication ': initBenchmark.elapsed(),
|
||||
'Contract evaluation ': stateBenchmark.elapsed(),
|
||||
'Total: ': `${total.toFixed(0)}ms`
|
||||
});
|
||||
const total = (initBenchmark.elapsed(true) as number) + (stateBenchmark.elapsed(true) as number);
|
||||
|
||||
return result;
|
||||
this._benchmarkStats = {
|
||||
gatewayCommunication: initBenchmark.elapsed(true) as number,
|
||||
stateEvaluation: stateBenchmark.elapsed(true) as number,
|
||||
total
|
||||
};
|
||||
|
||||
this.logger.info('Benchmark', {
|
||||
'Gateway communication ': initBenchmark.elapsed(),
|
||||
'Contract evaluation ': stateBenchmark.elapsed(),
|
||||
'Total: ': `${total.toFixed(0)}ms`
|
||||
});
|
||||
|
||||
if (sortKey && !this.isRoot()) {
|
||||
this.setUncommittedState(this.txId(), result.cachedValue);
|
||||
}
|
||||
|
||||
return result;
|
||||
} finally {
|
||||
releaseMutex();
|
||||
}
|
||||
}
|
||||
|
||||
async readStateFor(
|
||||
@@ -198,7 +213,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
interactionTx: GQLNodeInterface
|
||||
): Promise<InteractionResult<State, View>> {
|
||||
this.logger.info(`View state for ${this._contractTxId}`, interactionTx);
|
||||
return await this.callContractForTx<Input, View>(input, interactionTx);
|
||||
return await this.doApplyInputOnTx<Input, View>(input, interactionTx);
|
||||
}
|
||||
|
||||
async dryWrite<Input>(
|
||||
@@ -211,9 +226,9 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
return await this.callContract<Input>(input, caller, undefined, tags, transfer);
|
||||
}
|
||||
|
||||
async dryWriteFromTx<Input>(input: Input, transaction: GQLNodeInterface): Promise<InteractionResult<State, unknown>> {
|
||||
this.logger.info(`Dry-write from transaction ${transaction.id} for ${this._contractTxId}`);
|
||||
return await this.callContractForTx<Input>(input, transaction);
|
||||
async applyInput<Input>(input: Input, transaction: GQLNodeInterface): Promise<InteractionResult<State, unknown>> {
|
||||
this.logger.info(`Apply-input from transaction ${transaction.id} for ${this._contractTxId}`);
|
||||
return await this.doApplyInputOnTx<Input>(input, transaction);
|
||||
}
|
||||
|
||||
async writeInteraction<Input>(
|
||||
@@ -683,14 +698,25 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
return handleResult;
|
||||
}
|
||||
|
||||
private async callContractForTx<Input, View = unknown>(
|
||||
private async doApplyInputOnTx<Input, View = unknown>(
|
||||
input: Input,
|
||||
interactionTx: GQLNodeInterface
|
||||
): Promise<InteractionResult<State, View>> {
|
||||
this.maybeResetRootContract();
|
||||
|
||||
let evalStateResult: SortKeyCacheResult<EvalStateResult<State>>;
|
||||
|
||||
const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx);
|
||||
const evalStateResult = await this.warp.stateEvaluator.eval<State>(executionContext);
|
||||
|
||||
if (!this.isRoot() && this.hasUncommittedState(this.txId())) {
|
||||
evalStateResult = {
|
||||
sortKey: interactionTx.sortKey,
|
||||
cachedValue: this.getUncommittedState(this.txId()) as EvalStateResult<State>
|
||||
};
|
||||
} else {
|
||||
evalStateResult = await this.warp.stateEvaluator.eval<State>(executionContext);
|
||||
this.setUncommittedState(this.txId(), evalStateResult.cachedValue);
|
||||
}
|
||||
|
||||
this.logger.debug('callContractForTx - evalStateResult', {
|
||||
result: evalStateResult.cachedValue.state,
|
||||
@@ -803,15 +829,6 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
return this._rootSortKey;
|
||||
}
|
||||
|
||||
private getRoot(): Contract<unknown> {
|
||||
let result: Contract = this;
|
||||
while (!result.isRoot()) {
|
||||
result = result.parent();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
getEoEvaluator(): EvaluationOptionsEvaluator {
|
||||
const root = this.getRoot() as HandlerBasedContract<unknown>;
|
||||
return root._eoEvaluator;
|
||||
@@ -850,10 +867,38 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
getUncommittedState(contractTxId: string): EvalStateResult<unknown> {
|
||||
return (this.getRoot() as HandlerBasedContract<unknown>)._uncommittedStates.get(contractTxId);
|
||||
}
|
||||
|
||||
setUncommittedState(contractTxId: string, result: EvalStateResult<unknown>): void {
|
||||
(this.getRoot() as HandlerBasedContract<unknown>)._uncommittedStates.set(contractTxId, result);
|
||||
this.getRoot()._uncommittedStates.set(contractTxId, result);
|
||||
}
|
||||
|
||||
hasUncommittedState(contractTxId: string): boolean {
|
||||
return (this.getRoot() as HandlerBasedContract<unknown>)._uncommittedStates.has(contractTxId);
|
||||
return this.getRoot()._uncommittedStates.has(contractTxId);
|
||||
}
|
||||
|
||||
resetUncommittedState(): void {
|
||||
this.getRoot()._uncommittedStates = new Map();
|
||||
}
|
||||
|
||||
async commitStates(interaction: GQLNodeInterface): Promise<void> {
|
||||
const uncommittedStates = this.getRoot()._uncommittedStates;
|
||||
try {
|
||||
if (uncommittedStates.size > 1) {
|
||||
for (const [k, v] of uncommittedStates) {
|
||||
await this.warp.stateEvaluator.putInCache(k, interaction, v);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.resetUncommittedState();
|
||||
}
|
||||
}
|
||||
|
||||
private getRoot(): HandlerBasedContract<unknown> {
|
||||
let result: Contract = this;
|
||||
while (!result.isRoot()) {
|
||||
result = result.parent();
|
||||
}
|
||||
|
||||
return result as HandlerBasedContract<unknown>;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user