Задача обнаружения арбитражных возможностей при торговле различными финансовыми инструментами на нескольких рынках, похоже, вошла в обиход не только арбитражников, но и алгоритмистов. В настоящее время всякий раз, когда вы ищете решение для него, вы неизбежно попадаете либо на сообщения в блогах, обсуждающие решения четвертой степени, основанные на идее модифицированной версии алгоритма Флойда-Уоршалла, либо на решение проблемы обнаружения арбитража Седжвика и Уэйна. поиск любой одной прибыльной цепочки с использованием логарифмического преобразования с последующим обнаружением отрицательного цикла. И что хуже всего, слишком часто считается, что последний находит самую выгодную возможность. Ой!

Я тоже все это пережил, после того как наткнулся на очередную реинкарнацию головоломки и попытался понять, есть ли какие-то достижения в этой области за последнее время. Все, что я получил, — это дамп одинаково выглядящих публичных репозиториев (большинство из которых по ошибке использовали прием обнаружения отрицательного цикла, чтобы найти лучшее решение) и вышеупомянутые сообщения в блогах. Я чуть было не вложил свои два пенса и представил миру очередную непрошеную реализацию всем известного алгоритма поиска кратчайшего пути для всех пар (правда, написанного на Scala), однако древняя мудрость использования линейной алгебры для решения графовых задач проснулась в моем подсознании и плодоносил через несколько часов.

Чтобы открыть новые горизонты, давайте вспомним тропическое полукольцо max-plus над действительными числами (вы никогда не меняли свою сложную алгебру 101 на бирюзовые воды Карибского моря, не так ли?), которое представляет собой множество действительных чисел плюс отрицательная бесконечность R∪{ −∞} с двумя операциями, называемыми «плюс» (⊕) и «умножение» (⊗), определенными как a ⊕ b = max(a, b) и a ⊗ b = a + b. Если вам не хватает
домашнего задания, проверьте, действительно ли такая алгебраическая структура является полукольцом.

«Аддитивный» и «мультипликативный» моноиды могут быть выражены с помощью cats.Monoid[T] и scala.math.Ordering[T] соответственно. Я буду использовать java.math.BigDecimal как самую простую и ближайшую альтернативу точным действительным числам. Однако для того, чтобы вы могли реализовать настраиваемую реализацию реальных значений, которые, по крайней мере, закрыты при сложении и логарифмировании и которые гарантируют правильные результаты при вычислении максимумов, я представлю область со следующим параметрическим типом:

sealed trait MaxPlusDomain[+T] extends Any with java.io.Serializable
case object NegativeInfinity extends MaxPlusDomain[Nothing]
case class FiniteValue[T](value: T) extends AnyVal with MaxPlusDomain[T]
object MaxPlusDomain {
  def argMax[T](values: IndexedSeq[MaxPlusDomain[T]])(implicit ord: Ordering[T]): (Option[Int], MaxPlusDomain[T]) =
    if (values.isEmpty) (None, NegativeInfinity)
    else {
      var m: Int = 1
      var maxIdx: Int = 0
      var max: MaxPlusDomain[T] = values(maxIdx)
      while (m < values.size) {
        (max, values(m)) match {
          case (FiniteValue(x), acc@FiniteValue(a)) =>
            if (ord.lt(x, a)) {
              maxIdx = m
              max = acc
            }
          case (NegativeInfinity, acc) =>
            maxIdx = m
            max = acc
          case (_, NegativeInfinity) => ()
        }
        m += 1
      }
      (Some(maxIdx), max)
    }
}

Мы можем составить матрицы из элементов этого полукольца, попытаться перемножить их и обнаружить, что алгоритм их умножения будет напоминать классическое умножение матриц из учебника с той оговоркой, что обычное сложение заменено операциями взятия максимума, а обычное умножение заменяется обычным сложением, т. е. если A и B — матрицы с p столбцов и p строк соответственно, а [M]ᵢⱼ — ijᵗʰ элемент матрицы M, то [A⊗B]ᵢⱼ=max([A] ᵢ₁+[B]₁ⱼ,…,[A]ᵢₚ+[B]ₚⱼ).

Если теперь мы возьмем матрицу расстояний D графа и вычислим ее pᵗʰ max-плюс тропическая мощность, то [Dᵖ]ᵢⱼ будет равно длине самого длинного пути от вершины i до вершины j с использованием не более чем p шагов, т. е. наибольшее значение на диагонали силы указывает на наибольшую замкнутую прогулку с не более чем p шагов и формирует многообещающую основу для поиска наиболее прибыльной арбитражной возможности.

Осталось только

  • найдите способ реконструировать эти блуждания, например, отслеживая все ребра в блужданиях при вычислении максимумов:
class MaxPlusTropicalSquareMatrix[T](private val matrix: Array[Array[MaxPlusDomain[T]]],
                                     private val walks: Map[(Int, Int), IndexedSeq[Int]])
                                    (implicit additive: Monoid[T], ord: Ordering[T]) {
  val dimension: Int = matrix.length

  def apply(i: Int, j: Int): MaxPlusDomain[T] = matrix(i)(j)

  def walk(from: Int, to: Int): Option[IndexedSeq[Int]] = walks.get(from -> to)

  private def update(i: Int, j: Int, value: MaxPlusDomain[T]): Unit = {
    matrix(i)(j) = value
  }

  def *(multiplier: MaxPlusTropicalSquareMatrix[T]): MaxPlusTropicalSquareMatrix[T] = {
    require(multiplier.dimension == dimension)
    val product = Array.fill[MaxPlusDomain[T]](dimension, dimension)(NegativeInfinity)
    var walks = scala.collection.mutable.ListBuffer.empty[((Int, Int), IndexedSeq[Int])]
    if (dimension > 0) {
      val accumulator = Array.ofDim[MaxPlusDomain[T]](dimension)
      var i = 0
      while (i < dimension) {
        var j = 0
        while (j < dimension) {
          var k = 0
          while (k < dimension) {
            accumulator(k) = (this (i, k), multiplier(k, j)) match {
              case (NegativeInfinity, _) => NegativeInfinity
              case (_, NegativeInfinity) => NegativeInfinity
              case (FiniteValue(lv), FiniteValue(rv)) => FiniteValue(additive.combine(lv, rv))
            }
            k += 1
          }
          MaxPlusDomain.argMax(accumulator) match {
            case (_, NegativeInfinity) => ()
            case (Some(maxIndex), max: FiniteValue[T]) =>
              product(i)(j) = max
              walk(i, maxIndex).foreach(walk => walks=walks :+ (i -> j) -> (walk :+ j))
          }
          j += 1
        }
        i += 1
      }
    }
    new MaxPlusTropicalSquareMatrix[T](product, Map.from(walks))
  }
}

object MaxPlusTropicalSquareMatrix {
  def apply[T](dimension: Int, values: Map[(Int, Int), FiniteValue[T]])
              (implicit additive: Monoid[T], ord: Ordering[T]) = {
    val matrix = Array.fill[MaxPlusDomain[T]](dimension, dimension)(NegativeInfinity)
    var walks = scala.collection.mutable.ListBuffer.empty[((Int, Int), IndexedSeq[Int])]
    values.foreachEntry { case ((from, to), value) =>
      matrix(from)(to) = value
      walks = walks :+ (from -> to) -> IndexedSeq(from, to)
    }

    new MaxPlusTropicalSquareMatrix[T](matrix, Map.from(walks))
  }
}
  • выделить только те замкнутые блуждания, которые образуют простые циклы, например, с помощью алгоритма черепахи и зайца:
def isSimpleCycle(cycle: IndexedSeq[Int]): Boolean = {
  val next = scala.collection.mutable.Map.empty[Int, Int]
  for (i <- 0 until cycle.size - 1) next(cycle(i)) = cycle(i + 1)
  val start = cycle(0)
  var (slow, fast) = (next(start), next(next(start)))
  while (slow != fast) {
    slow = next(slow)
    fast = next(next(fast))
  }

  var mu = 0
  slow = start
  while (slow != fast) {
    slow = next(slow)
    fast = next(fast)
    mu += 1
  }

  var lambda = 1
  fast = next(slow)
  while (slow != fast) {
    fast = next(fast)
    lambda += 1
  }
  mu == 0 && lambda == cycle.size - 1
}
  • вычислить тропические степени D до числа вершин графа и выбрать среди них максимальное диагональное значение, соответствующее простому циклу:
val (index2Currency, distanceMatrix) = buildDistanceMatrix(fxRates)

var (highestPositiveExponent, bestStrategy) = (ZERO, none[Seq[CurrencyCode]])

var powerOfDistanceMatrix = distanceMatrix
val values = Array.ofDim[MaxPlusDomain[BigDecimal]](powerOfDistanceMatrix.dimension)
for (_ <- 1 until powerOfDistanceMatrix.dimension) {
        powerOfDistanceMatrix *= distanceMatrix
        for (i <- 0 until powerOfDistanceMatrix.dimension) {
          values(i) = powerOfDistanceMatrix.walk(i, i).filter(isSimpleCycle).map(_ => powerOfDistanceMatrix(i, i)).getOrElse(NegativeInfinity)
        }
        MaxPlusDomain.argMax(values) match {
          case (_, NegativeInfinity) => ()
          case (Some(maxIndex), max: FiniteValue[BigDecimal]) =>
            if (max.value.bigDecimal.compareTo(highestPositiveExponent) > 0) {
              highestPositiveExponent = max.value.bigDecimal
              bestStrategy = powerOfDistanceMatrix.walk(maxIndex, maxIndex).map(_.map(index2Currency))
            }
        }
      }
  • и, наконец, каким-то образом преобразовать веса ребер графа, чтобы они имели смысл в новой структуре; к счастью, эту пулю можно пробить, взяв логарифмы веса, согласно Седжвику и Уэйну:
def buildDistanceMatrix(fxRates: Map[CurrencyPair, PosBigDecimal]):
(Map[Int, CurrencyCode], MaxPlusTropicalSquareMatrix[BigDecimal]) = {
  val currency2index = scala.collection.mutable.Map.empty[CurrencyCode, Int]

  var nextIndex = 0

  def nextAvailableIdx: Int = {
    val idx = nextIndex
    nextIndex += 1
    idx
  }

  val matrixValues = fxRates.filter(_._2.value != ZERO).map { case (currencyPair, rate) =>
    val (base, quote) = currencyPair
    val from = currency2index.getOrElseUpdate(base, nextAvailableIdx)
    val to = currency2index.getOrElseUpdate(quote, nextAvailableIdx)

    ((from, to), FiniteValue(BigDecimal(DefaultBigDecimalMath.log(rate.value.bigDecimal))))
  }

  val index2currency = Map.from(currency2index.map{case (currency, index) => index -> currency})

  (index2currency, MaxPlusTropicalSquareMatrix[BigDecimal](nextIndex, matrixValues))
}

Все это в совокупности приводит к алгоритму обнаружения лучших возможностей для арбитража, который работает за O(n⁴), где n — количество торгуемых инструментов. Проверьте репозиторий GitHub для кода и дополнений:



Хорошо известный прием возведения в степень путем возведения в квадрат можно использовать для повышения сложности до O(n³log n).

Однако существует метод, специально предназначенный для вычисления тропического произведения двух матриц n на n, который после замены моей наивной процедуры умножения даст алгоритм обнаружения возможностей субчетвертого арбитража, работающий в n⁴/2ˡᵒᵍ ͩ ⁿ время для некоторого положительного d в реальной оперативной памяти, где сложения и сравнения реальных единиц являются удельной стоимостью.

Первоначально опубликовано на https://mplevako.github.io/2022/08/04/subquartic-arbitrage.html .