Joel Grunbaum
2022-01-20 128c6d51ec8c70e230dc86b100cb887ba3f0378d
protocol.cpp
@@ -2,22 +2,31 @@
#include "book.hpp"
#include "cpp-httplib/httplib.h"
#include "easywsclient/easywsclient.hpp"
#include "json.hpp"
#include "secrets.hpp"
#include <chrono>
#include <cstddef>
#include <iostream>
#include <memory>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
using namespace std::literals;
namespace protocol
{
static std::unordered_map<json::MessageTypes, book::ProductTypeEnum>
   mapAnnounce;
std::string server = std::string(HOST) + ":" + std::string(PORT);
httplib::Client cli("http://" + server);
#define SERVER HOST ":" PORT
httplib::Client cli("http://" SERVER);
std::unique_ptr<easywsclient::WebSocket> ws;
double lastime = 0;
void initialise()
{
@@ -33,33 +42,72 @@
   // std::ifstream sampleFile("../data.test");
   // std::stringstream ss;
   // ss << sampleFile.rdbuf();
   httplib::Client cli("http://" + server);
   auto res = cli.Get("/recover");
   std::string l;
   // l = ss.str();
   l = res->body;
   std::queue<json::Message*> a(json::parse(l));
   while (!a.empty()) {
      protocol::handleMessage(bs, a.front());
      if (static_cast<json::FromExchange*>(a.front()) != nullptr ||
          static_cast<json::FromExchange*>(a.front())->timestamp > lastime) {
         lastime = static_cast<json::FromExchange*>(a.front())->timestamp;
         protocol::handleMessage(bs, a.front());
      }
      delete a.front();
      a.pop();
   }
   return bs;
}
void createWebSocket()
{
   ws = std::unique_ptr<easywsclient::WebSocket>(
      easywsclient::WebSocket::pointer(
         easywsclient::WebSocket::from_url("ws://" SERVER "/information")));
   ws->poll();
}
std::deque<json::Message*>
catchUp(std::unordered_map<std::string, book::Book>& bs)
{
   std::string feed;
   bool gotMessage;
   std::deque<json::Message*> out;
   do {
      gotMessage = false;
      ws->poll();
      ws->dispatch([gotMessageOut = &gotMessage, messageOut = &feed,
                    ws = ws.get()](const std::string& message) {
         *gotMessageOut = true;
         *messageOut = message;
      });
      if (gotMessage) {
         std::queue<json::Message*> a(json::parse(feed));
         while (!a.empty()) {
            if (static_cast<json::FromExchange*>(a.front()) != nullptr ||
                static_cast<json::FromExchange*>(a.front())->timestamp >
                    lastime) {
               lastime =
                  static_cast<json::FromExchange*>(a.front())->timestamp;
            }
            protocol::handleMessage(bs, a.front());
            out.push_back(a.front());
            a.pop();
         }
      }
   } while (gotMessage);
   return out;
}
json::Message* addOrder(json::AddMessage& order)
{
   std::string message = "{\"message\": " + order.as_string() + ", " +
                         "\"username\": \"" + std::string(USER) +
                         "\", \"password\": \"" + std::string(PASS) + "\"}";
   std::string message = order.as_string();
   return send(message);
}
json::Message* deleteOrder(json::DeleteMessage& order)
{
   std::string message = "{\"message\": " + order.as_string() +
                         ", \"username\": \"" + std::string(USER) +
                         "\", \"password\": \"" + std::string(PASS) + "\"}";
   std::string message = order.as_string();
   return send(message);
}
@@ -67,29 +115,32 @@
                   json::Message* message)
{
   if (mapAnnounce.empty()) initialise();
   if (dynamic_cast<json::FromExchange*>(message) != nullptr) {
      lastime = dynamic_cast<json::FromExchange*>(message)->timestamp;
   }
   switch (message->type) {
   case json::FUTURE_TYPE:
   case json::SPREAD_TYPE:
   case json::CALL_TYPE:
   case json::PUT_TYPE:
      announce(bs, dynamic_cast<json::AnnounceMessage*>(message));
      announce(bs, static_cast<json::AnnounceMessage*>(message));
      break;
   case json::SETTLEMENT:
      settle(bs, dynamic_cast<json::SettleMessage*>(message));
      settle(bs, static_cast<json::SettleMessage*>(message));
      break;
   case json::ADDED:
      addedOrder(bs, dynamic_cast<json::AddedMessage*>(message));
      addedOrder(bs, static_cast<json::AddedMessage*>(message));
      break;
   case json::DELETED:
      deletedOrder(bs, dynamic_cast<json::DeletedMessage*>(message));
      deletedOrder(bs, static_cast<json::DeletedMessage*>(message));
      break;
   case json::TRADE:
      tradeOrder(bs, dynamic_cast<json::TradeMessage*>(message));
      tradeOrder(bs, static_cast<json::TradeMessage*>(message));
      break;
   case json::BROKER_REQUEST:
   case json::BROKER_ACK:
   case json::BROKER_CONFIRM:
      broker(bs, dynamic_cast<json::Broker*>(message));
      broker(bs, static_cast<json::Broker*>(message));
      break;
   default:;
   }
@@ -111,11 +162,11 @@
void addedOrder(std::unordered_map<std::string, book::Book>& bs,
                json::AddedMessage* message)
{
   if (message->side == book::Buy) {
   if (message->side == book::Buy && message->resting) {
      book::Order t(message->price, book::Buy, message->resting,
                    message->timestamp, message->id);
      bs[message->product].bid(t);
   } else {
   } else if (message->side == book::Sell && message->resting) {
      book::Order t(message->price, book::Sell, message->resting,
                    message->timestamp, message->id);
      bs[message->product].ask(t);
@@ -125,25 +176,19 @@
                  json::DeletedMessage* message)
{
   if (message->side == book::Buy) {
      for (std::size_t i = 0; i < bs[message->product].bidSide.size(); i++) {
         if (bs[message->product].bidSide[i].id == message->id) {
            bs[message->product].bidSide[i] =
               bs[message->product].bidSide.back();
            bs[message->product].bidSide.pop_back();
            std::make_heap(bs[message->product].bidSide.begin(),
                           bs[message->product].bidSide.end(),
                           std::less<book::Level>());
      for (auto i = bs[message->product].bidSide.begin();
           i != bs[message->product].bidSide.end(); i++) {
         if (i->id == message->id) {
            bs[message->product].bidSide.erase(i);
            break;
         }
      }
   } else {
      for (std::size_t i = 0; i < bs[message->product].askSide.size(); i++) {
         if (bs[message->product].askSide[i].id == message->id) {
            bs[message->product].askSide[i] =
               bs[message->product].askSide.back();
            bs[message->product].askSide.pop_back();
            std::make_heap(bs[message->product].askSide.begin(),
                           bs[message->product].askSide.end(),
                           std::greater<book::Level>());
      for (auto i = bs[message->product].askSide.begin();
           i != bs[message->product].askSide.end(); i++) {
         if (i->id == message->id) {
            bs[message->product].askSide.erase(i);
            break;
         }
      }
   }
@@ -152,15 +197,42 @@
                json::TradeMessage* message)
{
   if (message->tradeType == json::BUY_AGGRESSOR) {
      book::Order t(message->price, book::Buy, message->volume,
                    message->timestamp, message->passiveOrder);
      bs[message->product].bid(t);
   } else {
      book::Order t(message->price, book::Sell, message->volume,
                    message->timestamp, message->passiveOrder);
      bs[message->product].ask(t);
      if (message->passiveOrderRemaining > 0) {
         for (auto& i : bs[message->product].askSide) {
            if (i.id == message->passiveOrder) {
               i.volume = message->passiveOrderRemaining;
               break;
            }
         }
      } else {
         for (auto i = bs[message->product].askSide.begin();
              i != bs[message->product].askSide.end(); i++) {
            if (i->id == message->passiveOrder) {
               bs[message->product].askSide.erase(i);
               break;
            }
         }
      }
   } else if (message->tradeType == json::SELL_AGGRESSOR) {
      if (message->passiveOrderRemaining > 0) {
         for (auto& i : bs[message->product].bidSide) {
            if (i.id == message->passiveOrder) {
               i.volume = message->passiveOrderRemaining;
               break;
            }
         }
      } else {
         for (auto i = bs[message->product].bidSide.begin();
              i != bs[message->product].bidSide.end(); i++) {
            if (i->id == message->passiveOrder) {
               bs[message->product].bidSide.erase(i);
               break;
            }
         }
      }
   }
}
void broker(std::unordered_map<std::string, book::Book>& bs,
            json::Broker* message)
{
@@ -168,9 +240,16 @@
json::Message* send(std::string& message)
{
   auto res = cli.Post("/execution", message, "text/plain");
   std::queue<json::Message*> a = json::parse(res->body);
   return a.front();
   httplib::MultipartFormDataItems a = {{"message", message, "", ""},
                                        {"username", USER, "", ""},
                                        {"password", PASS, "", ""}};
   auto res =
      cli.Post("/execution",
                "message=" + message + "&username=" USER "&password=" PASS,
                "application/x-www-form-urlencoded");
   std::string b = res->body;
   std::queue<json::Message*> c = json::parse(b);
   return c.front();
}
} // namespace protocol