feat: subscription plugin poc

feat: subscription plugin
This commit is contained in:
ppe
2022-11-16 11:47:47 +01:00
committed by just_ppe
parent e96328628a
commit 4942dfc49c
8 changed files with 892 additions and 64 deletions

View File

@@ -1,60 +1,103 @@
/* eslint-disable */
import Arweave from 'arweave';
import {defaultCacheOptions, LexicographicalInteractionsSorter, LoggerFactory, WarpFactory} from '../src';
import * as fs from 'fs';
import knex from 'knex';
import {defaultCacheOptions, LoggerFactory, Warp, WarpFactory} from '../src';
import os from 'os';
import path from "path";
import stringify from "safe-stable-stringify";
import {WarpPlugin, WarpPluginType} from "../src/core/WarpPlugin";
import {GQLNodeInterface} from "smartweave/lib/interfaces/gqlResult";
import {initPubSub, subscribe} from "warp-contracts-pubsub";
import {JWKInterface} from "arweave/web/lib/wallet";
import fs from "fs";
const logger = LoggerFactory.INST.create('Contract');
//LoggerFactory.use(new TsLogFactory());
LoggerFactory.INST.logLevel('debug');
LoggerFactory.INST.logLevel('debug', 'ArweaveGatewayInteractionsLoader');
LoggerFactory.INST.logLevel('info', 'CacheableStateEvaluator');
LoggerFactory.INST.logLevel('info', 'WASM:Rust');
LoggerFactory.INST.logLevel('info');
LoggerFactory.INST.logLevel('debug', 'WarpSubscriptionPlugin');
//LoggerFactory.INST.logLevel('debug', 'CacheableStateEvaluator');
global.WebSocket = require('ws');
initPubSub()
async function main() {
printTestInfo();
const heapUsedBefore = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100;
const rssUsedBefore = Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100;
const arweave = Arweave.init({
/* host: 'arweave.testnet1.bundlr.network',*/ // Hostname or IP address for a Arweave host
host: 'arweave.net',
port: 443, // Port
protocol: 'https', // Network protocol http or https
timeout: 60000, // Network request timeouts in milliseconds
logging: false // Enable network request logging
});
interface InteractionMessage {
contractTxId: string,
sortKey: string,
lastSortKey: string,
interaction: GQLNodeInterface
}
class ExamplePlugin implements WarpPlugin<GQLNodeInterface, boolean> {
process(input: GQLNodeInterface): boolean {
return false;
abstract class WarpSubscriptionPlugin<R> implements WarpPlugin<InteractionMessage, Promise<R>> {
protected readonly logger = LoggerFactory.INST.create(WarpSubscriptionPlugin.name);
constructor(protected readonly contractTxId: string, protected readonly warp: Warp) {
subscribe(`interactions/${contractTxId}`, async ({data}) => {
const message = JSON.parse(data);
this.logger.debug('New message received', message);
await this.process(message);
}, console.error)
.then(() => {
this.logger.debug('Subscribed to interactions for', this.contractTxId);
})
.catch(e => {
this.logger.error('Error while subscribing', e);
});
}
abstract process(input: InteractionMessage): Promise<R>;
type(): WarpPluginType {
return 'evm-signature-verification';
return 'subscription';
}
}
class StateUpdatePlugin extends WarpSubscriptionPlugin<Promise<any>> {
async process(input: InteractionMessage): Promise<any> {
this.logger.debug('From implementation', input);
const lastStoredKey = (await warp.stateEvaluator.latestAvailableState(this.contractTxId))?.sortKey;
if (lastStoredKey?.localeCompare(input.lastSortKey) === 0) {
this.logger.debug('Safe to use new interaction');
return await warp.contract(this.contractTxId)
.readStateFor([input.interaction]);
} else {
this.logger.debug('Unsafe to use new interaction');
return await warp.contract(this.contractTxId).readState();
}
}
}
const warp = WarpFactory
.forMainnet({...defaultCacheOptions, inMemory: true})
.use(new ExamplePlugin())
.forMainnet({...defaultCacheOptions, inMemory: false});
const plugin = new StateUpdatePlugin("Ws9hhYckc-zSnVmbBep6q_kZD5zmzYzDmgMC50nMiuE", warp)
warp.use(plugin);
let wallet: JWKInterface = readJSON('./.secrets/33F0QHcb22W7LwWR1iRC8Az1ntZG09XQ03YWuw2ABqA.json');
;
try {
const contract = warp.contract("Ws9hhYckc-zSnVmbBep6q_kZD5zmzYzDmgMC50nMiuE");
const cacheResult = await contract
.setEvaluationOptions({
})
.readState();
const contract = warp
.contract("Ws9hhYckc-zSnVmbBep6q_kZD5zmzYzDmgMC50nMiuE")
.connect(wallet);
await contract.writeInteraction({
function: 'vrf'
}, {vrf: true});
console.log(cacheResult.cachedValue.state);
/* const cacheResult = await contract
.setEvaluationOptions({
})
.readState();
console.log(cacheResult.cachedValue.state);*/
} catch (e) {
console.error(e);
}
@@ -101,9 +144,15 @@ function printTestInfo() {
console.log('===============');
const sorter = new LexicographicalInteractionsSorter(arweave);
warp.interactionsLoader.load(contractId, sorter.generateLastSortKey(666), sorter.generateLastSortKey(777));
}
main().catch((e) => console.error(e));
function readJSON(path: string): JWKInterface {
const content = fs.readFileSync(path, "utf-8");
try {
return JSON.parse(content);
} catch (e) {
throw new Error(`File "${path}" does not contain a valid JSON`);
}
}