feat: knex-compatible state cache

This commit is contained in:
ppedziwiatr
2021-12-03 14:50:58 +01:00
committed by Piotr Pędziwiatr
parent bec7677c41
commit d0b96be7d7
9 changed files with 417 additions and 6 deletions

4
.gitignore vendored
View File

@@ -19,4 +19,6 @@ yalc.lock
.yalc/
cache/
/cache/
/db/

View File

@@ -57,6 +57,7 @@
"axios": "^0.21.4",
"bignumber.js": "^9.0.1",
"json-beautify": "^1.1.1",
"knex": "^0.95.14",
"lodash": "^4.17.21",
"tslog": "^3.2.2"
},
@@ -78,6 +79,7 @@
"prettier": "^2.3.2",
"rimraf": "^3.0.2",
"smartweave": "0.4.45",
"sqlite3": "^5.0.2",
"ts-jest": "^27.0.7",
"ts-node": "^10.2.1",
"tsc-alias": "1.3.10",

View File

@@ -0,0 +1,196 @@
import fs from 'fs';
import ArLocal from 'arlocal';
import Arweave from 'arweave';
import { JWKInterface } from 'arweave/node/lib/wallet';
import { Contract, LoggerFactory, SmartWeave, SmartWeaveNodeFactory } from '@smartweave';
import path from 'path';
import { TsLogFactory } from '../../logging/node/TsLogFactory';
import { addFunds, mineBlock } from './_helpers';
import knex from 'knex';
interface ExampleContractState {
counter: number;
}
/**
* This integration test should verify whether the basic functions of the SmartWeave client
* work properly when file-based cache is being used.
*/
describe('Testing the SmartWeave client', () => {
let contractSrc: string;
let initialState: string;
let wallet: JWKInterface;
let arweave: Arweave;
let arlocal: ArLocal;
let smartweave: SmartWeave;
let contract_1: Contract<ExampleContractState>;
let contract_2: Contract<ExampleContractState>;
const cacheDir = path.join(__dirname, 'db');
const knexConfig = knex({
client: 'sqlite3',
connection: {
filename: `${cacheDir}/db.sqlite`
},
useNullAsDefault: true
});
beforeAll(async () => {
removeCacheDir();
fs.mkdirSync(cacheDir);
// note: each tests suit (i.e. file with tests that Jest is running concurrently
// with another files has to have ArLocal set to a different port!)
arlocal = new ArLocal(1780, false);
await arlocal.start();
arweave = Arweave.init({
host: 'localhost',
port: 1780,
protocol: 'http'
});
LoggerFactory.INST.logLevel('debug');
smartweave = await SmartWeaveNodeFactory.knexCached(arweave, knexConfig);
wallet = await arweave.wallets.generate();
await addFunds(arweave, wallet);
contractSrc = fs.readFileSync(path.join(__dirname, 'data/example-contract.js'), 'utf8');
initialState = fs.readFileSync(path.join(__dirname, 'data/example-contract-state.json'), 'utf8');
// deploying contract using the new SDK.
const contractTxId1 = await smartweave.createContract.deploy({
wallet,
initState: initialState,
src: contractSrc
});
const contractTxId2 = await smartweave.createContract.deploy({
wallet,
initState: '{"counter": 100}',
src: contractSrc
});
contract_1 = smartweave.contract<ExampleContractState>(contractTxId1).connect(wallet);
contract_2 = smartweave.contract<ExampleContractState>(contractTxId2).connect(wallet);
await mineBlock(arweave);
});
afterAll(async () => {
await arlocal.stop();
await knexConfig.destroy();
removeCacheDir();
});
it('should properly deploy contract with initial state', async () => {
expect((await contract_1.readState()).state.counter).toEqual(555);
expect((await contract_2.readState()).state.counter).toEqual(100);
});
it('should properly add new interaction', async () => {
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
expect((await contract_1.readState()).state.counter).toEqual(556);
expect((await contract_2.readState()).state.counter).toEqual(102);
});
it('should properly add another interactions', async () => {
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
expect((await contract_1.readState()).state.counter).toEqual(559);
expect((await contract_2.readState()).state.counter).toEqual(105);
});
it('should properly view contract state', async () => {
const interactionResult = await contract_1.viewState<unknown, number>({ function: 'value' });
expect(interactionResult.result).toEqual(559);
const interactionResult2 = await contract_2.viewState<unknown, number>({ function: 'value' });
expect(interactionResult2.result).toEqual(105);
});
it('should properly read state with a fresh client', async () => {
const contract_1_2 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig))
.contract<ExampleContractState>(contract_1.txId())
.connect(wallet);
expect((await contract_1_2.readState()).state.counter).toEqual(559);
const contract_2_2 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig))
.contract<ExampleContractState>(contract_2.txId())
.connect(wallet);
expect((await contract_2_2.readState()).state.counter).toEqual(105);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
expect((await contract_1_2.readState()).state.counter).toEqual(561);
expect((await contract_2_2.readState()).state.counter).toEqual(107);
});
it('should properly read state with another fresh client', async () => {
const contract_1_3 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig))
.contract<ExampleContractState>(contract_1.txId())
.connect(wallet);
const contract_2_3 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig))
.contract<ExampleContractState>(contract_2.txId())
.connect(wallet);
expect((await contract_1_3.readState()).state.counter).toEqual(561);
expect((await contract_2_3.readState()).state.counter).toEqual(107);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
expect((await contract_1_3.readState()).state.counter).toEqual(563);
expect((await contract_2_3.readState()).state.counter).toEqual(109);
});
it('should properly eval state for missing interactions', async () => {
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
await contract_1.writeInteraction({ function: 'add' });
await contract_2.writeInteraction({ function: 'add' });
await mineBlock(arweave);
const contract_1_4 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig))
.contract<ExampleContractState>(contract_1.txId())
.connect(wallet);
const contract_2_4 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig))
.contract<ExampleContractState>(contract_2.txId())
.connect(wallet);
expect((await contract_1.readState()).state.counter).toEqual(565);
expect((await contract_1_4.readState()).state.counter).toEqual(565);
expect((await contract_2.readState()).state.counter).toEqual(111);
expect((await contract_2_4.readState()).state.counter).toEqual(111);
});
function removeCacheDir() {
if (fs.existsSync(cacheDir)) {
fs.rmSync(cacheDir, { recursive: true });
}
}
});

138
src/cache/impl/KnexStateCache.ts vendored Normal file
View File

@@ -0,0 +1,138 @@
import { BlockHeightKey, MemBlockHeightSwCache } from '@smartweave/cache';
import { LoggerFactory } from '@smartweave/logging';
import { Knex } from 'knex';
import { createHash } from 'crypto';
import { StateCache } from '@smartweave';
type DbResult = {
contract_id: string;
height: number;
state: string;
};
/**
* An implementation of {@link BlockHeightSwCache} that stores its data (ie. contracts state)
* in a Knex-compatible storage (PostgreSQL, CockroachDB, MSSQL, MySQL, MariaDB, SQLite3, Oracle, and Amazon Redshift)
* https://knexjs.org
*/
export class KnexStateCache extends MemBlockHeightSwCache<StateCache<any>> {
private readonly kLogger = LoggerFactory.INST.create('KnexBlockHeightSwCache');
private isFlushing = false;
private isDirty = false;
private constructor(
private readonly knex: Knex,
maxStoredInMemoryBlockHeights: number = Number.MAX_SAFE_INTEGER,
cache: DbResult[]
) {
super(maxStoredInMemoryBlockHeights);
this.saveCache = this.saveCache.bind(this);
this.flush = this.flush.bind(this);
cache.forEach((entry) => {
this.putSync(
{
cacheKey: entry.contract_id,
blockHeight: entry.height
},
JSON.parse(entry.state)
);
});
process.on('exit', async () => {
await this.flush();
process.exit();
});
process.on('SIGINT', async () => {
await this.flush();
process.exit();
});
}
public static async init(
knex: Knex,
maxStoredInMemoryBlockHeights: number = Number.MAX_SAFE_INTEGER
): Promise<KnexStateCache> {
if (!(await knex.schema.hasTable('states'))) {
await knex.schema.createTable('states', (table) => {
table.string('contract_id', 64).notNullable();
table.bigInteger('height').notNullable();
table.string('hash').notNullable().unique();
table.json('state').notNullable();
table.unique(['contract_id', 'height', 'hash'], { indexName: 'states_composite_index' });
});
}
const cache: DbResult[] = await knex
.select(['contract_id', 'height', 'state'])
.from('states')
.max('height')
.groupBy('contract_id')
.orderBy('height', 'desc');
return new KnexStateCache(knex, maxStoredInMemoryBlockHeights, cache);
}
private async saveCache() {
this.isFlushing = true;
this.kLogger.info(`==== Persisting cache ====`);
try {
for (const contractTxId of Object.keys(this.storage)) {
// store only highest cached height
const toStore = await this.getLast(contractTxId);
// this check is a bit paranoid, since we're iterating on storage keys..
if (toStore !== null) {
const { cachedHeight, cachedValue } = toStore;
// note: JSON.stringify is non-deterministic
// switch to https://www.npmjs.com/package/json-stringify-deterministic ?
const jsonState = JSON.stringify(cachedValue);
// note: cannot reuse:
// "The Hash object can not be used again after hash.digest() method has been called.
// Multiple calls will cause an error to be thrown."
const hash = createHash('sha256');
hash.update(`${contractTxId}|${cachedHeight}|${JSON.stringify(cachedValue)}`);
const digest = hash.digest('hex');
// FIXME: batch insert
await this.knex
.insert({
contract_id: contractTxId,
height: cachedHeight,
hash: digest,
state: jsonState
})
.into('states')
.onConflict(['contract_id', 'height', 'hash'])
.merge();
}
}
this.isDirty = false;
} catch (e) {
this.kLogger.error('Error while flushing cache', e);
} finally {
this.isFlushing = false;
this.kLogger.info(`==== Cache persisted ====`);
}
}
async put({ cacheKey, blockHeight }: BlockHeightKey, value: StateCache<any>): Promise<void> {
this.isDirty = true;
return super.put({ cacheKey, blockHeight }, value);
}
async flush(): Promise<void> {
if (this.isFlushing || !this.isDirty) {
return;
}
await this.saveCache();
}
}

View File

@@ -50,7 +50,11 @@ export class MemBlockHeightSwCache<V = any> implements BlockHeightSwCache<V> {
}
async put({ cacheKey, blockHeight }: BlockHeightKey, value: V): Promise<void> {
if (!(await this.contains(cacheKey))) {
this.putSync({ cacheKey, blockHeight }, value);
}
protected putSync({ cacheKey, blockHeight }: BlockHeightKey, value: V): void {
if (!this.containsSync(cacheKey)) {
this.storage[cacheKey] = new Map();
}
const cached = this.storage[cacheKey];
@@ -63,6 +67,10 @@ export class MemBlockHeightSwCache<V = any> implements BlockHeightSwCache<V> {
}
async contains(key: string): Promise<boolean> {
return this.containsSync(key);
}
protected containsSync(key: string): boolean {
return Object.prototype.hasOwnProperty.call(this.storage, key);
}

5
src/cache/index.ts vendored
View File

@@ -1,9 +1,10 @@
export * from './impl/MemBlockHeightCache';
// FileBlockHeightCache has to be exported after MemBlockHeightCache,
// otherwise ts-jest complains with
// "TypeError: Class extends value undefined is not a constructor or null"
// funny that standard tsc does not have such issues..
// "TypeError: Class extends value undefined is not a constructor or null".
// Funny that standard tsc does not have such issues..
export * from './impl/FileBlockHeightCache';
export * from './impl/KnexStateCache';
export * from './impl/RemoteBlockHeightCache';
export * from './impl/MemCache';

View File

@@ -5,14 +5,12 @@ import {
ExecutionContext,
ExecutionContextModifier,
HandlerApi,
LexicographicalInteractionsSorter,
StateCache
} from '@smartweave/core';
import Arweave from 'arweave';
import { GQLNodeInterface } from '@smartweave/legacy';
import { LoggerFactory } from '@smartweave/logging';
import { CurrentTx } from '@smartweave/contract';
import { mapReplacer } from '@smartweave/utils';
/**
* An implementation of DefaultStateEvaluator that adds caching capabilities.

View File

@@ -11,6 +11,8 @@ import {
} from '@smartweave/core';
import { CacheableContractInteractionsLoader, CacheableExecutorFactory, Evolve } from '@smartweave/plugins';
import { FileBlockHeightSwCache, MemBlockHeightSwCache, MemCache } from '@smartweave/cache';
import { Knex } from 'knex';
import { KnexStateCache } from '../../cache/impl/KnexStateCache';
/**
* A {@link SmartWeave} factory that can be safely used only in Node.js env.
@@ -65,4 +67,44 @@ export class SmartWeaveNodeFactory extends SmartWeaveWebFactory {
.setExecutorFactory(executorFactory)
.setStateEvaluator(stateEvaluator);
}
static async knexCached(
arweave: Arweave,
dbConnection: Knex,
maxStoredInMemoryBlockHeights = 10
): Promise<SmartWeave> {
return (await this.knexCachedBased(arweave, dbConnection, maxStoredInMemoryBlockHeights)).build();
}
/**
*/
static async knexCachedBased(
arweave: Arweave,
dbConnection: Knex,
maxStoredInMemoryBlockHeights = 10
): Promise<SmartWeaveBuilder> {
const definitionLoader = new ContractDefinitionLoader(arweave, new MemCache());
const interactionsLoader = new CacheableContractInteractionsLoader(
new ContractInteractionsLoader(arweave),
new MemBlockHeightSwCache()
);
const executorFactory = new CacheableExecutorFactory(arweave, new HandlerExecutorFactory(arweave), new MemCache());
const stateEvaluator = new CacheableStateEvaluator(
arweave,
await KnexStateCache.init(dbConnection, maxStoredInMemoryBlockHeights),
[new Evolve(definitionLoader, executorFactory)]
);
const interactionsSorter = new LexicographicalInteractionsSorter(arweave);
return SmartWeave.builder(arweave)
.setDefinitionLoader(definitionLoader)
.setInteractionsLoader(interactionsLoader)
.setInteractionsSorter(interactionsSorter)
.setExecutorFactory(executorFactory)
.setStateEvaluator(stateEvaluator);
}
}

View File

@@ -2155,6 +2155,11 @@ colorette@1.2.1:
resolved "https://registry.yarnpkg.com/colorette/-/colorette-1.2.1.tgz#4d0b921325c14faf92633086a536db6e89564b1b"
integrity sha512-puCDz0CzydiSYOrnXpz/PKd69zRrribezjtE9yd4zvytoRc8+RY/KJPvtPFKZS3E3wP6neGyMe0vOTlHO5L3Pw==
colorette@2.0.16:
version "2.0.16"
resolved "https://registry.yarnpkg.com/colorette/-/colorette-2.0.16.tgz#713b9af84fdb000139f04546bd4a93f62a5085da"
integrity sha512-hUewv7oMjCp+wkBv5Rm0v87eJhq4woh5rSR+42YSQJKecCqgIqNkZ6lAlQms/BwHPJA5NKMRlpxPRv0n8HQW6g==
colorette@^1.3.0:
version "1.3.0"
resolved "https://registry.yarnpkg.com/colorette/-/colorette-1.3.0.tgz#ff45d2f0edb244069d3b772adeb04fed38d0a0af"
@@ -4516,6 +4521,25 @@ kleur@^3.0.3:
resolved "https://registry.yarnpkg.com/kleur/-/kleur-3.0.3.tgz#a79c9ecc86ee1ce3fa6206d1216c501f147fc07e"
integrity sha512-eTIzlVOSUR+JxdDFepEYcBMtZ9Qqdef+rnzWdRZuMbOywu5tO2w2N7rqjoANZ5k9vywhL6Br1VRjUIgTQx4E8w==
knex@^0.95.14:
version "0.95.14"
resolved "https://registry.yarnpkg.com/knex/-/knex-0.95.14.tgz#47eca7757cbc5872b7c9a3c67ae3b7ac6d00cf10"
integrity sha512-j4qLjWySrC/JRRVtOpoR2LcS1yBOsd7Krc6mEukPvmTDX/w11pD52Pq9FYR56/kLXGeAV8jFdWBjsZFi1mscWg==
dependencies:
colorette "2.0.16"
commander "^7.1.0"
debug "4.3.2"
escalade "^3.1.1"
esm "^3.2.25"
getopts "2.2.5"
interpret "^2.2.0"
lodash "^4.17.21"
pg-connection-string "2.5.0"
rechoir "0.7.0"
resolve-from "^5.0.0"
tarn "^3.0.1"
tildify "2.0.0"
knex@^0.95.6:
version "0.95.11"
resolved "https://registry.yarnpkg.com/knex/-/knex-0.95.11.tgz#1526bd700cb07497252214d34c10f660aee01a3e"