18 min read

根据 DEX 交易计算 Token 历史价格

一般来说,CEXs 都提供有相应的 API 接口,可以让我们方便的获取加密货币的实时价格和历史价格。而 DEX 里 Token 的价格就没有那么方便获取到了。主流 DEX 有 Uniswap、Curve、1Inch 等。本篇文章以 Uniswap 为例,介绍如何根据链上 DEX 产生的交易记录生成 Token 的历史价格。
根据 DEX 交易计算 Token 历史价格
Photo by Kanchanara / Unsplash

比特币、以太坊等各种加密货币在最初发展时, CEX(中心化交易所 Centralized Exchage)便是买卖双方进行交易的重要平台。主导这一领域的主要是中心化玩家几大交易所,这与区块链去中心化匿名化的特点不符。随着可用技术的快速发展,大量去中心化交易工具不断问世,在这其中,DEX(去中心化交易平台 Decentralized Exchange)的诞生,使得买卖双方在无需任何中介介入的情况下完成交易。

使用 CEX 意味着你需要将自己的资产存入平台内,放弃对其的控制权,由交易所以你的名义进行交易;而 DEX 现在主流的平台几乎都是使用自动做市商(AMM,Automated Market Maker)机制实现交易,由于 AMM 基于链上合约,我们能获取到历史上所有的交易记录,由交易记录就能算出价格。

一般来说,CEXs 都提供有相应的 API 接口,可以让我们方便的获取加密货币的实时价格和历史价格。而 DEX 里 Token 的价格就没有那么方便获取到了。主流 DEX 有 Uniswap、Curve、1Inch 等。本篇文章以 Uniswap 为例,介绍如何根据链上 DEX 产生的交易记录生成 Token 的历史价格。


Token 价格随着交易供求关系的改变而不断变化。为了能拿到交易记录,首先,我们需要从链上提取相关的数据。这里以 Uniswap 为例。从链上提取数据并不像常规关系型数据库那样非常直接和方便,我们想要获取的数据,是以某种编码后的形式存储于链上,为了能提取出我们需要的信息,我们需要了解 Uniswap 是如何工作的,交易信息是存储在哪里的、如何存储的。

Uniswap 摒弃了数字交易平台的传统架构,不使用订单簿,而是使用“恒定乘积做市商模型”,该模型是自动化做市商(AMM)模型的变体。AMM 是一种智能合约,能为任意两种 Token 创建一个流动性资金池(Liquidity Pool)用于用户进行 Token 交换。Uniswap是一种开源协议,实现的智能合约也是开源的,从中我们可以知道如何获取交易记录。

Uniswap 目前在使用的有两个版本,v2 和 v3。这两个版本的合约,在用户成功完成兑换(Swap)时,会创建一条 Swap Event Log。这个 Log 里包含了两个 Token 之前兑换的数量关系,两个版本的 Event 虽然参数格式不一致,但是其中都包含有我们需要的信息。

event Swap(
    address indexed sender,
    uint amount0In,
    uint amount1In,
    uint amount0Out,
    uint amount1Out,
    address indexed to
);
Uniswap V2 Swap Event (https://github.com/Uniswap/v2-core/blob/4dd59067c76dea4a0e8e4bfdda41877a6b16dedc/contracts/UniswapV2Pair.sol#L51-L58)
  • 当使用 token0 兑换 token1 时,amount0In 表示支付的 token0 的数量, amount1Out 表示兑换得到的 token1 的数量,此时 amount1In 和 amount0Out 为 0。
  • 当使用 token1 兑换 token0 时,情况相反。
/// @notice Emitted by the pool for any swaps between token0 and token1
/// @param sender The address that initiated the swap call, and that received the callback
/// @param recipient The address that received the output of the swap
/// @param amount0 The delta of the token0 balance of the pool
/// @param amount1 The delta of the token1 balance of the pool
/// @param sqrtPriceX96 The sqrt(price) of the pool after the swap, as a Q64.96
/// @param liquidity The liquidity of the pool after the swap
/// @param tick The log base 1.0001 of price of the pool after the swap
event Swap(
    address indexed sender,
    address indexed recipient,
    int256 amount0,
    int256 amount1,
    uint160 sqrtPriceX96,
    uint128 liquidity,
    int24 tick
);
Uniswap V3 Swap Event (https://github.com/Uniswap/v3-core/blob/234f27b9bc745eee37491802aa37a0202649e344/contracts/interfaces/pool/IUniswapV3PoolEvents.sol#L64-L80)
  • amount0 表示 token0 在池子中的增减情况,值有正负:值为正表示用户支付的 token0 的数量,会增加池子中 token0 的总量;值为负表示用户得到的 token0 的数量,会减少池子中 token0 的总量。
  • amount1 情况相反

仅仅是拿到 Logs 还不行,Log data 只记录了两种 token 之间的交易数量的关系,但是我们并不知道这是哪两个 token,还需要通过 Web3 接口从相应的 Uniswap Token Pools 合约中拿到 token0 和 token1 的地址。得到的数据存储在 DexPool 表里。

const contractAddress = ''
const UNISWAP_POOL_SIMPLE_ABI: AbiItem[] = [{"inputs":[],"name":"factory","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"token0","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"token1","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"}]

const contract = new web3Rpc.web3.eth.Contract(UNISWAP_POOL_SIMPLE_ABI, contractAddress)

const factory = await contract.methods.factory().call()
const token0 = await contract.methods.token0().call()
const token1 = await contract.methods.token1().call()

interface DexPool {
  contractAddress: string;
  factory: string;
  token0: string;
  token1: string;
}
获取 Uniswap Pool 的 token0 和 token1 地址

顺带一提,Uniswap V2/V3 结构类似,有两个重要的合约,UniswapFactory 和 UniswapPool。UniswapFactory 合约用来创建任意两个 Token 的 UniswapPool 合约,而 UniswapPool 才是真正用户交互进行 Token 兑换的地方。

交易记录的源头自然是区块链脸上数据没错,不过实际提取数据可以有多种途径:

实际上我们并不需要完整的链上 Logs 数据,只需要拿到和 Uniswap 相关的就行。可以通过 Uniswap Swap Event 相应的 topics 来从完整 Logs 数据中筛选出 Uniswap 相关的 Logs。这里我们以 Uniswap V2 为例:

Uniswap V2 / 从 Event 的定义算出对应的 Topics

从上图可以知道,只要是 Uniswap V2 Swap Event 的 Log,那么 topic0 就一定是 0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822,并且非空的 topics 数量为 3(topcis 最多可以有4个,由一个基础 Topic[topic0] 和三个追加 Topics[topic1-3] 组成)。Uniswap V3 同理。所以我们可以通过这两个条件,从完全的 Logs 中初略筛选出我们想要的数据。

/**
 * Uniswap V2 - Swap Event
 **/
const maybeUniswapV2SwapTopic = (log: IWeb3Log) => {
  const { topics, data } = log
  return (
    data.startsWith('0x') &&
    data != '0x0000000000000000000000000000000000000000000000000000000000000000' &&
    topics.length === 3 &&
    topics[0] === '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
  )
}
const logs = await this.defaultWeb3CallService.ethWeb3.eth
  .getPastLogs({
    // fromBlock: from,
    // toBlock: to,
    topics: ['0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822', null, null],
  })
  .then((logs: IWeb3Log[]) => logs.filter(maybeUniswapV2SwapTopic))


/**
 * Uniswap V3 - Swap Event
 **/
export const maybeUniswapV3SwapTopic = (log: IWeb3Log) => {
  const { topics, data } = log
  return (
    data.startsWith('0x') &&
    data != '0x0000000000000000000000000000000000000000000000000000000000000000' &&
    topics.length === 3 &&
    topics[0] === '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
  )
}
const logs = await this.defaultWeb3CallService.ethWeb3.eth
  .getPastLogs({
    // fromBlock: from,
    // toBlock: to,
    topics: ['0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67', null, null],
  })
  .then((logs: IWeb3Log[]) => logs.filter(maybeUniswapV3SwapTopic))
以 Web3.js 的 getPastLogs 方法为例

(PS: 不管是通过何种途径拿到的 Logs 数据,实际开发的时候肯定是会存进数据库的。不过本篇文章中为了方法,就直接用 TypeScript 的 interface 来表示数据表结构了。)

interface UniswapSwapLog {
  tx_hash: string;
  log_index: number;
  address: string;
  data: string;
  topic_0: string;
  topic_1: string;
  topic_2: string;
  block_timestamp: string;
}
SwapLog 的表结构(Typescript 形式)

总之,通过上面的一些方法和步骤,我们可以得到 Uniswap V2/V3 的历史 Swap Logs。至于如何存储在数据库中,是所有平台的 Log 存一个表还是每个平台单独存一个表都行,唯一需要注意的是,如果存一个表,需要多加一个字段来标记,这个Log来自哪个平台,因为不同平台产生的Log data数据的解析方式不一样。

UniswapSwapLog 包含有 DEX 平台交易信息的原始数据,但是 data 还处于未解析的状态。接下来要做的一个步骤是把 UniswapSwapLog 的数据解析并且转换成交易信息,最终得到的数据存储为 DexTrade 表。

interface DexTrade {
  contract: string;
  input_token: string;
  output_token: string;
  input_amount: string;
  output_amount: string;
  sender: string;
  recipient: string;
  tx_hash: string;
  log_index: number;
  timestamp: string;
  source: 'uniswap_v2' | 'uniswap_v3';
}
DexTrade 的表结构(Typescript 形式)

不同的平台不同的合约产生的 SwapLog 转换到 DexTrade 的方式不一,要分别处理。

import { chain } from 'lodash';

const UNISWAP_V2_SWAP_ABI: AbiInput[] = [{"indexed":true,"internalType":"address","name":"sender","type":"address"},{"indexed":false,"internalType":"uint256","name":"amount0In","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount1In","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount0Out","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount1Out","type":"uint256"},{"indexed":true,"internalType":"address","name":"to","type":"address"}]

function decodeUniswapV2SwapLogData(data: string, topic1: string, topic2: string) {
  const web3 = new Web3()
  const decoded = web3.eth.abi.decodeLog(
    UNISWAP_V2_SWAP_ABI,
    data,
    [topic1, topic2],
  ) as any
  return decoded
}

function getDexTradesFromUniswapV2SwapLog(logs: UniswapSwapLog[], pools: DexPool[]): DexTrade[] {
  const docs = chain(logs).map((log): DexTrade | null => {
    try {
      const pool = pools.find(pool => pool.contractAddress.toLowerCase() === log.address.toLowerCase())
      if (!pool) throw new Error(`UniswapPool not found: ${log.address}`)
      const decodedData = decodeUniswapV2SwapLogData(log.data, log.topic_2, log.topic_3)
      const tokenPairData = (() => {
        if (!(
            (decodedData.amount0In === '0' && decodedData.amount1Out === '0') ||
            (decodedData.amount1In === '0' && decodedData.amount0Out === '0')
        )) {
          throw new Error('swap_event_data_layout_mismatch')
        }
        if (decodedData.amount0In === '0') {
          return {
            input_token: pool.token1.toLowerCase(),
            output_token: pool.token0.toLowerCase(),
            input_amount: decodedData.amount1In,
            output_amount: decodedData.amount0Out,
          }
        } else {
          return {
            input_token: pool.token0.toLowerCase(),
            output_token: pool.token1.toLowerCase(),
            input_amount: decodedData.amount0In,
            output_amount: decodedData.amount1Out,
          }
        }
      })()
      return {
        contract: log.address.toLowerCase(),
        ...tokenPairData,
        sender: decodedData.sender,
        recipient: decodedData.to,
        tx_hash: log.tx_hash.toLowerCase(),
        log_index: log.log_index,
        timestamp: log.block_timestamp,
        source: 'uniswap_v2',
      }
    } catch (error) {
      return null
    }
  })
  .compact()
  .value()

  return docs
}
从 Uniswap V2 Swap Logs 得到 DexTrades
import { chain } from 'lodash';

const UNISWAP_V3_SWAP_ABI: AbiInput[] = [{"indexed":true,"internalType":"address","name":"sender","type":"address"},{"indexed":true,"internalType":"address","name":"recipient","type":"address"},{"indexed":false,"internalType":"int256","name":"amount0","type":"int256"},{"indexed":false,"internalType":"int256","name":"amount1","type":"int256"},{"indexed":false,"internalType":"uint160","name":"sqrtPriceX96","type":"uint160"},{"indexed":false,"internalType":"uint128","name":"liquidity","type":"uint128"},{"indexed":false,"internalType":"int24","name":"tick","type":"int24"}]

function decodeUniswapV3SwapLogData(data: string, topic1: string, topic2: string) {
  const web3 = new Web3()
  const decoded = web3.eth.abi.decodeLog(
    UNISWAP_V3_SWAP_ABI,
    data,
    [topic1, topic2],
  ) as any
  return decoded
}

function getDexTradesFromUniswapV3SwapLog(logs: UniswapSwapLog[], pools: DexPool[]): DexTrade[] {
  const docs = chain(logs).map((log): DexTrade | null => {
    try {
      const pool = pools.find(pool => pool.contractAddress.toLowerCase() === log.address.toLowerCase())
      if (!pool) throw new Error(`UniswapPool not found: ${log.address}`)
      const decodedData = decodeUniswapV3SwapLogData(log.data, log.topic_2, log.topic_3)
      const tokenPairData = (() => {
        if (Number(decodedData.amount0) > 0) {
          return {
            input_token: pool.token0.toLowerCase(),
            output_token: pool.token1.toLowerCase(),
            input_amount: decodedData.amount0,
            output_amount: decodedData.amount1[0] === '-' ? decodedData.amount1.slice(1) : decodedData.amount1,
          }
        } else {
          return {
            input_token: pool.token1.toLowerCase(),
            output_token: pool.token0.toLowerCase(),
            input_amount: decodedData.amount1,
            output_amount: decodedData.amount0[0] === '-' ? decodedData.amount0.slice(1) : decodedData.amount0
          }
        }
      })()
      return {
        contract: log.address.toLowerCase(),
        ...tokenPairData,
        sender: decodedData.sender,
        recipient: decodedData.recipient,
        tx_hash: log.tx_hash.toLowerCase(),
        log_index: log.log_index,
        timestamp: log.block_timestamp,
        source: 'uniswap_v3',
      }
    } catch (error) {
      return null
    }
  })
  .compact()
  .value()

  return docs
}
从 Uniswap V3 Swap Logs 得到 DexTrades

现在我们得到了交易记录,接下来是考虑如何通过交易记录计算得到历史价格。对于历史价格,不同的需求计算的细节会有所不同,比如得到的历史价格数据如果是用于普通折线图,则只算一个平均价格即可,如果数据用于 K 线图,则额外还需要计算开盘价、收盘价、最高价和最低价,并且还要确认价格的间隔粒度,每天的价格还是每小时甚至每15分钟的价格。本篇以每天的平均价格为例,提一种每日平均价的计算思路。

为了能计算出 Token 的价格,我们首先要弄清楚 Token 是如何有价格的,或者说,Token 是如何有美元价格的。

以 AAVE 币为例。首先我们只考虑最简单的情况,只有一个池子 UniPool<AAVE⬌USDT> 的情况。AAVE 和 USDT 在 AMM 的模型下可以自由兑换,某一时刻用户 A 用 1AAVE 兑换得到了 150USDT,那么就可以说,此刻 AAVE 的价格为 150USD。

另一种情况,如果 UNTK (虚构的一种 Token)没有和稳定币(USDT、USDC 等)做池子,而是和 ETH 这样的大 Token 做池子 UniPool<UNTK⬌ETH>,然后 ETH 又和 USDT 做了池子 UniPool<ETH⬌USDT>。某一时刻用户 A 用 1UNTK 兑换得到了 0.01ETH,而此时用户 B 用 1 ETH 兑换得到了 3000USDT,那么就可以间接的得出此时 UNTK 的价格是 30USD。

以此类推,任意一种 Token,只要存在一条兑换链,最终可兑换到稳定币,那么就可以得出这个 Token 的价格。这是基础的价格计算规则,但是实际会有一些问题需要考虑处理:

  • 任意一种 Token,可能存在多条兑换链,可计算出多个价格;
  • 存在的多条兑换链,其中某些兑换链,可能有恶意的 Token 和池子参与其中,导致最终计算的价格不准确甚至是错误的。

理论上,任意一种 Token 可以通过一条很长的兑换链最终得出价格,但是实际上,兑换链越长,最终得到的结果很有可能不准确。举个简单的例子:

  1. 1ETH ⬌ 3000 USDT
  2. 1UNTK ⬌ 0.01ETH
  3. 1AAVE ⬌ 150USDT
  4. 1UNTK ⬌ 0.5AAVE

从这四个池子中,可以通过两条兑换链得出 UNTK 的价格,分别是 1UNTK = 0.01ETH = 30USDT 和 1UNTK = 0.5AAVE = 75USDT。4号池子是某个用户随意创建出来的池子,但是实际池子里并没有足够的资金进行正常价值的兑换。这位用户可能的确是在某个时刻用 1UNTK 最终换到了 75 USDT,但是由于池子的资金总量过小,另一个人用户再次兑换的时候,并不能以相同的价格进行兑换,这种小池子的交易理论上都应该排除在价格计算之外。

我们可以对 Token 做一个分类:

  • 稳定币 Token(StableToken),比如 USDT、USDC、DAI 等;
  • 白名单重要 Token(VipToken),比如 WETH、WBTC 等;
  • 普通 Token (CommonToken),比如 AAVE、1INCH 等,包含 StableToken 和 VipToken;

于是我们可以规定一个价格计算的策略:1. 任意 Token 可作为兑换链的起始 Token;2. 兑换链中参与的 Token 除起始 Token 外只能包含 StableToken 和 VipToken;3. 兑换链结尾 Token 只能是 StableToken;4. 兑换链长度最多为 3。

对于所有的 Token 两两之间的池子,从全局的角度来看可以构建成一种图,任意两个 Token 之前的兑换池子看作两条有向路径。有了这样的图,就能很方便的找到任意一个 Token 到 StableToken 的兑换链。

本篇文章以 Node.js 为例,使用 Graphology 这个工具库来构建图。在创建图之前,我们先要从 DexTrade 表(数据库以 Clickhouse 为例)中聚合出任意一天的 Token 交易对。

WITH raw_data AS (
  SELECT
    timestamp,
    (input_token < output_token ? input_token : output_token) token0,
    (input_token < output_token ? output_token : input_token) token1,
    (input_token < output_token
      ? input_amount
      : output_amount
    ) token0_amount,
    (input_token < output_token
      ? output_amount
      : input_amount
    ) token1_amount,
    (input_token < output_token ? (toInt128(output_amount) / toInt128(input_amount)) : (toInt128(input_amount) / toInt128(output_amount))) rate
  FROM DexTrade
  WHERE timestamp >= '2022-02-15 00:00:00' AND timestamp < '2022-02-16 00:00:00'
), average_rate AS (
    SELECT
      date_trunc('day', timestamp) date,
      token0, token1,
      AVG(rate) rate
    FROM raw_data
    GROUP BY date_trunc('day', timestamp), token0, token1
), data AS (
    SELECT *
    FROM raw_data
    LEFT JOIN average_rate
      ON average_rate.token0 = raw_data.token0
      AND average_rate.token1 = raw_data.token1
      AND average_rate.date = date_trunc('day', timestamp)
    WHERE ABS(raw_data.rate - average_rate.rate) < average_rate.rate * 4
)
SELECT
  date_trunc('day', timestamp) date,
  token0,
  token1,
  toString(SUM(toInt256(token0_amount))) token0_total_amount,
  toString(SUM(toInt256(token1_amount))) token1_total_amount
FROM data
GROUP BY date_trunc('day', timestamp), token0, token1
从 DexTrade 聚合出某一天 Token 交易对

这个查询比较复杂,对数据做了一些处理,这里做一些说明:

  • 交易对 token0 和 token1,为了能得到唯一的(unique)交易对,任意两个 Token 通过字母表顺序排序确定哪一个是 token0,哪一个是 token1;
  • 针对当天任意一个交易对,计算平均交易兑换率,用来排除掉一些不靠谱的交易记录(如果某条交易兑换率比平均的4倍还大,则排除掉);
  • 实际聚合的是当天,交易对两个 token 的交易总量,而不是比率。
聚合任意一天的 Token 交易对的查询结果

有了交易对,构建 Token 兑换的有向图就变得很容易了。

import { DirectedGraph } from 'graphology';


// 每天 Token 交易对
export interface DexTradeDailySummary {
  // 从数据库查询中得到的数据
  token0: string;
  token1: string;
  token0_total_amount: string;
  token1_total_amount: string;
  date: string;

  // 使用 Web3.js 获取到的 token 信息
  token0Obj: {
    address: string;
    name: string;
    symbol: string;
  };
  token1Obj: {
    address: string;
    name: string;
    symbol: string;
  };
}

const summaries: DexTradeDailySummary[]

const graph = new DirectedGraph<
  { symbol: string },
  { sourceToken: TokenInfo; targetToken: TokenInfo; exchange: { base: BigNumber; rate: BigNumber; } },
  {}
>();
for (const summary of summaries) {
  if (!graph.hasNode(summary.token0)) {
    graph.addNode(summary.token0, { symbol: summary.token0Obj.symbol });
  }
  if (!graph.hasNode(summary.token1)) {
    graph.addNode(summary.token1, { symbol: summary.token1Obj.symbol });
  }

  const token0_total_amount = new BigNumber(summary.token0_total_amount).dividedBy(10 ** summary.token0Obj.decimals);
  const token1_total_amount = new BigNumber(summary.token1_total_amount).dividedBy(10 ** summary.token1Obj.decimals);
  if (!graph.hasDirectedEdge(summary.token0, summary.token1)) {
    const key = `${summary.token0}->${summary.token1}`;
    graph.addDirectedEdgeWithKey(key, summary.token0, summary.token1, {
      sourceToken: summary.token0Obj,
      targetToken: summary.token1Obj,
      exchange: {
        base: token0_total_amount,
        rate: token1_total_amount.dividedBy(token0_total_amount),
      },
    });
  }
  if (!graph.hasDirectedEdge(summary.token1, summary.token0)) {
    const key = `${summary.token1}->${summary.token0}`;
    graph.addDirectedEdgeWithKey(key, summary.token1, summary.token0, {
      sourceToken: summary.token1Obj,
      targetToken: summary.token0Obj,
      exchange: {
        base: token1_total_amount,
        rate: token0_total_amount.dividedBy(token1_total_amount),
      },
    });
  }
}
构建 Token 兑换路径图

构建好了 Token 兑换关系图,即可遍历兑换链按照前文的计算方法和策略算出 Token 的价格。

import { chain, isArray } from 'lodash';

const WETH = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'
const WBTC = '0x2260fac5e5542a773aa44fbcfedf7c193bc2c599'

const USDT_TOKEN = '0xdac17f958d2ee523a2206206994597c13d831ec7';
const USDC_TOKEN = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48';
const DAI_TOKEN = '0x6b175474e89094c44da98b954eedeac495271d0f';

const STABLE_USD_TOKENS = [USDT_TOKEN, USDC_TOKEN, DAI_TOKEN];
const VIP_TOKENS = [WETH, WBTC]

interface ITokenExchangeVolumePair {
  address: string;
  data: {
    targetVolume: BigNumber;
    usdVolume: BigNumber;
  };
  time: Date;
}

const generateHistoryPrices = (targetTokenAddresses: string[], baseUsdToken: string): ITokenExchangeVolumePair[] => {
  if (!graph.hasNode(baseUsdToken)) return [];

  const volumePairs: ITokenExchangeVolumePair[] = []
  for (const targetTokenAddress of targetTokenAddresses) {
    if (targetTokenAddress.toLowerCase() === baseUsdToken.toLowerCase()) continue
    if (!graph.hasNode(targetTokenAddress)) continue

    const paths = (() => {
      const paths: string[][] = []

      graph.forEachEdge(targetTokenAddress, (edge, attr, source, target) => {
        if (target === baseUsdToken) {
          paths.push([targetTokenAddress, baseUsdToken])
        }
      })

      if (!STABLE_USD_TOKENS.includes(targetTokenAddress)) {
        graph.forEachEdge(targetTokenAddress, (edge1, attr1, source1, target1) => {
          if (VIP_TOKENS.includes(target1)) {
            graph.forEachEdge(target1, (edge2, attr2, target2) => {
              if (target2 === baseUsdToken) {
                paths.push([targetTokenAddress, target1, target2])
              }
            })
          }
        })
      }

      return paths
    })()

    if (!paths || !isArray(paths) || paths.length === 0) continue;

    const volumeData = (() => {
      const data: {
        targetVolume: BigNumber;
        usdVolume: BigNumber;
      }[] = []
      for (const path of paths) {
        let targetVolume : BigNumber = new BigNumber(0);
        let usdVolume: BigNumber = new BigNumber(0);
        path.forEach((token, index) => {
          if (index === 0) return;
          const key = `${path[index-1]}->${path[index]}`;
          const exchange = graph.getDirectedEdgeAttribute(key, 'exchange')  || {
            base: new BigNumber(1),
            rate: new BigNumber(1),
          };
          if (index === 1) {
            targetVolume = exchange.base
            usdVolume = targetVolume.multipliedBy(exchange.rate)
          } else {
            usdVolume = usdVolume.multipliedBy(exchange.rate)
          }
        })
        data.push({
          targetVolume,
          usdVolume,
        })
      }
      if (data.length === 0) return null
      return {
        targetVolume: BigNumber.sum(...data.map((i) => i.targetVolume)),
        usdVolume: BigNumber.sum(...data.map((i) => i.usdVolume)),
      }
    })()
    if (!volumeData) continue

    volumePairs.push({
      address: targetTokenAddress,
      data: volumeData,
      time,
    })
  }
  return volumePairs
}

let allVolumePairs: ITokenExchangeVolumePair[] = []
for (const baseUsdToken of STABLE_USD_TOKENS) {
      allVolumePairs = allVolumePairs.concat(generateHistoryPrices(tokenAddresses, baseUsdToken))
}

const historyPrices = chain(allVolumePairs)
  .groupBy((i) => i.address)
  .mapValues((items, address): IClickhouseDexTokenHistoryPrice => {
    const price = BigNumber
      .sum(...items.map((i) => i.data.usdVolume))
      .dividedBy(BigNumber.sum(...items.map((i) => i.data.targetVolume)))

    return {
      address: items[0].address,
      price: price.toFixed(4),
      time,
    }
  })
  .values()
  .value()
通过 Token 兑换关系图,遍历兑换链计算出 Token 价格

实现的代码比较复杂,这里也做一些说明:

  • 对于稳定币的价格计算做特殊处理,稳定币只和其余稳定币做交易对计算价格;
  • 对于多个兑换链,计算目标 Token 总量和可换得的稳定币的总量,取和再算平均数。

最终可以得到这样的数据表:

interface DexTokenHistoryPrice {
  address: string;
  price: string;
  volume: string;
  time: Date;
}

值得注意的一点,对于一些小众的 Token,可能存在整天都没有交易的情况,在目前的实现方式下,数据表里不会有相应当天的历史价格记录。所以如果这样的数据表用来在数据库查询里连接到其他表参与计算时,要注意处理某些 Token 某些天没有数据的情况,一般来说,可以往前找一个有记录的价格当作当天的价格。同样也是因为这个原因,如果想把历史价格的时间间隔改小(比如每小时),目前的实现方式可能就不太合适,要做适当调整。

最终得到 AAVE 的一般准确的历史价格图表:

使用 echarts 展示生成的历史价格(AAVE)