diff --git a/polygon/__init__.py b/polygon/__init__.py index da6c5136..4572d3ba 100644 --- a/polygon/__init__.py +++ b/polygon/__init__.py @@ -1,6 +1,7 @@ from .rest import RESTClient from .rest.base import version from .websocket import WebSocketClient +from .rest.futures import FuturesClient from .exceptions import * __version__ = version diff --git a/polygon/rest/__init__.py b/polygon/rest/__init__.py index 7484378e..cb242d7d 100644 --- a/polygon/rest/__init__.py +++ b/polygon/rest/__init__.py @@ -4,6 +4,7 @@ from .snapshot import SnapshotClient from .indicators import IndicatorsClient from .summaries import SummariesClient +from .futures import FuturesClient from .reference import ( MarketsClient, TickersClient, @@ -35,6 +36,7 @@ class RESTClient( ContractsClient, IndicatorsClient, SummariesClient, + FuturesClient, ): def __init__( self, diff --git a/polygon/rest/futures.py b/polygon/rest/futures.py new file mode 100644 index 00000000..9b0ae3aa --- /dev/null +++ b/polygon/rest/futures.py @@ -0,0 +1,409 @@ +from .base import BaseClient +from typing import Optional, Any, Dict, List, Union, Iterator +from .models.futures import ( + FuturesAggregate, + FuturesContract, + FuturesMarketStatus, + FuturesProduct, + FuturesSchedule, + FuturesQuote, + FuturesTrade, +) +from .models import Sort, Order +from urllib3 import HTTPResponse +from datetime import datetime, date +from .models.request import RequestOptionBuilder + + +class FuturesClient(BaseClient): + def list_futures_aggs( + self, + ticker: str, + resolution: str, + window_start: Optional[Union[str, int, datetime, date]] = None, + window_start_gte: Optional[Union[str, int, datetime, date]] = None, + window_start_gt: Optional[Union[str, int, datetime, date]] = None, + window_start_lte: Optional[Union[str, int, datetime, date]] = None, + window_start_lt: Optional[Union[str, int, datetime, date]] = None, + order: Optional[Union[str, Order]] = "desc", + limit: Optional[int] = 1000, + sort: Optional[Union[str, Sort]] = "timestamp", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesAggregate], HTTPResponse]: + """ + List aggregates for a futures contract in a given time range with pagination. + + :param ticker: The futures contract identifier (e.g., "ESZ4"). + :param resolution: The resolution of the aggregates (e.g., "1Min", "1D"). + :param window_start: Query by window start timestamp (YYYY-MM-DD or nanosecond timestamp). + :param window_start_gte: Window start greater than or equal to. + :param window_start_gt: Window start greater than. + :param window_start_lte: Window start less than or equal to. + :param window_start_lt: Window start less than. + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 1000, max 50000). + :param sort: Sort field (default "timestamp"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesAggregate objects or HTTPResponse if raw=True. + """ + url = f"/futures/vX/aggs/{ticker}" + return self._paginate( + path=url, + params=self._get_params( + self.list_futures_aggs, locals(), datetime_res="nanos" + ), + raw=raw, + deserializer=FuturesAggregate.from_dict, + options=options, + ) + + def list_futures_contracts( + self, + product_code: Optional[str] = None, + first_trade_date: Optional[Union[str, date]] = None, + last_trade_date: Optional[Union[str, date]] = None, + as_of: Optional[Union[str, date]] = None, + active: Optional[str] = "all", + type: Optional[str] = "all", + order: Optional[Union[str, Order]] = "asc", + limit: Optional[int] = 100, + sort: Optional[Union[str, Sort]] = "product_code", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesContract], HTTPResponse]: + """ + List futures contracts based on various parameters with pagination. + + :param product_code: Filter by product code. + :param first_trade_date: Filter by first trade date (YYYY-MM-DD). + :param last_trade_date: Filter by last trade date (YYYY-MM-DD). + :param as_of: Point-in-time date (YYYY-MM-DD). + :param active: Filter by active status ("all", "true", "false"). + :param type: Filter by contract type ("all", "single", "combo"). + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 100, max 1000). + :param sort: Sort field (default "product_code"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesContract objects or HTTPResponse if raw=True. + """ + url = "/futures/vX/contracts" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_contracts, locals()), + raw=raw, + deserializer=FuturesContract.from_dict, + options=options, + ) + + def get_futures_contract( + self, + ticker: str, + as_of: Optional[Union[str, date]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[FuturesContract, HTTPResponse]: + """ + Get details for a single futures contract. + + :param ticker: The ticker symbol of the contract (e.g., "ESZ4"). + :param as_of: Point-in-time date (YYYY-MM-DD). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: FuturesContract object or HTTPResponse if raw=True. + """ + url = f"/futures/vX/contracts/{ticker}" + return self._get( + path=url, + params=self._get_params(self.get_futures_contract, locals()), + result_key="results", + deserializer=FuturesContract.from_dict, + raw=raw, + options=options, + ) + + def list_futures_market_statuses( + self, + product_code_any_of: Optional[List[str]] = None, + order: Optional[Union[str, Order]] = "asc", + limit: Optional[int] = 100, + sort: Optional[Union[str, Sort]] = "product_code", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesMarketStatus], HTTPResponse]: + """ + List current market statuses for futures products with pagination. + + :param product_code_any_of: Comma-separated list of product codes. + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 100, max 1000). + :param sort: Sort field (default "product_code"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesMarketStatus objects or HTTPResponse if raw=True. + """ + url = "/futures/vX/market-status" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_market_statuses, locals()), + raw=raw, + deserializer=FuturesMarketStatus.from_dict, + options=options, + ) + + def list_futures_products( + self, + name: Optional[str] = None, + as_of: Optional[Union[str, date]] = None, + exchange_code: Optional[str] = None, + sector: Optional[str] = None, + sub_sector: Optional[str] = None, + asset_class: Optional[str] = None, + asset_sub_class: Optional[str] = None, + type: Optional[str] = "all", + name_search: Optional[str] = None, + order: Optional[Union[str, Order]] = "asc", + limit: Optional[int] = 100, + sort: Optional[Union[str, Sort]] = "name", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesProduct], HTTPResponse]: + """ + List futures products based on various parameters with pagination. + + :param name: Filter by product name (exact match). + :param as_of: Point-in-time date (YYYY-MM-DD). + :param exchange_code: Filter by exchange code (MIC). + :param sector: Filter by sector. + :param sub_sector: Filter by sub-sector. + :param asset_class: Filter by asset class. + :param asset_sub_class: Filter by asset sub-class. + :param type: Filter by product type ("all", "single", "combo"). + :param name_search: Search by name (partial match). + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 100, max 1000). + :param sort: Sort field (default "name"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesProduct objects or HTTPResponse if raw=True. + """ + url = "/futures/vX/products" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_products, locals()), + raw=raw, + deserializer=FuturesProduct.from_dict, + options=options, + ) + + def get_futures_product( + self, + product_code: str, + type: Optional[str] = "single", + as_of: Optional[Union[str, date]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[FuturesProduct, HTTPResponse]: + """ + Get details for a single futures product. + + :param product_code: The unique identifier for the product. + :param type: Product type ("single" or "combo"). + :param as_of: Point-in-time date (YYYY-MM-DD). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: FuturesProduct object or HTTPResponse if raw=True. + """ + url = f"/futures/vX/products/{product_code}" + return self._get( + path=url, + params=self._get_params(self.get_futures_product, locals()), + result_key="results", + deserializer=FuturesProduct.from_dict, + raw=raw, + options=options, + ) + + def list_futures_product_schedules( + self, + product_code: str, + session_end_date: Optional[Union[str, date]] = None, + session_end_date_gte: Optional[Union[str, date]] = None, + session_end_date_gt: Optional[Union[str, date]] = None, + session_end_date_lte: Optional[Union[str, date]] = None, + session_end_date_lt: Optional[Union[str, date]] = None, + order: Optional[Union[str, Order]] = "desc", + limit: Optional[int] = 100, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesSchedule], HTTPResponse]: + """ + List trading schedules for a specific futures product with pagination. + + :param product_code: The product code for the futures product. + :param session_end_date: Filter by session end date (YYYY-MM-DD). + :param session_end_date_gte: Session end date greater than or equal to. + :param session_end_date_gt: Session end date greater than. + :param session_end_date_lte: Session end date less than or equal to. + :param session_end_date_lt: Session end date less than. + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 100, max 1000). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesSchedule objects or HTTPResponse if raw=True. + """ + url = f"/futures/vX/products/{product_code}/schedules" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_product_schedules, locals()), + raw=raw, + deserializer=FuturesSchedule.from_dict, + options=options, + ) + + def list_futures_quotes( + self, + ticker: str, + timestamp: Optional[Union[str, int, datetime, date]] = None, + timestamp_gte: Optional[Union[str, int, datetime, date]] = None, + timestamp_gt: Optional[Union[str, int, datetime, date]] = None, + timestamp_lte: Optional[Union[str, int, datetime, date]] = None, + timestamp_lt: Optional[Union[str, int, datetime, date]] = None, + session_start_date: Optional[Union[str, date]] = None, + session_start_date_gte: Optional[Union[str, date]] = None, + session_start_date_gt: Optional[Union[str, date]] = None, + session_start_date_lte: Optional[Union[str, date]] = None, + session_start_date_lt: Optional[Union[str, date]] = None, + order: Optional[Union[str, Order]] = "desc", + limit: Optional[int] = 1000, + sort: Optional[Union[str, Sort]] = "timestamp", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesQuote], HTTPResponse]: + """ + List quotes for a futures contract in a given time range with pagination. + + :param ticker: The futures contract identifier (e.g., "ESZ4"). + :param timestamp: Query by quote timestamp (YYYY-MM-DD or nanosecond timestamp). + :param timestamp_gte: Timestamp greater than or equal to. + :param timestamp_gt: Timestamp greater than. + :param timestamp_lte: Timestamp less than or equal to. + :param timestamp_lt: Timestamp less than. + :param session_start_date: Query by session start date (YYYY-MM-DD). + :param session_start_date_gte: Session start date greater than or equal to. + :param session_start_date_gt: Session start date greater than. + :param session_start_date_lte: Session start date less than or equal to. + :param session_start_date_lt: Session start date less than. + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 1000, max 50000). + :param sort: Sort field (default "timestamp"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesQuote objects or HTTPResponse if raw=True. + """ + url = f"/futures/vX/quotes/{ticker}" + return self._paginate( + path=url, + params=self._get_params( + self.list_futures_quotes, locals(), datetime_res="nanos" + ), + raw=raw, + deserializer=FuturesQuote.from_dict, + options=options, + ) + + def list_futures_schedules_by_session_start_date( + self, + session_start_date: Optional[Union[str, date]] = None, + market_identifier_code: Optional[str] = None, + order: Optional[Union[str, Order]] = "desc", + limit: Optional[int] = 10, + sort: Optional[Union[str, Sort]] = "session_start_date", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesSchedule], HTTPResponse]: + """ + List trading schedules by session start date across all products with pagination. + + :param session_start_date: Filter by session start date (YYYY-MM-DD). + :param market_identifier_code: Filter by MIC of the exchange. + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 10, max 1000). + :param sort: Sort field (default "session_start_date"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesSchedule objects or HTTPResponse if raw=True. + """ + url = "/futures/vX/schedules" + return self._paginate( + path=url, + params=self._get_params( + self.list_futures_schedules_by_session_start_date, locals() + ), + raw=raw, + deserializer=FuturesSchedule.from_dict, + options=options, + ) + + def list_futures_trades( + self, + ticker: str, + timestamp: Optional[Union[str, int, datetime, date]] = None, + timestamp_gte: Optional[Union[str, int, datetime, date]] = None, + timestamp_gt: Optional[Union[str, int, datetime, date]] = None, + timestamp_lte: Optional[Union[str, int, datetime, date]] = None, + timestamp_lt: Optional[Union[str, int, datetime, date]] = None, + session_start_date: Optional[Union[str, date]] = None, + session_start_date_gte: Optional[Union[str, date]] = None, + session_start_date_gt: Optional[Union[str, date]] = None, + session_start_date_lte: Optional[Union[str, date]] = None, + session_start_date_lt: Optional[Union[str, date]] = None, + order: Optional[Union[str, Order]] = "desc", + limit: Optional[int] = 1000, + sort: Optional[Union[str, Sort]] = "timestamp", + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesTrade], HTTPResponse]: + """ + List trades for a futures contract in a given time range with pagination. + + :param ticker: The futures contract identifier (e.g., "ESZ4"). + :param timestamp: Query by trade timestamp (YYYY-MM-DD or nanosecond timestamp). + :param timestamp_gte: Timestamp greater than or equal to. + :param timestamp_gt: Timestamp greater than. + :param timestamp_lte: Timestamp less than or equal to. + :param timestamp_lt: Timestamp less than. + :param session_start_date: Query by session start date (YYYY-MM-DD). + :param session_start_date_gte: Session start date greater than or equal to. + :param session_start_date_gt: Session start date greater than. + :param session_start_date_lte: Session start date less than or equal to. + :param session_start_date_lt: Session start date less than. + :param order: Order results (asc or desc). + :param limit: Limit the number of results per page (default 1000, max 50000). + :param sort: Sort field (default "timestamp"). + :param params: Additional query params. + :param raw: Return raw HTTPResponse if True. + :return: Iterator of FuturesTrade objects or HTTPResponse if raw=True. + """ + url = f"/futures/vX/trades/{ticker}" + return self._paginate( + path=url, + params=self._get_params( + self.list_futures_trades, locals(), datetime_res="nanos" + ), + raw=raw, + deserializer=FuturesTrade.from_dict, + options=options, + ) diff --git a/polygon/rest/models/futures.py b/polygon/rest/models/futures.py new file mode 100644 index 00000000..7e700092 --- /dev/null +++ b/polygon/rest/models/futures.py @@ -0,0 +1,184 @@ +from typing import Optional, List +from ...modelclass import modelclass + + +@modelclass +class MarketExchanges: + "Contains exchange market status data." + nasdaq: Optional[str] = None + nyse: Optional[str] = None + otc: Optional[str] = None + + +@modelclass +class FuturesAggregate: + """Represents an aggregate for a futures contract.""" + + close: Optional[float] = None + dollar_volume: Optional[float] = None + high: Optional[float] = None + low: Optional[float] = None + open: Optional[float] = None + ticker: Optional[str] = None + transaction_count: Optional[int] = None + underlying_asset: Optional[str] = None + volume: Optional[int] = None + window_end: Optional[int] = None + window_start: Optional[int] = None + + @staticmethod + def from_dict(d): + return FuturesAggregate( + **{k: v for k, v in d.items() if k in FuturesAggregate.__annotations__} + ) + + +@modelclass +class FuturesContract: + """Represents a futures contract.""" + + active: Optional[bool] = None + as_of: Optional[str] = None + days_to_maturity: Optional[int] = None + exchange_code: Optional[str] = None + first_trade_date: Optional[str] = None + last_trade_date: Optional[str] = None + max_order_quantity: Optional[int] = None + min_order_quantity: Optional[int] = None + name: Optional[str] = None + product_code: Optional[str] = None + settlement_date: Optional[str] = None + settlement_tick_size: Optional[float] = None + spread_tick_size: Optional[float] = None + ticker: Optional[str] = None + trade_tick_size: Optional[float] = None + type: Optional[str] = None + + @staticmethod + def from_dict(d): + return FuturesContract( + **{k: v for k, v in d.items() if k in FuturesContract.__annotations__} + ) + + +@modelclass +class FuturesMarketStatus: + """Represents the market status for a futures product.""" + + exchange_code: Optional[str] = None + market_status: Optional[str] = None + product_code: Optional[str] = None + + @staticmethod + def from_dict(d): + return FuturesMarketStatus( + **{k: v for k, v in d.items() if k in FuturesMarketStatus.__annotations__} + ) + + +@modelclass +class FuturesProduct: + """Represents a futures product.""" + + as_of: Optional[str] = None + asset_class: Optional[str] = None + asset_sub_class: Optional[str] = None + exchange_code: Optional[str] = None + last_updated: Optional[str] = None + name: Optional[str] = None + otc_eligible: Optional[bool] = None + price_quotation: Optional[str] = None + product_code: Optional[str] = None + sector: Optional[str] = None + settlement_currency_code: Optional[str] = None + settlement_method: Optional[str] = None + settlement_type: Optional[str] = None + sub_sector: Optional[str] = None + trade_currency_code: Optional[str] = None + type: Optional[str] = None + unit_of_measure: Optional[str] = None + unit_of_measure_quantity: Optional[float] = None + + @staticmethod + def from_dict(d): + return FuturesProduct( + **{k: v for k, v in d.items() if k in FuturesProduct.__annotations__} + ) + + +@modelclass +class FuturesScheduleEvent: + """Represents a single event in a futures schedule.""" + + event: Optional[str] = None + timestamp: Optional[str] = None + + @staticmethod + def from_dict(d): + return FuturesScheduleEvent( + **{k: v for k, v in d.items() if k in FuturesScheduleEvent.__annotations__} + ) + + +@modelclass +class FuturesSchedule: + """Represents a futures trading schedule.""" + + market_identifier_code: Optional[str] = None + product_code: Optional[str] = None + product_name: Optional[str] = None + schedule: Optional[List[FuturesScheduleEvent]] = None + session_end_date: Optional[str] = None + + @staticmethod + def from_dict(d): + schedule = ( + [FuturesScheduleEvent.from_dict(e) for e in d.get("schedule", [])] + if d.get("schedule") + else None + ) + return FuturesSchedule( + market_identifier_code=d.get("market_identifier_code"), + product_code=d.get("product_code"), + product_name=d.get("product_name"), + schedule=schedule, + session_end_date=d.get("session_end_date"), + ) + + +@modelclass +class FuturesQuote: + """Represents a quote for a futures contract.""" + + ask_price: Optional[float] = None + ask_size: Optional[float] = None + ask_timestamp: Optional[int] = None + bid_price: Optional[float] = None + bid_size: Optional[float] = None + bid_timestamp: Optional[int] = None + session_start_date: Optional[str] = None + ticker: Optional[str] = None + timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return FuturesQuote( + **{k: v for k, v in d.items() if k in FuturesQuote.__annotations__} + ) + + +@modelclass +class FuturesTrade: + """Represents a trade for a futures contract.""" + + price: Optional[float] = None + session_start_date: Optional[str] = None + size: Optional[float] = None + ticker: Optional[str] = None + timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return FuturesTrade( + **{k: v for k, v in d.items() if k in FuturesTrade.__annotations__} + ) diff --git a/polygon/websocket/__init__.py b/polygon/websocket/__init__.py index 77865d3f..60c61df6 100644 --- a/polygon/websocket/__init__.py +++ b/polygon/websocket/__init__.py @@ -140,7 +140,7 @@ async def connect( if m["ev"] == "status": logger.debug("status: %s", m["message"]) continue - cmsg = parse(msgJson, logger) + cmsg = parse(msgJson, self.market, logger) if len(cmsg) > 0: await processor(cmsg) # type: ignore diff --git a/polygon/websocket/models/__init__.py b/polygon/websocket/models/__init__.py index 06cab55d..4ee0b7c6 100644 --- a/polygon/websocket/models/__init__.py +++ b/polygon/websocket/models/__init__.py @@ -1,51 +1,82 @@ -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from .common import * from .models import * import logging +logger = logging.getLogger(__name__) -def parse_single(data: Dict[str, Any]): - event_type = data["ev"] - if event_type in [EventType.EquityAgg.value, EventType.EquityAggMin.value]: - return EquityAgg.from_dict(data) - elif event_type in [ - EventType.CryptoAgg.value, - EventType.CryptoAggSec.value, - EventType.ForexAgg.value, - EventType.ForexAggSec.value, - ]: - return CurrencyAgg.from_dict(data) - elif event_type == EventType.EquityTrade.value: - return EquityTrade.from_dict(data) - elif event_type == EventType.CryptoTrade.value: - return CryptoTrade.from_dict(data) - elif event_type == EventType.EquityQuote.value: - return EquityQuote.from_dict(data) - elif event_type == EventType.ForexQuote.value: - return ForexQuote.from_dict(data) - elif event_type == EventType.CryptoQuote.value: - return CryptoQuote.from_dict(data) - elif event_type == EventType.Imbalances.value: - return Imbalance.from_dict(data) - elif event_type == EventType.LimitUpLimitDown.value: - return LimitUpLimitDown.from_dict(data) - elif event_type == EventType.CryptoL2.value: - return Level2Book.from_dict(data) - elif event_type == EventType.Value.value: - return IndexValue.from_dict(data) - elif event_type == EventType.LaunchpadValue.value: - return LaunchpadValue.from_dict(data) - elif event_type == EventType.BusinessFairMarketValue.value: - return FairMarketValue.from_dict(data) + +def parse_single(data: Dict[str, Any], market: Market) -> Optional[WebSocketMessage]: + event_type = data.get("ev") + if not event_type: + logger.warning("No event type ('ev') found in message data") + return None + + if market == Market.Stocks: + if event_type == "T": + return EquityTrade.from_dict(data) + elif event_type == "Q": + return EquityQuote.from_dict(data) + elif event_type in ["A", "AM"]: + return EquityAgg.from_dict(data) + # Add more stock-specific events as needed (e.g., "LULD", "NOI") + + elif market == Market.Options: + if event_type == "T": + return OptionTrade.from_dict(data) + elif event_type == "Q": + return OptionQuote.from_dict(data) + elif event_type in ["A", "AM"]: + return OptionAggregate.from_dict(data) + + elif market == Market.Forex: + if event_type == "C": + return ForexQuote.from_dict(data) + elif event_type in ["CA", "CAS"]: + return ForexAggregate.from_dict(data) + + elif market == Market.Crypto: + if event_type == "XT": + return CryptoTrade.from_dict(data) + elif event_type == "XQ": + return CryptoQuote.from_dict(data) + elif event_type in ["XA", "XAS"]: + return CryptoAggregate.from_dict(data) + elif event_type == "XL2": + return CryptoL2Book.from_dict(data) + + elif market == Market.Indices: + if event_type in ["A", "AM"]: + return IndicesAggregate.from_dict(data) + elif event_type == "V": + return IndicesValue.from_dict(data) + + elif market == Market.Futures: + if event_type == "T": + return FuturesTrade.from_dict(data) + elif event_type == "Q": + return FuturesQuote.from_dict(data) + elif event_type in ["A", "AM"]: + return FuturesAggregate.from_dict(data) + + # Handle unknown markets + else: + logger.warning(f"Unknown market: {market}") + return None + + # If event type is unrecognized within a known market + logger.warning(f"Unknown event type '{event_type}' for market '{market}'") return None -def parse(msg: List[Dict[str, Any]], logger: logging.Logger) -> List[WebSocketMessage]: +def parse( + msg: List[Dict[str, Any]], market: Market, logger: logging.Logger +) -> List[WebSocketMessage]: res = [] for m in msg: - parsed = parse_single(m) + parsed = parse_single(m, market) if parsed is None: - if m["ev"] != "status": + if m.get("ev") != "status": logger.warning("could not parse message %s", m) else: res.append(parsed) diff --git a/polygon/websocket/models/common.py b/polygon/websocket/models/common.py index 38aea4c4..930f4813 100644 --- a/polygon/websocket/models/common.py +++ b/polygon/websocket/models/common.py @@ -3,7 +3,7 @@ class Feed(Enum): Delayed = "delayed.polygon.io" - RealTime = "socket.polygon.io" + RealTime = "socket.ny5.polygon.io" # "socket.polygon.io" Nasdaq = "nasdaqfeed.polygon.io" PolyFeed = "polyfeed.polygon.io" PolyFeedPlus = "polyfeedplus.polygon.io" @@ -28,6 +28,11 @@ class Market(Enum): Forex = "forex" Crypto = "crypto" Indices = "indices" + Futures = "futures" + FuturesCME = "futures/cme" + FuturesCBOT = "futures/cbot" + FuturesNYMEX = "futures/nymex" + FuturesCOMEX = "futures/comex" class EventType(Enum): @@ -42,6 +47,10 @@ class EventType(Enum): EquityQuote = "Q" ForexQuote = "C" CryptoQuote = "XQ" + FuturesTrade = "T" + FuturesQuote = "Q" + FuturesAggregateSecond = "A" + FuturesAggregateMinute = "AM" Imbalances = "NOI" LimitUpLimitDown = "LULD" CryptoL2 = "XL2" diff --git a/polygon/websocket/models/models.py b/polygon/websocket/models/models.py index d6fa0c29..4cfeeaf7 100644 --- a/polygon/websocket/models/models.py +++ b/polygon/websocket/models/models.py @@ -359,6 +359,63 @@ def from_dict(d): ) +@modelclass +class FuturesTrade: + ev: Optional[str] = None # Event type ("T") + sym: Optional[str] = None # Ticker symbol (e.g., "ESZ4") + p: Optional[float] = None # Trade price per unit + s: Optional[int] = None # Trade size (number of contracts) + t: Optional[int] = None # SIP timestamp in Unix MS + q: Optional[int] = None # Sequence number + + @staticmethod + def from_dict(d): + return EquityTrade( + d.get("ev", None), + d.get("sym", None), + d.get("p", None), + d.get("s", None), + d.get("t", None), + d.get("q", None), + ) + + +@modelclass +class FuturesQuote: + """Represents a quote event for a futures contract.""" + + ev: Optional[str] = None # Event type ("Q") + sym: Optional[str] = None # Ticker symbol (e.g., "ESZ4") + bp: Optional[float] = None # Bid price per unit + bs: Optional[int] = None # Bid size (number of contracts) + bt: Optional[int] = None # Bid timestamp in Unix MS + ap: Optional[float] = None # Ask price per unit + as_: Optional[int] = ( + None # Ask size (number of contracts, renamed to as_ to avoid Python keyword) + ) + at: Optional[int] = None # Ask timestamp in Unix MS + t: Optional[int] = None # SIP timestamp in Unix MS + + +@modelclass +class FuturesAggregate: + """Represents an aggregate event (second or minute) for a futures contract.""" + + ev: Optional[str] = None # Event type ("A" or "AM") + sym: Optional[str] = None # Ticker symbol (e.g., "6CH5") + v: Optional[int] = None # Tick volume + av: Optional[int] = None # Accumulated volume + op: Optional[float] = None # Official opening price + o: Optional[float] = None # Opening price for this window + c: Optional[float] = None # Closing price for this window + h: Optional[float] = None # Highest price for this window + l: Optional[float] = None # Lowest price for this window + a: Optional[float] = None # Volume-weighted average price + z: Optional[int] = None # Average trade size + s: Optional[int] = None # Start timestamp in Unix MS + e: Optional[int] = None # End timestamp in Unix MS + + WebSocketMessage = NewType( "WebSocketMessage", List[ @@ -376,6 +433,9 @@ def from_dict(d): IndexValue, LaunchpadValue, FairMarketValue, + FuturesTrade, + FuturesQuote, + FuturesAggregate, ] ], )