In this post you will read about:
How to code the OMS and execute orders (buy, sell, modify order, take profit, stop-loss) with IBKR (Interactive Brokers)
How to create Messaging Queue system (gateway) for multiple brokers e.g., OANDA
How to set up IBKR and add orders to our Strategies.
Let’s first explain the structure and basic terms.
Order Management System
A trade order management system (OMS) is a digital tool for executing and tracking orders in the securities market. It's often mentioned alongside Execution Management System (EMS), which is in fact mostly a subset of OMS, and is more responsive and allows for precise, time-sensitive transactions. Buy-side companies usually use EMS to execute trades based on market conditions and positions, and by HFT traders where frequent real-time decisions and transactions are being made.
We will code a simplified yet efficient version of an OMS that provides a view of all ongoing and completed orders to facilitate accurate transaction settlement. Our OMS will also allow us to evaluate our overall performance and profitability through detailed reporting.
It's worth adding that most OMS trades use a protocol called the Financial Information exchange (FIX), which drives most transactions in the securities markets. FIX is an electronic communications protocol used to share international real-time exchange information related to trillions of dollars in securities transactions and markets. The broker platforms we will use, Interactive Brokers (IBKR) and Oanda, both support the Financial Information exchange (FIX) protocol. In both cases, the FIX API supports various order types, real-time market data, and account updates.
Architecture
Our OMS architecture will be created with decoupled components, allowing for a modular and scalable design where different components can communicate and exchange data, regardless of the specific trading platforms being used, such as IBKR, OANDA, Binance, or others. This is another part extending QuantJourney Framework with capabilities of executing orders and checking your brokerage account, do trading, etc. [ALL QuantJourney Framework Code is available to paid customers]
To create flexible OMS, we get help from RabbitMQ, an open-source message-broker software (sometimes called message-oriented middleware) that originally implemented the Advanced Message Queuing Protocol (AMQP). RabbitMQ is quite popular and easy to deploy, making it a suitable choice for our architecture. In fact, there are many other solutions, such as Apache Kafka and ZeroMQ, but we will focus on RabbitMQ as it’s simple.
Let’s see each component:
RabbitMQ - a message broker software that acts as an intermediary for exchanging messages between different parts of a distributed system. It allows us to send and receive messages across various components or services of our application.
IBKR - the trading platform; in QuantJourney Framework you will find the
IBKRExchange
class representing a component that interacts with the Interactive Brokers (IBKR) platform. This class uses theRabbitMQClient
(which internally uses Pika) to send and receive messages to/from RabbitMQ. For example, when an order status changes in IBKR, theIBKRExchange
class can publish a message to a specific queue in RabbitMQ to notify other parts of your system about the order status update.OANDA - like IBKR it’s a trading platform (mostly known for CFDs). I will publish more code for OANDA, as for now it’s empty class
OANDAExchange
that will interact with the OANDA API. This class will also use theRabbitMQClient
to send and receive messages to/from RabbitMQ, just like theIBKRExchange
class does.
Orders and flow
The main goal of our OMS is to enable the execution of orders on different exchanges, serving our algorithmic trading approach and signals generated from our backtesting framework. A trading order within the FIX API usually contains the following information:
Security identifier (i.e., its ticker symbol)
Order direction (i.e., buy, sell, or short)
Order size
Order type (e.g., market, limit, stop, etc.)
Order time-in-force (e.g., day order, fill or kill, good-'til-canceled, etc.)
Order routing (e.g., broker, electronic communication network [ECN], at-the-close [ATC], etc.)
The flow of data and messages will be following:
Our trading application or bot generates a trading signal.
The trading signal is published to a specific queue in RabbitMQ using the
RabbitMQClient
.The
IBKRExchange
class (orOANDAExchange
or any other exchange class) listens to the same queue in RabbitMQ and consumes the trading signal message.Based on the received trading signal, the
IBKRExchange
class interacts with the respective trading platform (IBKR) to execute the trade.After executing the trade, the
IBKRExchange
class publishes a message back to RabbitMQ, indicating the status or result of the trade execution.Other components of your system, such as a risk management module or a UI dashboard, can consume the trade status messages from RabbitMQ to perform further actions or display updates.
Installation
Let’s start with the script to run RabbitMQ:
#!/bin/bash
# Set RabbitMQ environment variables if needed
export RABBITMQ_LOG_BASE="/Users/jakub/_logs"
export RABBITMQ_MNESIA_BASE="/Users/jakub/rabbitmq"
# Ensure the log directory exists
mkdir -p "$RABBITMQ_LOG_BASE"
# Ensure the data directory exists (used for Mnesia database)
mkdir -p "$RABBITMQ_MNESIA_BASE"
# Start RabbitMQ service
# Note: 'brew services start' does not allow direct specification of data or log directories,
# as those should be configured in RabbitMQ's configuration files.
brew services start rabbitmq
echo "RabbitMQ service started."
so we created config rabbitmq.json, in _config directory:
{
"rabbitmq": {
"host": "localhost",
"port": 5672,
"username": "guest",
"password": "guest"
}
}
To use RabbitMQ, we will create the RabbitMQClient
class, which will abstract the details of establishing connections and handling RabbitMQ operations, making it easier to integrate RabbitMQ messaging into your trading system. You can reuse this class across different components, such as the IBKRExchange
class and potentially an OANDAExchange
class, to maintain a consistent way of interacting with RabbitMQ.
We will use Pika, a Python library that provides a way to interact with RabbitMQ from your Python code. It simplifies the process of establishing connections, publishing messages to queues, consuming messages from queues, and handling various RabbitMQ operations. If you want to monitor the queue with e.g., Grafana, it's recommended to add plugins, e.g., 'rabbitmq-plugins enable rabbitmq_management'.
The full code is available on a private GitHub for paid subscribers. Please subscribe to support my work and further development of the QuantJourney Trading Framework.
How to install Interactive Brokers for our OMS?
Here we have a bit more complicated situation.
First, we must download IB client (TWS or IB Gateway) from https://www.interactivebrokers.com/en/trading/tws-updateable-latest.php
For beginners, using TWS (Trader Workstation) is recommended for efficient code debugging and result viewing. It provides a visual account activity overview, and you can switch to IB Gateway when familiar with the system.
What is TWS?
TWS is the typical client that IBKR traders use. This client is great as it provides visual confirmation of the many commands you can send to IB via our Python code.
What is IB Gateway?
The IB Gateway is a minimal, out-of-the-box solution that enables connection and saves resources, typically used in application development.
The flow for TWS / IB Gateway is as follows (note that there is no RabbitMQ, as you can also directly connect to TWS):
Start Trader Workstation and select the Paper Trading tab. You should see a red bar that says, "Simulated Trading". Log in to your account.
I recommend using a paper trading account at the beginning. It helps to familiarize yourself with the platform, test strategies, and ensure your automated systems work correctly, without risking real money. Keep in mind that paper trading is a simulated environment and may differ slightly from live trading.
The main difference with a real account is that you don’t have real money on the line with a paper account. Starting with a new broker, it’s common to accidentally submit the wrong order or use an incorrect position size.
It’s important to note that a demo account offers a simulated environment. While it’s precise, there are scenarios where the behavior will be different from a live account. For example, if you trade an asset with a widespread between the bid and ask, and you place your order in the middle of the two, execution results may differ. IB’s algorithms do a decent job determining whether the order is likely to get filled or not, but without sending it to an exchange, there is no way of knowing for sure.
Then, in TWS, please navigate over to Trader Workstation Configuration which can be found within the TWS terminal under Edit – Global Configuration – API – Settings. You should be looking at a screen that looks like this:
Make sure to check off Enable ActiveX and Socket Clients, this is the main requirement for the API.
That’s mostly all you need to enable our Python classes to interact with your IBKR account (live or paper one).
Let’ code
Firstly, let's create the order.py
file with basic classes for orders, instruments, and other related entities. Here's a starting point for the file:
class OrderType(Enum):
MKT = "MKT" # Market order
LMT = "LMT" # Limit order
PTL = "PTL" # Post-only limit order
STP = "STP" # Stop order
STP_LMT = "STP LMT" # Stop-limit order
TRAIL = "TRAIL" # Trailing stop order
LIT = "LIT" # Limit order with time in force
LOC = "LOC" # Limit order on close
LOO = "LOO" # Limit order on open
MOC = "MOC" # Market order on close
MOO = "MOO" # Market order on open
TRAIL_LMT = "TRAIL LMT" # Trailing stop-limit order
class OrderAction(Enum):
BUY = "BUY"
SELL = "SELL"
class OrderStatus(Enum):
NONE = "NONE"
SUBMITTED = "SUBMITTED"
PARTIAL_FILLED = "PARTIAL-FILLED"
FILLED = "FILLED"
CANCELED = "CANCELED"
FAILED = "FAILED"
class TimeInForce(Enum):
GTC = "GTC" # Good till cancel
IOC = "IOC" # Immediate or cancel
FOK = "FOK" # Fill or kill
GTD = "GTD" # Good till date
MOC = "MOC" # Market on close
MOO = "MOO" # Market on open
ALO = "ALO" # Add liquidity only
class InstrumentType(Enum):
STOCK = "STK"
FUTURE = "FUT"
OPTION = "OPT"
FOREX = "FX"
BOND = "BOND"
CFD = "CFD"
Let's add the TradingOrder
class to the order.py
file. Here's the updated code:
class TradingOrder:
def __init__( self,
symbol,
action,
order_type,
order_id=None,
quantity=0,
price=None,
stop_price=None,
tif=TimeInForce.GTC):
"""
Initialize a new instance of the Order class.
Args:
symbol (str): The market symbol (e.g., "AAPL", "GOOGL").
action (OrderAction): BUY or SELL.
order_type (OrderType): LIMIT, MARKET, POST_ONLY, FOK, IOC.
order_id (str, optional): The unique identifier for the order. Defaults to None.
quantity (int): Number of units to trade.
price (float, optional): The price per unit for LIMIT orders. Defaults to None.
stop_price (float, optional): The stop price for stop orders. Defaults to None.
time_in_force (TimeInForce, optional): The time in force for the order. Defaults to GTC.
"""
self.symbol = symbol
self.action = action
self.order_type = order_type
self.order_id = order_id
self.quantity = quantity
self.price = price
self.stop_price = stop_price
self.tif = tif
self.status = OrderStatus.NONE # Default status
self.fee = 0.0 # Default fee
and TradingContract
class:
class TradingContract:
def __init__( self,
symbol,
min_qty,
price_precision,
quote_precision,
quantity_precision,
min_notional,
currency="USD",
instrument_type=InstrumentType.STOCK):
"""
Initialize a new instance of the Contract class.
Args:
symbol (str): The market symbol (e.g., "AAPL", "GOOGL").
min_qty (int): The minimum quantity allowed for trading.
price_precision (int): The number of decimal places for price precision.
quote_precision (int): The number of decimal places for quote precision.
quantity_precision (int): The number of decimal places for quantity precision.
min_notional (float): The minimum notional value allowed for trading.
currency (str, optional): The currency of the contract. Defaults to "USD".
instrument_type (InstrumentType, optional): The type of instrument. Defaults to STOCK.
"""
self.symbol = symbol
self.min_qty = min_qty
self.price_precision = price_precision
self.quote_precision = quote_precision
self.quantity_precision = quantity_precision
self.min_notional = min_notional
self.currency = currency
self.instrument_type = instrument_type
Please note that for IBKR we will use ib_async library (https://github.com/ib-api-reloaded/ib_async) library. It was originally created by Ewald de Wit known for years as ib_insync. Unfortunately Edward passed away in early 2024, locking all his GitHub, so now it’s managed under the name ib_async by Matt Stancliff and some other contributors.
Now, let's create our IBKRExchange
class that initializes both the Interactive Brokers connection using the ib_async
library and the RabbitMQ connection using the RabbitMQClient
class. Here's an example implementation:
class IBKRExchange:
def __init__( self,
config_path_ibkr="./_config/ibkr.json",
config_path_rabbitmq="./_config/rabbitmq.json"):
"""
Initialize a new instance of the IBKRExchange class and connect to the IBKR and RabbitMQ services.
"""
self.ib = IB()
# Set up IBKR
self.config_ibkr = self.load_config(config_path_ibkr)
self.ib.connect()
# Set up RabbitMQ
self.config_rabbitmq = self.load_config(config_path_rabbitmq)
self.setup_rabbitmq_connection()
self.paper_trading_mode = self.config_ibkr.get('trading', {}).get('paper_trading_mode', False)
def load_config(self, path):
try:
with open(path, 'r') as file:
return json.load(file)
except FileNotFoundError:
logger.error(f"Configuration file not found at {path}. Using defaults.")
return {}
def setup_ibkr_connection(self):
"""
Set up a connection to the Interactive Brokers API.
"""
try:
ibkr_config = self.config_ibkr.get('ibkr', {})
host = ibkr_config.get('host', '127.0.0.1')
port = ibkr_config.get('port', 7497)
client_id = ibkr_config.get('client_id', 1)
# Connect to Interactive Brokers asynchronously and wait for the connection to be established
self.ib.connect(host=host, port=port, clientId=client_id)
logger.info("Connected to Interactive Brokers successfully.")
except Exception as e:
logger.error(f"Failed to connect to Interactive Brokers: {e}")
raise e
def setup_rabbitmq_connection(self):
"""
Set up a connection to the RabbitMQ server.
Read more at: <https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html>
"""
try:
rabbitmq_config = self.config_rabbitmq.get('rabbitmq', {})
credentials = pika.PlainCredentials(
rabbitmq_config.get('username', 'guest'),
rabbitmq_config.get('password', 'guest')
)
connection_parameters = pika.ConnectionParameters(
host=rabbitmq_config.get('host', '127.0.0.1'),
port=rabbitmq_config.get('port', 5672),
credentials=credentials
)
self.rabbitmq_connection = pika.BlockingConnection(connection_parameters)
self.rabbitmq_channel = self.rabbitmq_connection.channel()
# Declare necessary queues here
self.rabbitmq_channel.queue_declare(queue='trading_signals')
self.rabbitmq_channel.queue_declare(queue='order_responses')
logger.info("Connected to RabbitMQ at %s:%s", rabbitmq_config.get('host'), rabbitmq_config.get('port'))
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
raise e
We must create config file ‘ikbr.json’ and put it in _config subdirectory:
{
"ibkr": {
"client_id": 1,
"port": 7497,
"host": "127.0.0.1"
},
"trading": {
"paper_trading_mode": true
}
}
Now we will create the place_order()
method in the IBKRExchange
class. This method will be responsible for placing an order on Interactive Brokers and publishing the order details to RabbitMQ. Here's an example implementation:
def place_order(
self,
contract: Contract,
order_type: str = "MKT",
action: str = "BUY",
quantity: int = 1,
limit_price: Optional[float] = None,
stop_price: Optional[float] = None
) -> dict:
"""
Place an order with specified parameters and retrieve detailed order status.
"""
if contract is None:
logger.info("IBKR: Cannot place order: Contract is None.")
return {"orderId": None, "status": "Error", "message": "Contract is None"}
trading_order = TradingOrder(
symbol=contract.symbol,
action=OrderAction[action].value,
order_type=OrderType[order_type].value,
quantity=quantity,
price=limit_price,
stop_price=stop_price,
)
if order_type == "LMT" and limit_price is not None:
trading_order.price = limit_price
if stop_price is not None:
trading_order.stop_price = stop_price
ib_contract = contract.to_ib_contract()
self.paper_trading_mode = False
try:
if self.paper_trading_mode:
logger.info("IBKR: Paper trading mode: Order will be simulated without execution.")
simulated_order_id = f"SIMULATED_{random.randint(1000, 9999)}"
logger.info(f"Order simulated: {simulated_order_id}, Status: SUBMITTED")
return {
"orderId": simulated_order_id,
"status": "Submitted",
"message": "Order simulated in paper trading mode"
}
else:
ib_contract = Stock(contract.symbol, 'SMART', contract.currency)
ib_order = trading_order.to_ib_order()
trade = self.ib.placeOrder(ib_contract, ib_order)
self.ib.sleep(1) # Wait for order to process
order_status = self.get_order_status(trade.order)
if order_status == "Filled":
fill_price = trade.fills[-1].execution.price if trade.fills else None
fill_quantity = trade.fills[-1].execution.shares if trade.fills else None
logger.info(f"Order {trade.order.orderId} filled at price {fill_price}, quantity {fill_quantity}")
return {
"orderId": trade.order.orderId,
"status": order_status,
"message": f"Order filled at price {fill_price}, quantity {fill_quantity}"
}
elif order_status == "Cancelled":
logger.info(f"Order {trade.order.orderId} cancelled")
return {
"orderId": trade.order.orderId,
"status": order_status,
"message": "Order cancelled"
}
else:
logger.info(f"Order {trade.order.orderId} placed with status {order_status}")
return {
"orderId": trade.order.orderId,
"status": order_status,
"message": f"Order placed with status {order_status}"
}
except Exception as e:
logger.error(f"IBKR: Error placing order: {e}")
return {"orderId": None, "status": "Error", "message": str(e)}
and when we execute an order e.g.
contract = TradingContract("AAPL",
min_qty=1,
price_precision=3,
quote_precision=2,
quantity_precision=2,
min_notional=10.00,
instrument_type=InstrumentType.STOCK)
response = exchange.place_order(contract, order_type="MKT", action="BUY", quantity=1)
we will see on our TWS the created order:
and log on the screen:
Order: Trade(contract=Stock(symbol='AAPL', exchange='SMART', currency='USD'), order=MarketOrder(orderId=15, clientId=1, action='BUY', totalQuantity=1), orderStatus=OrderStatus(orderId=15, status='Cancelled', filled=0.0, remaining=0.0, avgFillPrice=0.0, permId=0, parentId=0, lastFillPrice=0.0, clientId=0, whyHeld='', mktCapPrice=0.0), fills=[], log=[TradeLogEntry(time=datetime.datetime(2024, 5, 15, 9, 13, 12, 840275, tzinfo=datetime.timezone.utc), status='PendingSubmit', message='', errorCode=0), TradeLogEntry(time=datetime.datetime(2024, 5, 15, 9, 13, 13, 489256, tzinfo=datetime.timezone.utc), status=''], advancedError='')
In the QuantJourney framework, each class comes with its own unit test class to ensure the functionality of all methods and to provide examples of how to use them.
In the code I have added some more methods such as get_order_status(), adjust_order_size(), modify_order(), cancel_order(), check_order_fill() , set_stop_loss(), set_take_profit().
We have added also some about account as:
def get_account_summary(self) -> Dict:
"""
Retrieve the account summary from the IB trading account.
Returns:
dict: A dictionary containing various account summary metrics such as net liquid value, total cash value,
and unrealized profit/loss.
"""
try:
account_summary = {summary.tag: summary.value for summary in self.ib.accountSummary()}
logger.info("IBKR: Account summary retrieved successfully.")
return account_summary
except Exception as e:
logger.error(f"IBKR: Failed to retrieve account summary: {e}")
return {}
def account_values(self) -> List[Dict]:
"""
Retrieve account values from the IB trading account.
"""
account_values = self.ib.accountValues()
# Define the key tags we are interested in
important_tags = {
'CashBalance', 'NetLiquidation', 'AvailableFunds', 'BuyingPower',
'EquityWithLoanValue', 'ExcessLiquidity', 'FullInitMarginReq',
'FullMaintMarginReq', 'GrossPositionValue', 'UnrealizedPnL'
}
account_values_data = []
for value in account_values:
if value.tag in important_tags and value.currency == 'USD':
account_values_data.append({
"tag": value.tag,
"value": value.value
})
return account_values_data
Other method to get data from IBKR:
async def get_real_time_data( self,
contract: Contract,
data_type: str = "TRADES"
) -> pd.DataFrame:
"""
Retrieve real-time market data for a given security.
Args:
contract (Contract): The contract object representing the security.
data_type (str): The type of real-time data to retrieve (e.g., "TRADES", "MIDPOINT", "LEVEL2").
Returns:
pd.DataFrame: A DataFrame containing the real-time market data for the security.
"""
try:
ticks = await self.ib.reqTickDataAsync(contract, data_type)
if not ticks:
raise ValueError("No market data received.")
df = util.df(ticks)
return df
except Exception as e:
logger.error(f"IBKR: Error retrieving real-time data: {e}")
return pd.DataFrame()
async def get_historical_data( self,
contract: Contract,
end_date: datetime,
duration: str = "1 Y",
bar_size: str = "1 day",
what_to_show: str = "TRADES"
) -> pd.DataFrame:
"""
Retrieve historical data for a given security.
Args:
contract (Contract): The contract object representing the security.
end_date (datetime): The end date for the historical data.
duration (str): The duration for which to retrieve historical data (e.g., "1 Y" for 1 year).
bar_size (str): The bar size for the historical data (e.g., "1 day", "1 hour").
what_to_show (str): The type of historical data to retrieve (e.g., "TRADES", "MIDPOINT").
Returns:
pd.DataFrame: A DataFrame containing the historical data for the security.
"""
try:
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime=end_date,
durationStr=duration,
barSizeSetting=bar_size,
whatToShow=what_to_show,
useRTH=True,
formatDate=1
)
df = util.df(bars)
return df
except Exception as e:
logger.error(f"IBKR: Error retrieving historical data: {e}")
return pd.DataFrame()
You may modify code to run IBKRExchange directly from your algo or algo-bot, but here is a bit more about RabbitMQ.
RabbitMQ
RabbitMQ serves as a message queue facilitating communication between various components of the trading system. The RabbitMQClient
class simplifies interactions with RabbitMQ, while the IBKRExchange
class leverages the RabbitMQClient
to listen for trading signals and execute trades accordingly. The IBKRExchange
class focuses on the specific logic for interacting with the Interactive Brokers trading platform, and the RabbitMQClient
class handles RabbitMQ communication.
The key methods of RabbitMQClient
are:
publish_trading_signal()
: Publishes trading signals to the "trading_signals" queue.consume_order_responses()
: Consumes order responses from the "order_responses" queue.
def setup_rabbitmq_connection(self):
"""
Set up a connection to the RabbitMQ server.
Read more at: <https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html>
"""
try:
rabbitmq_config = self.config_rabbitmq.get('rabbitmq', {})
credentials = pika.PlainCredentials(
rabbitmq_config.get('username', 'guest'),
rabbitmq_config.get('password', 'guest')
)
connection_parameters = pika.ConnectionParameters(
host=rabbitmq_config.get('host', '127.0.0.1'),
port=rabbitmq_config.get('port', 5672),
credentials=credentials
)
self.rabbitmq_connection = pika.BlockingConnection(connection_parameters)
self.rabbitmq_channel = self.rabbitmq_connection.channel()
# Declare necessary queues here
self.rabbitmq_channel.queue_declare(queue='trading_signals')
self.rabbitmq_channel.queue_declare(queue='order_responses')
logger.info("Connected to RabbitMQ at %s:%s", rabbitmq_config.get('host'), rabbitmq_config.get('port'))
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
raise e
So if you program wants to send request to proceed_order, it must do it with:
from rabbitmq_client import RabbitMQClient
def send_trading_signal(signal):
try:
# Create an instance of the RabbitMQClient
rabbitmq_client = RabbitMQClient()
# Define the queue to send the signal to
queue_name = 'trading_signals'
# Convert the signal to JSON format
message = json.dumps(signal)
# Publish the signal to the specified queue
rabbitmq_client.publish_message(exchange='IBKR', routing_key=queue_name, message=message)
logger.info(f"Sent trading signal: {signal}")
except Exception as e:
logger.error(f"Error sending trading signal: {e}")
finally:
# Close the RabbitMQ connection
rabbitmq_client.close_connection()
signal = {
'symbol': 'AAPL',
'action': 'BUY',
'quantity': 10,
'order_type': 'MKT'
}
send_trading_signal(signal)
The IBKRExchange
class subscribes to RabbitMQ queues and acts upon the received messages using the following methods, which are initiated during the class initialization:
def is_rabbitmq_available(self , host, port):
"""
Check if RabbitMQ is available on the specified host and port.
Args:
host (str): The hostname or IP address of the RabbitMQ server.
port (int): The port number of the RabbitMQ server.
Returns:
bool: True if RabbitMQ is available, False otherwise.
"""
try:
# Create a socket and attempt to connect to the RabbitMQ server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.close()
return True
except socket.error:
return False
def listen_for_signals(self):
"""
Start listening for trading signals from RabbitMQ.
Note: This method will block the main thread and wait for incoming signals.
Channels are created and queues are declared for trading signals and order responses.
On initalisation we've created the following queues:
- trading_signals
- order_responses
"""
if not self.is_rabbitmq_available(self.rabbitmq_host, self.rabbitmq_port):
logger.error("RabbitMQ is not available or not running.")
self.logger.error("RabbitMQ is not available or not running.")
return
try:
connection_parameters = pika.ConnectionParameters(host=self.rabbitmq_host, port=self.rabbitmq_port)
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
channel.queue_declare(queue='trading_signals')
def callback(ch, method, properties, body):
try:
signal = json.loads(body)
logger.info(f"Received signal: {signal}")
self.logger.info(f"Received signal: {signal}")
# Process the signal to place an order
asyncio.run(self.process_signal(signal))
except Exception as e:
logger.error(f"Error processing trading signal: {e}")
self.logger.error(f"Error processing trading signal: {e}")
channel.basic_consume(queue='trading_signals', on_message_callback=callback, auto_ack=True)
logger.info('Waiting for trading signals. To exit press CTRL+C')
self.logger.info('Waiting for trading signals. To exit press CTRL+C')
channel.start_consuming()
except Exception as e:
logger.error(f"Error connecting to RabbitMQ: {e}")
self.logger.error(f"Error connecting to RabbitMQ: {e}")
async def process_signal(self, signal):
"""
Process the trading signal to execute trades.
Args:
signal (dict): The trading signal containing details for the trade.
"""
try:
# Example: Extract details from the signal
symbol = signal.get('symbol')
action = signal.get('action') # BUY or SELL
quantity = signal.get('quantity')
order_type = signal.get('order_type', 'MKT') # Default to Market Order
# Get the contract details
contract = self.get_contract_details(symbol)
if contract:
# Place the order based on the signal
order_response = self.place_order(contract, order_type, action, quantity)
logger.info(f"Order Response: {order_response}")
self.logger.info(f"Order Response: {order_response}")
else:
logger.error(f"Failed to get contract details for {symbol}")
self.logger.error(f"Failed to get contract details for {symbol}")
except Exception as e:
logger.error(f"Error processing trading signal: {e}")
self.logger.error(f"Error processing trading signal: {e}")