""".#test.py main module."""
|
import json
|
import time
|
import httpx
|
from whereismysock import all
|
|
import book
|
import protocol
|
|
# Exchange endpoint; both parts are interpolated into the request URLs.
HOST: str = "sytev070"
PORT: str = "9000"

# Credentials submitted alongside each execution request.
USER: str = "test"
PASS: str = "none"
|
|
|
def send(message):
    """Post a message to the exchange and return the decoded JSON reply.

    The message is serialised to a JSON string and submitted as form data,
    together with the module-level USER/PASS credentials, to the
    ``/execution`` endpoint on HOST:PORT.
    """
    url = f"http://{HOST}:{PORT}/execution"
    form = {
        "message": json.dumps(message),
        "username": USER,
        "password": PASS,
    }
    response = httpx.post(url, data=form)
    return response.json()
|
|
|
def logAllIncomingMessages():
    """Print everything arriving on the information websocket.

    ``all`` here is the helper imported from ``whereismysock`` (it shadows
    the builtin). Each ``next`` call on its result is iterated and every
    item printed. The loop has no exit condition; it runs until interrupted
    or an exception propagates.
    """
    feed = all(f"ws://{HOST}:{PORT}/information")
    while True:
        batch = next(feed)
        for incoming in batch:
            print(incoming)
        # NOTE(review): pause assumed to sit at the polling-loop level
        # (once per batch) -- confirm against the intended layout.
        time.sleep(0.1)
|
|
|
def printRecoveryLog():
    """Fetch the ``/recover`` log from the exchange and print each entry."""
    response = httpx.get(f"http://{HOST}:{PORT}/recover")
    for entry in response.json():
        print(entry)
|
|
|
def recoverBook(bs: "dict[str, book.Book]") -> None:
    """Recover books from a completely lost state.

    Fetches the ``/recover`` log from the exchange and feeds every logged
    message, in order, through ``protocol.handleMessage`` together with
    ``bs``.

    Args:
        bs: Mapping of books to rebuild; handed unchanged to
            ``protocol.handleMessage`` for each message. Presumably keyed
            by product name -- confirm against ``protocol``.
    """
    # The annotation is a string so it names a real mapping type (the
    # original ``{str, book.Book}`` was a set literal) without requiring
    # subscriptable builtins at definition time.
    so_far_today = httpx.get(f"http://{HOST}:{PORT}/recover").json()
    for message in so_far_today:
        protocol.handleMessage(bs, message)
|
|
|
# Sample ADD message used by the disabled manual checks below.
message = {
    "type": "ADD",
    "product": "F_SOH_APP0104T1700",
    "price": 99.0,
    "side": "BUY",
    "volume": 1000000,
}

# Manual checks, kept disabled:
# print(json.dumps(message))
# print(send(message))
# printRecoveryLog()
# logAllIncomingMessages()
# print(getInformation())
# wstest()

bs = dict()

# Disabled book-recovery walkthrough:
# recoverBook(bs)
# print(list(bs))
# for i in bs:
#     print(bs[i].product)
#     bs[i].printBook()

book.testBook()
|