feat: AutoSync connected contracts when syncState #333
This commit is contained in:
@@ -20,7 +20,14 @@ import { LoggerFactory } from '../logging/LoggerFactory';
|
||||
import { Evolve } from '../plugins/Evolve';
|
||||
import { ArweaveWrapper } from '../utils/ArweaveWrapper';
|
||||
import { sleep } from '../utils/utils';
|
||||
import { BenchmarkStats, Contract, InnerCallData, WriteInteractionOptions, WriteInteractionResponse } from './Contract';
|
||||
import {
|
||||
BenchmarkStats,
|
||||
Contract,
|
||||
DREContractStatusResponse,
|
||||
InnerCallData,
|
||||
WriteInteractionOptions,
|
||||
WriteInteractionResponse
|
||||
} from './Contract';
|
||||
import { ArTransfer, ArWallet, emptyTransfer, Tags } from './deploy/CreateContract';
|
||||
import { InnerWritesEvaluator } from './InnerWritesEvaluator';
|
||||
import { generateMockVrf } from '../utils/vrf';
|
||||
@@ -58,6 +65,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
private _children: HandlerBasedContract<unknown>[] = [];
|
||||
|
||||
private _uncommittedStates = new Map<string, EvalStateResult<unknown>>();
|
||||
private _dreStates = new Map<string, SortKeyCacheResult<EvalStateResult<State>>>();
|
||||
|
||||
private readonly mutex = new Mutex();
|
||||
|
||||
@@ -471,13 +479,14 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
const { definitionLoader, interactionsLoader, stateEvaluator } = this.warp;
|
||||
|
||||
const benchmark = Benchmark.measure();
|
||||
const cachedState = await stateEvaluator.latestAvailableState<State>(contractTxId, upToSortKey);
|
||||
let cachedState = await stateEvaluator.latestAvailableState<State>(contractTxId, upToSortKey);
|
||||
|
||||
this.logger.debug('cache lookup', benchmark.elapsed());
|
||||
benchmark.reset();
|
||||
|
||||
const evolvedSrcTxId = Evolve.evolvedSrcTxId(cachedState?.cachedValue?.state);
|
||||
let handler, contractDefinition, sortedInteractions, contractEvaluationOptions;
|
||||
let handler, contractDefinition, contractEvaluationOptions, remoteState;
|
||||
let sortedInteractions = interactions || [];
|
||||
|
||||
this.logger.debug('Cached state', cachedState, upToSortKey);
|
||||
|
||||
@@ -502,22 +511,29 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
contractDefinition = await definitionLoader.load<State>(contractTxId, evolvedSrcTxId);
|
||||
contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions);
|
||||
|
||||
sortedInteractions = interactions
|
||||
? interactions
|
||||
: await interactionsLoader.load(
|
||||
contractTxId,
|
||||
cachedState?.sortKey,
|
||||
this.getToSortKey(upToSortKey),
|
||||
contractEvaluationOptions
|
||||
);
|
||||
if (contractEvaluationOptions.remoteStateSyncEnabled && !contractEvaluationOptions.useKVStorage) {
|
||||
remoteState = await this.getRemoteContractState(contractTxId);
|
||||
cachedState = await this.maybeSyncStateWithRemoteSource(remoteState, upToSortKey, cachedState);
|
||||
}
|
||||
|
||||
// (2) ...but we still need to return only interactions up to original "upToSortKey"
|
||||
if (!remoteState && sortedInteractions.length == 0) {
|
||||
sortedInteractions = await interactionsLoader.load(
|
||||
contractTxId,
|
||||
cachedState?.sortKey,
|
||||
this.getToSortKey(upToSortKey),
|
||||
contractEvaluationOptions
|
||||
);
|
||||
}
|
||||
|
||||
// we still need to return only interactions up to original "upToSortKey"
|
||||
if (cachedState?.sortKey) {
|
||||
sortedInteractions = sortedInteractions.filter((i) => i.sortKey.localeCompare(cachedState?.sortKey) > 0);
|
||||
}
|
||||
|
||||
if (upToSortKey) {
|
||||
sortedInteractions = sortedInteractions.filter((i) => i.sortKey.localeCompare(upToSortKey) <= 0);
|
||||
}
|
||||
|
||||
this.logger.debug('contract and interactions load', benchmark.elapsed());
|
||||
if (this.isRoot() && sortedInteractions.length) {
|
||||
// note: if the root contract has zero interactions, it still should be safe
|
||||
@@ -563,6 +579,29 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
return this.getRootEoEvaluator().forForeignContract(rootManifestEvalOptions);
|
||||
}
|
||||
|
||||
private async getRemoteContractState(contractId: string): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
|
||||
if (this.hasDreState(contractId)) {
|
||||
return this.getDreState(contractId);
|
||||
} else {
|
||||
const dreResponse = await this.fetchRemoteContractState(contractId);
|
||||
if (dreResponse != null) {
|
||||
return this.setDREState(contractId, dreResponse);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchRemoteContractState(contractId: string): Promise<DREContractStatusResponse<State> | null> {
|
||||
return this.warpFetchWrapper
|
||||
.fetch(`${this._evaluationOptions.remoteStateSyncSource}?id=${contractId}&events=false`)
|
||||
.then((res) => {
|
||||
return res.ok ? res.json() : Promise.reject(res);
|
||||
})
|
||||
.catch((error) => {
|
||||
throw new Error(`Unable to read contract state from DRE. ${error.status}. ${error.body?.message}`);
|
||||
});
|
||||
}
|
||||
|
||||
private getToSortKey(upToSortKey?: string) {
|
||||
if (this._parentContract?.rootSortKey) {
|
||||
if (!upToSortKey) {
|
||||
@@ -599,6 +638,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
this.warp.interactionsLoader.clearCache();
|
||||
this._children = [];
|
||||
this._uncommittedStates = new Map();
|
||||
this._dreStates = new Map();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -820,7 +860,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
throw new Error(`Unable to retrieve state. ${error.status}: ${error.body?.message}`);
|
||||
});
|
||||
|
||||
await stateEvaluator.syncState(this._contractTxId, response.sortKey, response.state, response.validity);
|
||||
await stateEvaluator.syncState<State>(this._contractTxId, response.sortKey, response.state, response.validity);
|
||||
|
||||
return this;
|
||||
}
|
||||
@@ -900,6 +940,55 @@ export class HandlerBasedContract<State> implements Contract<State> {
|
||||
}
|
||||
}
|
||||
|
||||
private async maybeSyncStateWithRemoteSource(
|
||||
remoteState: SortKeyCacheResult<EvalStateResult<State>>,
|
||||
upToSortKey: string,
|
||||
cachedState: SortKeyCacheResult<EvalStateResult<State>>
|
||||
): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
|
||||
const { stateEvaluator } = this.warp;
|
||||
if (this.isStateHigherThanAndUpTo(remoteState, cachedState?.sortKey, upToSortKey)) {
|
||||
return await stateEvaluator.syncState<State>(
|
||||
this._contractTxId,
|
||||
remoteState.sortKey,
|
||||
remoteState.cachedValue.state,
|
||||
remoteState.cachedValue.validity
|
||||
);
|
||||
}
|
||||
return cachedState;
|
||||
}
|
||||
|
||||
private isStateHigherThanAndUpTo(
|
||||
remoteState: SortKeyCacheResult<EvalStateResult<State>>,
|
||||
fromSortKey: string,
|
||||
upToSortKey: string
|
||||
) {
|
||||
return (
|
||||
remoteState &&
|
||||
(!upToSortKey || upToSortKey >= remoteState.sortKey) &&
|
||||
(!fromSortKey || remoteState.sortKey > fromSortKey)
|
||||
);
|
||||
}
|
||||
|
||||
setDREState(
|
||||
contractTxId: string,
|
||||
result: DREContractStatusResponse<State>
|
||||
): SortKeyCacheResult<EvalStateResult<State>> {
|
||||
const dreCachedState = new SortKeyCacheResult(
|
||||
result.sortKey,
|
||||
new EvalStateResult(result.state, {}, result.errorMessages)
|
||||
);
|
||||
this.getRoot()._dreStates.set(contractTxId, dreCachedState);
|
||||
return dreCachedState;
|
||||
}
|
||||
|
||||
getDreState(contractTxId: string): SortKeyCacheResult<EvalStateResult<State>> {
|
||||
return this.getRoot()._dreStates.get(contractTxId) as SortKeyCacheResult<EvalStateResult<State>>;
|
||||
}
|
||||
|
||||
hasDreState(contractTxId: string): boolean {
|
||||
return this.getRoot()._dreStates.has(contractTxId);
|
||||
}
|
||||
|
||||
private getRoot(): HandlerBasedContract<unknown> {
|
||||
let result: Contract = this;
|
||||
while (!result.isRoot()) {
|
||||
|
||||
Reference in New Issue
Block a user