From d0b96be7d70dcc4b60ad50161a7e35a0036dad5d Mon Sep 17 00:00:00 2001 From: ppedziwiatr Date: Fri, 3 Dec 2021 14:50:58 +0100 Subject: [PATCH] feat: knex-compatible state cache --- .gitignore | 4 +- package.json | 2 + src/__tests__/integration/knex-cache.test.ts | 196 ++++++++++++++++++ src/cache/impl/KnexStateCache.ts | 138 ++++++++++++ src/cache/impl/MemBlockHeightCache.ts | 10 +- src/cache/index.ts | 5 +- .../modules/impl/CacheableStateEvaluator.ts | 2 - src/core/node/SmartWeaveNodeFactory.ts | 42 ++++ yarn.lock | 24 +++ 9 files changed, 417 insertions(+), 6 deletions(-) create mode 100644 src/__tests__/integration/knex-cache.test.ts create mode 100644 src/cache/impl/KnexStateCache.ts diff --git a/.gitignore b/.gitignore index 6ffadd2..0840901 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,6 @@ yalc.lock .yalc/ -cache/ +/cache/ + +/db/ diff --git a/package.json b/package.json index fcc996d..97bb153 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "axios": "^0.21.4", "bignumber.js": "^9.0.1", "json-beautify": "^1.1.1", + "knex": "^0.95.14", "lodash": "^4.17.21", "tslog": "^3.2.2" }, @@ -78,6 +79,7 @@ "prettier": "^2.3.2", "rimraf": "^3.0.2", "smartweave": "0.4.45", + "sqlite3": "^5.0.2", "ts-jest": "^27.0.7", "ts-node": "^10.2.1", "tsc-alias": "1.3.10", diff --git a/src/__tests__/integration/knex-cache.test.ts b/src/__tests__/integration/knex-cache.test.ts new file mode 100644 index 0000000..fac9d36 --- /dev/null +++ b/src/__tests__/integration/knex-cache.test.ts @@ -0,0 +1,196 @@ +import fs from 'fs'; + +import ArLocal from 'arlocal'; +import Arweave from 'arweave'; +import { JWKInterface } from 'arweave/node/lib/wallet'; +import { Contract, LoggerFactory, SmartWeave, SmartWeaveNodeFactory } from '@smartweave'; +import path from 'path'; +import { TsLogFactory } from '../../logging/node/TsLogFactory'; +import { addFunds, mineBlock } from './_helpers'; +import knex from 'knex'; + +interface ExampleContractState { + counter: number; +} + +/** + * This integration test should verify whether the basic functions of the SmartWeave client + * work properly when file-based cache is being used. + */ +describe('Testing the SmartWeave client', () => { + let contractSrc: string; + let initialState: string; + + let wallet: JWKInterface; + + let arweave: Arweave; + let arlocal: ArLocal; + let smartweave: SmartWeave; + let contract_1: Contract; + let contract_2: Contract; + + const cacheDir = path.join(__dirname, 'db'); + + const knexConfig = knex({ + client: 'sqlite3', + connection: { + filename: `${cacheDir}/db.sqlite` + }, + useNullAsDefault: true + }); + + beforeAll(async () => { + removeCacheDir(); + fs.mkdirSync(cacheDir); + // note: each tests suit (i.e. file with tests that Jest is running concurrently + // with another files has to have ArLocal set to a different port!) + arlocal = new ArLocal(1780, false); + await arlocal.start(); + + arweave = Arweave.init({ + host: 'localhost', + port: 1780, + protocol: 'http' + }); + + LoggerFactory.INST.logLevel('debug'); + + smartweave = await SmartWeaveNodeFactory.knexCached(arweave, knexConfig); + + wallet = await arweave.wallets.generate(); + await addFunds(arweave, wallet); + + contractSrc = fs.readFileSync(path.join(__dirname, 'data/example-contract.js'), 'utf8'); + initialState = fs.readFileSync(path.join(__dirname, 'data/example-contract-state.json'), 'utf8'); + + // deploying contract using the new SDK. + const contractTxId1 = await smartweave.createContract.deploy({ + wallet, + initState: initialState, + src: contractSrc + }); + + const contractTxId2 = await smartweave.createContract.deploy({ + wallet, + initState: '{"counter": 100}', + src: contractSrc + }); + + contract_1 = smartweave.contract(contractTxId1).connect(wallet); + contract_2 = smartweave.contract(contractTxId2).connect(wallet); + + await mineBlock(arweave); + }); + + afterAll(async () => { + await arlocal.stop(); + await knexConfig.destroy(); + removeCacheDir(); + }); + + it('should properly deploy contract with initial state', async () => { + expect((await contract_1.readState()).state.counter).toEqual(555); + expect((await contract_2.readState()).state.counter).toEqual(100); + }); + + it('should properly add new interaction', async () => { + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + + await mineBlock(arweave); + + expect((await contract_1.readState()).state.counter).toEqual(556); + expect((await contract_2.readState()).state.counter).toEqual(102); + }); + + it('should properly add another interactions', async () => { + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + + expect((await contract_1.readState()).state.counter).toEqual(559); + expect((await contract_2.readState()).state.counter).toEqual(105); + }); + + it('should properly view contract state', async () => { + const interactionResult = await contract_1.viewState({ function: 'value' }); + expect(interactionResult.result).toEqual(559); + + const interactionResult2 = await contract_2.viewState({ function: 'value' }); + expect(interactionResult2.result).toEqual(105); + }); + + it('should properly read state with a fresh client', async () => { + const contract_1_2 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig)) + .contract(contract_1.txId()) + .connect(wallet); + expect((await contract_1_2.readState()).state.counter).toEqual(559); + + const contract_2_2 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig)) + .contract(contract_2.txId()) + .connect(wallet); + expect((await contract_2_2.readState()).state.counter).toEqual(105); + + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + expect((await contract_1_2.readState()).state.counter).toEqual(561); + expect((await contract_2_2.readState()).state.counter).toEqual(107); + }); + + it('should properly read state with another fresh client', async () => { + const contract_1_3 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig)) + .contract(contract_1.txId()) + .connect(wallet); + const contract_2_3 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig)) + .contract(contract_2.txId()) + .connect(wallet); + expect((await contract_1_3.readState()).state.counter).toEqual(561); + expect((await contract_2_3.readState()).state.counter).toEqual(107); + + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + expect((await contract_1_3.readState()).state.counter).toEqual(563); + expect((await contract_2_3.readState()).state.counter).toEqual(109); + }); + + it('should properly eval state for missing interactions', async () => { + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + await contract_1.writeInteraction({ function: 'add' }); + await contract_2.writeInteraction({ function: 'add' }); + await mineBlock(arweave); + + const contract_1_4 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig)) + .contract(contract_1.txId()) + .connect(wallet); + const contract_2_4 = (await SmartWeaveNodeFactory.knexCached(arweave, knexConfig)) + .contract(contract_2.txId()) + .connect(wallet); + expect((await contract_1.readState()).state.counter).toEqual(565); + expect((await contract_1_4.readState()).state.counter).toEqual(565); + expect((await contract_2.readState()).state.counter).toEqual(111); + expect((await contract_2_4.readState()).state.counter).toEqual(111); + }); + + function removeCacheDir() { + if (fs.existsSync(cacheDir)) { + fs.rmSync(cacheDir, { recursive: true }); + } + } +}); diff --git a/src/cache/impl/KnexStateCache.ts b/src/cache/impl/KnexStateCache.ts new file mode 100644 index 0000000..37dcf9b --- /dev/null +++ b/src/cache/impl/KnexStateCache.ts @@ -0,0 +1,138 @@ +import { BlockHeightKey, MemBlockHeightSwCache } from '@smartweave/cache'; +import { LoggerFactory } from '@smartweave/logging'; +import { Knex } from 'knex'; +import { createHash } from 'crypto'; +import { StateCache } from '@smartweave'; + +type DbResult = { + contract_id: string; + height: number; + state: string; +}; + +/** + * An implementation of {@link BlockHeightSwCache} that stores its data (ie. contracts state) + * in a Knex-compatible storage (PostgreSQL, CockroachDB, MSSQL, MySQL, MariaDB, SQLite3, Oracle, and Amazon Redshift) + * https://knexjs.org + */ +export class KnexStateCache extends MemBlockHeightSwCache> { + private readonly kLogger = LoggerFactory.INST.create('KnexBlockHeightSwCache'); + + private isFlushing = false; + + private isDirty = false; + + private constructor( + private readonly knex: Knex, + maxStoredInMemoryBlockHeights: number = Number.MAX_SAFE_INTEGER, + cache: DbResult[] + ) { + super(maxStoredInMemoryBlockHeights); + + this.saveCache = this.saveCache.bind(this); + this.flush = this.flush.bind(this); + + cache.forEach((entry) => { + this.putSync( + { + cacheKey: entry.contract_id, + blockHeight: entry.height + }, + JSON.parse(entry.state) + ); + }); + + process.on('exit', async () => { + await this.flush(); + process.exit(); + }); + process.on('SIGINT', async () => { + await this.flush(); + process.exit(); + }); + } + + public static async init( + knex: Knex, + maxStoredInMemoryBlockHeights: number = Number.MAX_SAFE_INTEGER + ): Promise { + if (!(await knex.schema.hasTable('states'))) { + await knex.schema.createTable('states', (table) => { + table.string('contract_id', 64).notNullable(); + table.bigInteger('height').notNullable(); + table.string('hash').notNullable().unique(); + table.json('state').notNullable(); + table.unique(['contract_id', 'height', 'hash'], { indexName: 'states_composite_index' }); + }); + } + + const cache: DbResult[] = await knex + .select(['contract_id', 'height', 'state']) + .from('states') + .max('height') + .groupBy('contract_id') + .orderBy('height', 'desc'); + + return new KnexStateCache(knex, maxStoredInMemoryBlockHeights, cache); + } + + private async saveCache() { + this.isFlushing = true; + + this.kLogger.info(`==== Persisting cache ====`); + try { + for (const contractTxId of Object.keys(this.storage)) { + // store only highest cached height + const toStore = await this.getLast(contractTxId); + + // this check is a bit paranoid, since we're iterating on storage keys.. + if (toStore !== null) { + const { cachedHeight, cachedValue } = toStore; + + // note: JSON.stringify is non-deterministic + // switch to https://www.npmjs.com/package/json-stringify-deterministic ? + const jsonState = JSON.stringify(cachedValue); + + // note: cannot reuse: + // "The Hash object can not be used again after hash.digest() method has been called. + // Multiple calls will cause an error to be thrown." + const hash = createHash('sha256'); + + hash.update(`${contractTxId}|${cachedHeight}|${JSON.stringify(cachedValue)}`); + const digest = hash.digest('hex'); + + // FIXME: batch insert + await this.knex + .insert({ + contract_id: contractTxId, + height: cachedHeight, + hash: digest, + state: jsonState + }) + .into('states') + .onConflict(['contract_id', 'height', 'hash']) + .merge(); + } + } + this.isDirty = false; + } catch (e) { + this.kLogger.error('Error while flushing cache', e); + } finally { + this.isFlushing = false; + this.kLogger.info(`==== Cache persisted ====`); + } + } + + async put({ cacheKey, blockHeight }: BlockHeightKey, value: StateCache): Promise { + this.isDirty = true; + return super.put({ cacheKey, blockHeight }, value); + } + + async flush(): Promise { + if (this.isFlushing || !this.isDirty) { + return; + } + + await this.saveCache(); + } +} diff --git a/src/cache/impl/MemBlockHeightCache.ts b/src/cache/impl/MemBlockHeightCache.ts index a3cc560..0e07fbc 100644 --- a/src/cache/impl/MemBlockHeightCache.ts +++ b/src/cache/impl/MemBlockHeightCache.ts @@ -50,7 +50,11 @@ export class MemBlockHeightSwCache implements BlockHeightSwCache { } async put({ cacheKey, blockHeight }: BlockHeightKey, value: V): Promise { - if (!(await this.contains(cacheKey))) { + this.putSync({ cacheKey, blockHeight }, value); + } + + protected putSync({ cacheKey, blockHeight }: BlockHeightKey, value: V): void { + if (!this.containsSync(cacheKey)) { this.storage[cacheKey] = new Map(); } const cached = this.storage[cacheKey]; @@ -63,6 +67,10 @@ export class MemBlockHeightSwCache implements BlockHeightSwCache { } async contains(key: string): Promise { + return this.containsSync(key); + } + + protected containsSync(key: string): boolean { return Object.prototype.hasOwnProperty.call(this.storage, key); } diff --git a/src/cache/index.ts b/src/cache/index.ts index 9456fb0..f0ce8e9 100644 --- a/src/cache/index.ts +++ b/src/cache/index.ts @@ -1,9 +1,10 @@ export * from './impl/MemBlockHeightCache'; // FileBlockHeightCache has to be exported after MemBlockHeightCache, // otherwise ts-jest complains with -// "TypeError: Class extends value undefined is not a constructor or null" -// funny that standard tsc does not have such issues.. +// "TypeError: Class extends value undefined is not a constructor or null". +// Funny that standard tsc does not have such issues.. export * from './impl/FileBlockHeightCache'; +export * from './impl/KnexStateCache'; export * from './impl/RemoteBlockHeightCache'; export * from './impl/MemCache'; diff --git a/src/core/modules/impl/CacheableStateEvaluator.ts b/src/core/modules/impl/CacheableStateEvaluator.ts index 9999448..121a5dd 100644 --- a/src/core/modules/impl/CacheableStateEvaluator.ts +++ b/src/core/modules/impl/CacheableStateEvaluator.ts @@ -5,14 +5,12 @@ import { ExecutionContext, ExecutionContextModifier, HandlerApi, - LexicographicalInteractionsSorter, StateCache } from '@smartweave/core'; import Arweave from 'arweave'; import { GQLNodeInterface } from '@smartweave/legacy'; import { LoggerFactory } from '@smartweave/logging'; import { CurrentTx } from '@smartweave/contract'; -import { mapReplacer } from '@smartweave/utils'; /** * An implementation of DefaultStateEvaluator that adds caching capabilities. diff --git a/src/core/node/SmartWeaveNodeFactory.ts b/src/core/node/SmartWeaveNodeFactory.ts index 5ea467a..57e0662 100644 --- a/src/core/node/SmartWeaveNodeFactory.ts +++ b/src/core/node/SmartWeaveNodeFactory.ts @@ -11,6 +11,8 @@ import { } from '@smartweave/core'; import { CacheableContractInteractionsLoader, CacheableExecutorFactory, Evolve } from '@smartweave/plugins'; import { FileBlockHeightSwCache, MemBlockHeightSwCache, MemCache } from '@smartweave/cache'; +import { Knex } from 'knex'; +import { KnexStateCache } from '../../cache/impl/KnexStateCache'; /** * A {@link SmartWeave} factory that can be safely used only in Node.js env. @@ -65,4 +67,44 @@ export class SmartWeaveNodeFactory extends SmartWeaveWebFactory { .setExecutorFactory(executorFactory) .setStateEvaluator(stateEvaluator); } + + static async knexCached( + arweave: Arweave, + dbConnection: Knex, + maxStoredInMemoryBlockHeights = 10 + ): Promise { + return (await this.knexCachedBased(arweave, dbConnection, maxStoredInMemoryBlockHeights)).build(); + } + + /** + */ + static async knexCachedBased( + arweave: Arweave, + dbConnection: Knex, + maxStoredInMemoryBlockHeights = 10 + ): Promise { + const definitionLoader = new ContractDefinitionLoader(arweave, new MemCache()); + + const interactionsLoader = new CacheableContractInteractionsLoader( + new ContractInteractionsLoader(arweave), + new MemBlockHeightSwCache() + ); + + const executorFactory = new CacheableExecutorFactory(arweave, new HandlerExecutorFactory(arweave), new MemCache()); + + const stateEvaluator = new CacheableStateEvaluator( + arweave, + await KnexStateCache.init(dbConnection, maxStoredInMemoryBlockHeights), + [new Evolve(definitionLoader, executorFactory)] + ); + + const interactionsSorter = new LexicographicalInteractionsSorter(arweave); + + return SmartWeave.builder(arweave) + .setDefinitionLoader(definitionLoader) + .setInteractionsLoader(interactionsLoader) + .setInteractionsSorter(interactionsSorter) + .setExecutorFactory(executorFactory) + .setStateEvaluator(stateEvaluator); + } } diff --git a/yarn.lock b/yarn.lock index b5ad8e0..96ff393 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2155,6 +2155,11 @@ colorette@1.2.1: resolved "https://registry.yarnpkg.com/colorette/-/colorette-1.2.1.tgz#4d0b921325c14faf92633086a536db6e89564b1b" integrity sha512-puCDz0CzydiSYOrnXpz/PKd69zRrribezjtE9yd4zvytoRc8+RY/KJPvtPFKZS3E3wP6neGyMe0vOTlHO5L3Pw== +colorette@2.0.16: + version "2.0.16" + resolved "https://registry.yarnpkg.com/colorette/-/colorette-2.0.16.tgz#713b9af84fdb000139f04546bd4a93f62a5085da" + integrity sha512-hUewv7oMjCp+wkBv5Rm0v87eJhq4woh5rSR+42YSQJKecCqgIqNkZ6lAlQms/BwHPJA5NKMRlpxPRv0n8HQW6g== + colorette@^1.3.0: version "1.3.0" resolved "https://registry.yarnpkg.com/colorette/-/colorette-1.3.0.tgz#ff45d2f0edb244069d3b772adeb04fed38d0a0af" @@ -4516,6 +4521,25 @@ kleur@^3.0.3: resolved "https://registry.yarnpkg.com/kleur/-/kleur-3.0.3.tgz#a79c9ecc86ee1ce3fa6206d1216c501f147fc07e" integrity sha512-eTIzlVOSUR+JxdDFepEYcBMtZ9Qqdef+rnzWdRZuMbOywu5tO2w2N7rqjoANZ5k9vywhL6Br1VRjUIgTQx4E8w== +knex@^0.95.14: + version "0.95.14" + resolved "https://registry.yarnpkg.com/knex/-/knex-0.95.14.tgz#47eca7757cbc5872b7c9a3c67ae3b7ac6d00cf10" + integrity sha512-j4qLjWySrC/JRRVtOpoR2LcS1yBOsd7Krc6mEukPvmTDX/w11pD52Pq9FYR56/kLXGeAV8jFdWBjsZFi1mscWg== + dependencies: + colorette "2.0.16" + commander "^7.1.0" + debug "4.3.2" + escalade "^3.1.1" + esm "^3.2.25" + getopts "2.2.5" + interpret "^2.2.0" + lodash "^4.17.21" + pg-connection-string "2.5.0" + rechoir "0.7.0" + resolve-from "^5.0.0" + tarn "^3.0.1" + tildify "2.0.0" + knex@^0.95.6: version "0.95.11" resolved "https://registry.yarnpkg.com/knex/-/knex-0.95.11.tgz#1526bd700cb07497252214d34c10f660aee01a3e"