Merge branch 'master' into feat/update-market-info-over-ws

This commit is contained in:
taureau75
2022-02-05 20:48:29 -08:00
committed by GitHub
3 changed files with 175 additions and 52 deletions

View File

@@ -11,8 +11,7 @@ dotenv.config();
const PRICE_FEEDS = {};
const OPEN_ORDERS = {};
const NONCES = {};
let ACCOUNT_STATE = null;
let ORDER_BROADCASTING = false;
const WALLETS = {};
const FILL_QUEUE = [];
const MARKETS = {};
@@ -37,26 +36,43 @@ console.log("ACTIVE PAIRS", activePairs);
// Start price feeds
cryptowatchWsSetup();
// Initiate fill loop
setTimeout(processFillQueue, 1000);
// Connect to zksync
const CHAIN_ID = parseInt(MM_CONFIG.zigzagChainId);
const ETH_NETWORK = (CHAIN_ID === 1) ? "mainnet" : "rinkeby";
let syncWallet, ethersProvider, syncProvider, ethWallet,
fillOrdersInterval, indicateLiquidityInterval;
let ethersProvider, syncProvider, fillOrdersInterval, indicateLiquidityInterval;
ethersProvider = ethers.getDefaultProvider(ETH_NETWORK);
try {
syncProvider = await zksync.getDefaultProvider(ETH_NETWORK);
ethWallet = new ethers.Wallet(process.env.ETH_PRIVKEY || MM_CONFIG.ethPrivKey);
syncWallet = await zksync.Wallet.fromEthSigner(ethWallet, syncProvider);
if (!(await syncWallet.isSigningKeySet())) {
console.log("setting sign key");
const signKeyResult = await syncWallet.setSigningKey({
feeToken: "ETH",
ethAuthType: "ECDSA",
const keys = [];
const ethPrivKey = (process.env.ETH_PRIVKEY || MM_CONFIG.ethPrivKey);
if(ethPrivKey && ethPrivKey != "") { keys.push(ethPrivKey); }
const ethPrivKeys = (process.env.ETH_PRIVKEYS || MM_CONFIG.ethPrivKeys);
if(ethPrivKeys && ethPrivKeys.length > 0) {
ethPrivKeys.forEach( key => {
if(key != "" && !keys.includes(key)) {
keys.push(key);
}
});
console.log(signKeyResult);
}
for(let i=0; i<keys.length; i++) {
let ethWallet = new ethers.Wallet(keys[i]);
let syncWallet = await zksync.Wallet.fromEthSigner(ethWallet, syncProvider);
if (!(await syncWallet.isSigningKeySet())) {
console.log("setting sign key");
const signKeyResult = await syncWallet.setSigningKey({
feeToken: "ETH",
ethAuthType: "ECDSA",
});
console.log(signKeyResult);
}
let accountId = await syncWallet.getAccountId();
let account_state = await syncWallet.getAccountState();
WALLETS[accountId] = {
'ethWallet': ethWallet,
'syncWallet': syncWallet,
'account_state': account_state,
'ORDER_BROADCASTING': false,
}
}
} catch (e) {
console.log(e);
@@ -64,9 +80,33 @@ try {
}
// Update account state loop
await updateAccountState();
setInterval(updateAccountState, 30000);
// Log mm balance over all accounts
logBalance();
setInterval(logBalance, 3 * 60 * 60 * 1000); // 3h
// Initiate fill loop
setTimeout(processFillQueue, 1000);
// Get markets info
const activePairsText = activePairs.join(',');
const markets_url = `https://zigzag-markets.herokuapp.com/markets?chainid=${CHAIN_ID}&id=${activePairsText}`
const markets = await fetch(markets_url).then(r => r.json());
if (markets.error) {
console.error(markets);
throw new Error(markets.error);
}
const MARKETS = {};
for (let i in markets) {
const market = markets[i];
MARKETS[market.id] = market;
if (market.alias) {
MARKETS[market.alias] = market;
}
}
let zigzagws = new WebSocket(MM_CONFIG.zigzagWsUrl);
zigzagws.on('open', onWsOpen);
zigzagws.on('error', console.error);
@@ -86,7 +126,9 @@ function onWsOpen() {
function onWsClose () {
console.log("Websocket closed. Restarting");
ORDER_BROADCASTING = false;
Object.keys(WALLETS).forEach(accountId => {
WALLETS[accountId]['ORDER_BROADCASTING'] = false;
});
setTimeout(() => {
clearInterval(fillOrdersInterval)
clearInterval(indicateLiquidityInterval)
@@ -101,7 +143,9 @@ async function handleMessage(json) {
if (!(["lastprice", "liquidity2", "fillstatus"]).includes(msg.op)) console.log(json.toString());
switch(msg.op) {
case 'error':
ORDER_BROADCASTING = false;
Object.keys(WALLETS).forEach(accountId => {
WALLETS[accountId]['ORDER_BROADCASTING'] = false;
});
break;
case 'orders':
const orders = msg.args[0];
@@ -110,7 +154,7 @@ async function handleMessage(json) {
const fillable = isOrderFillable(order);
console.log(fillable);
if (fillable.fillable) {
FILL_QUEUE.push(order);
FILL_QUEUE.push({ order: order, wallets: fillable.wallets});
}
else if (fillable.reason === "badprice") {
OPEN_ORDERS[orderid] = order;
@@ -120,12 +164,19 @@ async function handleMessage(json) {
case "userordermatch":
const chainid = msg.args[0];
const orderid = msg.args[1];
try {
await broadcastfill(chainid, orderid, msg.args[2], msg.args[3]);
} catch (e) {
console.error(e);
const fillOrder = msg.args[3];
const wallet = WALLETS[fillOrder.accountId];
if(!wallet) {
console.error("No wallet with this accountId: "+fillOrder.accountId);
break
} else {
try {
await broadcastfill(chainid, orderid, msg.args[2], fillOrder, wallet);
} catch (e) {
console.error(e);
}
wallet['ORDER_BROADCASTING'] = false;
}
ORDER_BROADCASTING = false;
break
case "marketinfo":
const market_info = msg.args[0];
@@ -154,7 +205,13 @@ function isOrderFillable(order) {
const sellCurrency = (side === 's') ? market.quoteAsset.symbol : market.baseAsset.symbol;
const sellDecimals = (side === 's') ? market.quoteAsset.decimals : market.baseAsset.decimals;
const sellQuantity = (side === 's') ? quoteQuantity : baseQuantity;
const balance = ACCOUNT_STATE.committed.balances[sellCurrency] / 10**sellDecimals;
const neededBalanceBN = sellQuantity * 10**sellDecimals;
const goodWallets = [];
Object.keys(WALLETS).forEach(accountId => {
if (WALLETS[accountId]['account_state'].committed.balances[sellCurrency] > (neededBalanceBN * 1.05)) {
goodWallets.push(accountId);
}
});
const now = Date.now() / 1000 | 0;
if (now > expires) {
@@ -172,7 +229,7 @@ function isOrderFillable(order) {
return { fillable: false, reason: "badsize" };
}
if (balance < sellQuantity) {
if (goodWallets.length === 0) {
return { fillable: false, reason: "badbalance" };
}
@@ -190,7 +247,7 @@ function isOrderFillable(order) {
return { fillable: false, reason: "badprice" };
}
return { fillable: true, reason: null };
return { fillable: true, reason: null, wallets: goodWallets};
}
function genquote(chainid, market_id, side, baseQuantity) {
@@ -240,6 +297,7 @@ function validatePriceFeed(market_id) {
const primaryPrice = PRICE_FEEDS[primaryPriceFeedId];
if (!primaryPrice) throw new Error("Primary price feed unavailable");
// If there is no secondary price feed, the price auto-validates
if (!secondaryPriceFeedId) return true;
@@ -256,7 +314,7 @@ function validatePriceFeed(market_id) {
return true;
}
async function sendfillrequest(orderreceipt) {
async function sendfillrequest(orderreceipt, accountId) {
const chainId = orderreceipt[0];
const orderId = orderreceipt[1];
const market_id = orderreceipt[2];
@@ -297,16 +355,16 @@ async function sendfillrequest(orderreceipt) {
ratio: zksync.utils.tokenRatio(tokenRatio),
validUntil: one_min_expiry
}
const fillOrder = await syncWallet.getOrder(orderDetails);
// Set global flag
ORDER_BROADCASTING = true;
const fillOrder = await WALLETS[accountId].syncWallet.getOrder(orderDetails);
// Set wallet flag
WALLETS[accountId]['ORDER_BROADCASTING'] = true;
const resp = { op: "fillrequest", args: [chainId, orderId, fillOrder] };
zigzagws.send(JSON.stringify(resp));
}
async function broadcastfill(chainid, orderid, swapOffer, fillOrder) {
async function broadcastfill(chainid, orderid, swapOffer, fillOrder, wallet) {
// Nonce check
const nonce = swapOffer.nonce;
const userNonce = NONCES[swapOffer.accountId];
@@ -315,7 +373,7 @@ async function broadcastfill(chainid, orderid, swapOffer, fillOrder) {
}
const randint = (Math.random()*1000).toFixed(0);
console.time('syncswap' + randint);
const swap = await syncWallet.syncSwap({
const swap = await wallet['syncWallet'].syncSwap({
orders: [swapOffer, fillOrder],
feeToken: "ETH",
nonce: fillOrder.nonce
@@ -351,7 +409,7 @@ async function fillOpenOrders() {
const order = OPEN_ORDERS[orderid];
const fillable = isOrderFillable(order);
if (fillable.fillable) {
FILL_QUEUE.push(order);
FILL_QUEUE.push({ order: order, wallets: fillable.wallets});
delete OPEN_ORDERS[orderid];
}
else if (fillable.reason !== "badprice") {
@@ -361,22 +419,33 @@ async function fillOpenOrders() {
}
async function processFillQueue() {
if (ORDER_BROADCASTING) {
setTimeout(processFillQueue, 100);
return false;
}
if (FILL_QUEUE.length === 0) {
setTimeout(processFillQueue, 100);
return false;
return;
}
const order = FILL_QUEUE.shift();
try {
await sendfillrequest(order);
} catch (e) {
console.error(e);
ORDER_BROADCASTING = false;
}
setTimeout(processFillQueue, 50);
await Promise.all(Object.keys(WALLETS).map(async accountId => {
const wallet = WALLETS[accountId];
if (wallet['ORDER_BROADCASTING']) {
return;
}
let index = 0;
for(;index<FILL_QUEUE.length; index++) {
if(FILL_QUEUE[index].wallets.includes(accountId)) {
break;
}
}
if (index < FILL_QUEUE.length) {
const selectedOrder = FILL_QUEUE.splice(index, 1);
try {
await sendfillrequest(selectedOrder[0].order, accountId);
return;
} catch (e) {
console.error(e);
wallet['ORDER_BROADCASTING'] = false;
}
}
}));
setTimeout(processFillQueue, 100);
}
async function cryptowatchWsSetup() {
@@ -457,8 +526,16 @@ function indicateLiquidity (market_id) {
const expires = (Date.now() / 1000 | 0) + 10; // 10s expiry
const side = mmConfig.side || 'd';
const baseBalance = ACCOUNT_STATE.committed.balances[marketInfo.baseAsset.symbol] / 10**marketInfo.baseAsset.decimals;
const quoteBalance = ACCOUNT_STATE.committed.balances[marketInfo.quoteAsset.symbol] / 10**marketInfo.quoteAsset.decimals;
let baseBN = 0, quoteBN = 0;
Object.keys(WALLETS).forEach(accountId => {
const thisBase = WALLETS[accountId]['account_state'].committed.balances[marketInfo.baseAsset.symbol];
const thisQuote = WALLETS[accountId]['account_state'].committed.balances[marketInfo.quoteAsset.symbol];
baseBN = (baseBN < thisBase) ? thisBase : baseBN;
quoteBN = (quoteBN < thisQuote) ? thisQuote : quoteBN;
});
const baseBalance = baseBN / 10**marketInfo.baseAsset.decimals;
const quoteBalance = quoteBN / 10**marketInfo.quoteAsset.decimals;
const maxSellSize = Math.min(baseBalance, mmConfig.maxSize);
const maxBuySize = Math.min(quoteBalance / midPrice, mmConfig.maxSize);
@@ -492,5 +569,45 @@ function getMidPrice (market_id) {
}
async function updateAccountState() {
ACCOUNT_STATE = await syncWallet.getAccountState();
try {
Object.keys(WALLETS).forEach(accountId => {
(WALLETS[accountId]['syncWallet']).getAccountState().then((state) => {
WALLETS[accountId]['account_state'] = state;
})
});
} catch(err) {
// pass
}
}
async function logBalance() {
try {
await updateAccountState();
// fetch all balances over all wallets per token
const balance = {};
Object.keys(WALLETS).forEach(accountId => {
const committedBalaces = WALLETS[accountId]['account_state'].committed.balances;
Object.keys(committedBalaces).forEach(token => {
if(balance[token]) {
balance[token] = balance[token] + parseInt(committedBalaces[token]);
} else {
balance[token] = parseInt(committedBalaces[token]);
}
});
});
// get token price and total in USD
let sum = 0;
await Promise.all(Object.keys(balance).map(async token => {
const price = await syncProvider.getTokenPrice(token.toString());
const tokenNumber = await syncProvider.tokenSet.formatToken(token, balance[token].toString())
sum = sum + price * tokenNumber;
}));
// log to CVS
const date = new Date().toISOString();
const content = date + ";" + sum.toFixed(2) + "\n";
fs.writeFile('price_csv.txt', content, { flag: 'a+' }, err => {});
} catch(err) {
// pass
}
}