-
Notifications
You must be signed in to change notification settings - Fork 71
Open
Description
Summary
HistoricalDataStreamingSubscription (in both sync.rs and async.rs) reimplements cancel-on-drop infrastructure that the generic Subscription<T> already provides: AtomicBool cancelled guard, message_bus storage, cancel message encoding, tokio::spawn in async Drop, etc. This adds ~150 lines of duplicated lifecycle code.
Proposed Change
Replace HistoricalDataStreamingSubscription with Subscription<HistoricalBarUpdate> by implementing StreamDecoder<HistoricalBarUpdate> for HistoricalBarUpdate.
Key steps
- Implement
StreamDecoder<HistoricalBarUpdate>— dispatch on message type (HistoricalData,HistoricalDataUpdate,HistoricalDataEnd) indecode(), returning the appropriate enum variant. This follows the existingMarketDepthspattern. - Implement
cancel_message()— delegate toencoders::encode_cancel_historical_data(request_id). - Replace custom struct with
Subscription<HistoricalBarUpdate>in bothasync.rsandsync.rsfactory functions. - Remove the manual
cancel(),Dropimpl, and associated fields (request_id,message_bus,cancelled).
Notes
DecoderContextalready carriesserver_versionandtime_zone: Option<&'static Tz>— both needed during decoding. The factory function resolves timezone before construction, so it will always beSome.- Cancel-on-drop, dedup guard, and async
tokio::spawninDropall come from the genericSubscription<T>for free.
Context
Introduced in #427.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels