Skip to Content

Trading Operations (Python)

This guide shows how to place and cancel orders using Python.

Prerequisites

Make sure you have:

Fetching Market Info

Before trading, get market decimals for price/size conversion:

import requests API_URL = "https://zo-devnet.n1.xyz" def get_market_info(): resp = requests.get(f"{API_URL}/info") info = resp.json() markets = {} for market in info["markets"]: markets[market["marketId"]] = { "symbol": market["symbol"], "price_decimals": market["priceDecimals"], "size_decimals": market["sizeDecimals"] } return markets markets = get_market_info() print(f"BTC market: {markets[0]}") # Output: {'symbol': 'BTCUSD', 'price_decimals': 1, 'size_decimals': 4}

Placing a Limit Order

import requests import schema_pb2 API_URL = "https://zo-devnet.n1.xyz" def place_limit_order(session_id, session_key, market_id, side, price, size): """ Place a limit order. Args: session_id: Active session ID session_key: Session signing key market_id: Market to trade (0 = BTC) side: schema_pb2.Side.BID or schema_pb2.Side.ASK price: Price in USD (will be converted to raw) size: Size in base units (will be converted to raw) """ # Get market decimals markets = get_market_info() market = markets[market_id] # Convert to raw values price_raw = int(price * (10 ** market["price_decimals"])) size_raw = int(size * (10 ** market["size_decimals"])) # Build action server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.place_order.session_id = session_id action.place_order.market_id = market_id action.place_order.side = side action.place_order.fill_mode = schema_pb2.FillMode.LIMIT action.place_order.price = price_raw action.place_order.size = size_raw # Execute with session signing receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): error_name = schema_pb2.Error.Name(receipt.err) raise Exception(f"PlaceOrder failed: {error_name}") result = receipt.place_order_result if result.HasField("posted"): print(f"Order posted! ID: {result.posted.order_id}") return result.posted.order_id if result.fills: print(f"Order filled! {len(result.fills)} fills") return None return None # Example usage order_id = place_limit_order( session_id=session_id, session_key=session_key, market_id=0, # BTC side=schema_pb2.Side.BID, # Buy price=90000.0, size=0.001 )

Placing a Market Order

def place_market_order(session_id, session_key, market_id, side, size): """Place a market order (no price specified).""" markets = get_market_info() market = markets[market_id] size_raw = int(size * (10 ** market["size_decimals"])) server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.place_order.session_id = session_id action.place_order.market_id = market_id action.place_order.side = side action.place_order.fill_mode = schema_pb2.FillMode.FILL_OR_KILL # FOK for market orders action.place_order.size = size_raw # Note: no price for market orders receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): raise Exception(f"Error: {schema_pb2.Error.Name(receipt.err)}") print(f"Market order executed! Fills: {len(receipt.place_order_result.fills)}") return receipt.place_order_result

Canceling an Order

def cancel_order(session_id, session_key, order_id): """Cancel an existing order.""" server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.cancel_order_by_id.session_id = session_id action.cancel_order_by_id.order_id = order_id receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): raise Exception(f"Error: {schema_pb2.Error.Name(receipt.err)}") print(f"Order {order_id} cancelled!") return receipt # Example cancel_order(session_id, session_key, order_id)

Canceling by Client Order ID

Cancel orders using the client_order_id you specified when placing the order.

def cancel_order_by_client_id(session_id, session_key, client_order_id, account_id=None): """ Cancel an order by its client order ID. Args: session_id: Active session ID session_key: Session signing key client_order_id: The client_order_id you specified when placing the order account_id: Optional account ID (defaults to first account) """ server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.cancel_order_by_client_id.session_id = session_id action.cancel_order_by_client_id.client_order_id = client_order_id if account_id is not None: action.cancel_order_by_client_id.sender_account_id = account_id receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): raise Exception(f"Error: {schema_pb2.Error.Name(receipt.err)}") print(f"Order with client_order_id {client_order_id} cancelled!") return receipt # Example: Place order with client_order_id, then cancel by it my_order_id = 12345 # Your own tracking ID # Place order with client_order_id action = schema_pb2.Action() action.current_timestamp = int(requests.get(f"{API_URL}/timestamp").json()) action.place_order.session_id = session_id action.place_order.market_id = 0 action.place_order.side = schema_pb2.Side.BID action.place_order.fill_mode = schema_pb2.FillMode.LIMIT action.place_order.price = int(90000 * 10) # Assuming 1 price decimal action.place_order.size = int(0.001 * 10000) # Assuming 4 size decimals action.place_order.client_order_id = my_order_id # Your tracking ID receipt = execute_action(action, session_key, session_sign) # Later, cancel by client_order_id cancel_order_by_client_id(session_id, session_key, my_order_id)

Complete Trading Script

import json import requests import binascii from google.protobuf.internal import encoder, decoder from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from base58 import b58encode import schema_pb2 API_URL = "https://zo-devnet.n1.xyz" # Helper functions def get_varint_bytes(value): return encoder._VarintBytes(value) def read_varint(buffer, offset=0): return decoder._DecodeVarint32(buffer, offset) def user_sign(key, msg): return key.sign(binascii.hexlify(msg)) def session_sign(key, msg): return key.sign(msg) def execute_action(action, signing_key, sign_func): payload = action.SerializeToString() message = get_varint_bytes(len(payload)) + payload signature = sign_func(signing_key, message) resp = requests.post( f"{API_URL}/action", data=message + signature, headers={"Content-Type": "application/octet-stream"} ) resp.raise_for_status() msg_len, pos = read_varint(resp.content, 0) receipt = schema_pb2.Receipt() receipt.ParseFromString(resp.content[pos:pos + msg_len]) return receipt def main(): # Load keys with open("id.json", "r") as f: key_data = json.load(f) user_key = Ed25519PrivateKey.from_private_bytes(bytes(key_data[:32])) session_key = Ed25519PrivateKey.generate() # Create session server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.create_session.user_pubkey = user_key.public_key().public_bytes_raw() action.create_session.session_pubkey = session_key.public_key().public_bytes_raw() action.create_session.expiry_timestamp = server_time + 3600 receipt = execute_action(action, user_key, user_sign) session_id = receipt.create_session_result.session_id print(f"Session: {session_id}") # Get market info info = requests.get(f"{API_URL}/info").json() market = info["markets"][0] price_dec = market["priceDecimals"] size_dec = market["sizeDecimals"] print(f"Trading {market['symbol']}") # Place order server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.place_order.session_id = session_id action.place_order.market_id = 0 action.place_order.side = schema_pb2.Side.BID action.place_order.fill_mode = schema_pb2.FillMode.LIMIT action.place_order.price = int(90000 * (10 ** price_dec)) action.place_order.size = int(0.001 * (10 ** size_dec)) receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): print(f"Error: {schema_pb2.Error.Name(receipt.err)}") elif receipt.place_order_result.HasField("posted"): order_id = receipt.place_order_result.posted.order_id print(f"Order placed: {order_id}") # Cancel it server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.cancel_order_by_id.session_id = session_id action.cancel_order_by_id.order_id = order_id receipt = execute_action(action, session_key, session_sign) print("Order cancelled!") if __name__ == "__main__": main()

Atomic Operations

Execute multiple place/cancel actions in a single atomic transaction.

[!IMPORTANT] Constraints:

  • Maximum 4 operations per atomic transaction
  • Ordering per market: Cancels → Trades → Places (cancels must come first, placements last)
  • Across different markets, order can be any
def atomic_cancel_and_place(session_id, session_key, cancel_order_ids, new_orders): """ Atomically cancel orders and place new ones. Args: cancel_order_ids: List of order IDs to cancel new_orders: List of dicts with {market_id, side, price, size} """ server_time = int(requests.get(f"{API_URL}/timestamp").json()) markets = get_market_info() action = schema_pb2.Action() action.current_timestamp = server_time action.atomic.session_id = session_id # Add cancel sub-actions for order_id in cancel_order_ids: sub = action.atomic.actions.add() sub.cancel_order.order_id = order_id # Add place sub-actions (using trade_or_place) for order in new_orders: market = markets[order["market_id"]] sub = action.atomic.actions.add() sub.trade_or_place.market_id = order["market_id"] sub.trade_or_place.order_type.side = order["side"] sub.trade_or_place.order_type.fill_mode = schema_pb2.FillMode.LIMIT sub.trade_or_place.order_type.is_reduce_only = False sub.trade_or_place.limit.price = int(order["price"] * (10 ** market["price_decimals"])) sub.trade_or_place.limit.size = int(order["size"] * (10 ** market["size_decimals"])) receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): raise Exception(f"Atomic failed: {schema_pb2.Error.Name(receipt.err)}") print(f"Atomic action executed! Action ID: {receipt.action_id}") return receipt # Example: Cancel old orders and place new quotes atomic_cancel_and_place( session_id=session_id, session_key=session_key, cancel_order_ids=[old_bid_id, old_ask_id], new_orders=[ {"market_id": 0, "side": schema_pb2.Side.BID, "price": 89900, "size": 0.01}, {"market_id": 0, "side": schema_pb2.Side.ASK, "price": 90100, "size": 0.01}, ] )

Trigger Orders (Stop-Loss / Take-Profit)

[!CAUTION] Experimental Feature: Trigger orders (TP/SL) are currently unstable and may change. Use with caution in production.

Add Trigger

def add_trigger(session_id, session_key, market_id, side, kind, trigger_price, limit_price=None): """ Add a stop-loss or take-profit trigger. Args: kind: schema_pb2.TriggerKind.STOP_LOSS or TAKE_PROFIT trigger_price: Price that activates the trigger limit_price: Optional limit price for the triggered order """ server_time = int(requests.get(f"{API_URL}/timestamp").json()) markets = get_market_info() market = markets[market_id] action = schema_pb2.Action() action.current_timestamp = server_time action.add_trigger.session_id = session_id action.add_trigger.market_id = market_id # TriggerKey contains kind and side action.add_trigger.key.kind = kind action.add_trigger.key.side = side # TriggerPrices contains trigger_price and optional limit_price action.add_trigger.prices.trigger_price = int(trigger_price * (10 ** market["price_decimals"])) if limit_price: action.add_trigger.prices.limit_price = int(limit_price * (10 ** market["price_decimals"])) receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): raise Exception(f"AddTrigger failed: {schema_pb2.Error.Name(receipt.err)}") print(f"Trigger added! Action ID: {receipt.action_id}") return receipt # Example: Add stop-loss at $85,000 add_trigger( session_id=session_id, session_key=session_key, market_id=0, side=schema_pb2.Side.ASK, # Sell to close long kind=schema_pb2.TriggerKind.STOP_LOSS, trigger_price=85000, limit_price=84900 # Slight slippage protection )

Remove Trigger

def remove_trigger(session_id, session_key, market_id, side, kind): """Remove an existing trigger.""" server_time = int(requests.get(f"{API_URL}/timestamp").json()) action = schema_pb2.Action() action.current_timestamp = server_time action.remove_trigger.session_id = session_id action.remove_trigger.market_id = market_id # TriggerKey contains kind and side action.remove_trigger.key.kind = kind action.remove_trigger.key.side = side receipt = execute_action(action, session_key, session_sign) if receipt.HasField("err"): raise Exception(f"RemoveTrigger failed: {schema_pb2.Error.Name(receipt.err)}") print("Trigger removed!") return receipt

Next Steps

Last updated on