Refactor HistoricalDataStreamingSubscription to use generic Subscription<T> #431

@wboayue

Summary

HistoricalDataStreamingSubscription (in both sync.rs and async.rs) reimplements cancel-on-drop infrastructure that the generic Subscription<T> already provides, including the AtomicBool cancelled guard, message_bus storage, cancel message encoding, and the tokio::spawn in the async Drop impl. The duplication adds ~150 lines of lifecycle code.

Proposed Change

Replace HistoricalDataStreamingSubscription with Subscription<HistoricalBarUpdate> by implementing StreamDecoder<HistoricalBarUpdate> for HistoricalBarUpdate.

Key steps

  1. Implement StreamDecoder<HistoricalBarUpdate> — dispatch on message type (HistoricalData, HistoricalDataUpdate, HistoricalDataEnd) in decode(), returning the appropriate enum variant. This follows the existing MarketDepths pattern.
  2. Implement cancel_message() — delegate to encoders::encode_cancel_historical_data(request_id).
  3. Replace custom struct with Subscription<HistoricalBarUpdate> in both async.rs and sync.rs factory functions.
  4. Remove the manual cancel(), Drop impl, and associated fields (request_id, message_bus, cancelled).
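
A minimal sketch of steps 1 and 2, assuming simplified stand-in types. The trait signature, MessageKind, ResponseMessage, DecodeError, the variant names (Initial, Update, End), their payloads, and the string returned by encode_cancel_historical_data are all hypothetical and exist only to show the dispatch shape. The crate's real StreamDecoder trait and encoder will differ.

```rust
// Hypothetical stand-ins for the crate's message and context types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum MessageKind {
    HistoricalData,
    HistoricalDataUpdate,
    HistoricalDataEnd,
    Other,
}

struct ResponseMessage {
    kind: MessageKind,
    fields: Vec<String>,
}

// The real DecoderContext also carries time_zone; omitted here for brevity.
struct DecoderContext {
    server_version: i32,
}

#[derive(Debug, PartialEq)]
enum DecodeError {
    UnexpectedResponse(MessageKind),
    Malformed(&'static str),
}

// Simplified trait shape (assumption): one decode hook, one cancel hook.
trait StreamDecoder<T> {
    fn decode(context: &DecoderContext, message: &ResponseMessage) -> Result<T, DecodeError>;
    fn cancel_message(request_id: i32) -> String;
}

// Hypothetical variants and payloads: prices only, to keep the sketch short.
#[derive(Debug, PartialEq)]
enum HistoricalBarUpdate {
    Initial(Vec<f64>),
    Update(f64),
    End,
}

fn parse_price(field: &str) -> Result<f64, DecodeError> {
    field.parse::<f64>().map_err(|_| DecodeError::Malformed("price"))
}

// Placeholder for encoders::encode_cancel_historical_data; not the wire format.
fn encode_cancel_historical_data(request_id: i32) -> String {
    format!("cancel_historical_data:{request_id}")
}

impl StreamDecoder<HistoricalBarUpdate> for HistoricalBarUpdate {
    fn decode(_context: &DecoderContext, message: &ResponseMessage) -> Result<Self, DecodeError> {
        // Step 1: dispatch on the message type and return the matching variant.
        match message.kind {
            MessageKind::HistoricalData => {
                let prices = message
                    .fields
                    .iter()
                    .map(|f| parse_price(f))
                    .collect::<Result<Vec<_>, _>>()?;
                Ok(HistoricalBarUpdate::Initial(prices))
            }
            MessageKind::HistoricalDataUpdate => {
                let field = message
                    .fields
                    .first()
                    .ok_or(DecodeError::Malformed("missing price"))?;
                Ok(HistoricalBarUpdate::Update(parse_price(field)?))
            }
            MessageKind::HistoricalDataEnd => Ok(HistoricalBarUpdate::End),
            other => Err(DecodeError::UnexpectedResponse(other)),
        }
    }

    // Step 2: delegate to the existing cancel encoder.
    fn cancel_message(request_id: i32) -> String {
        encode_cancel_historical_data(request_id)
    }
}
```

With an impl of this shape in place, the factory functions in step 3 would only need to construct a Subscription<HistoricalBarUpdate>, and the fields listed in step 4 would have no remaining users.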

Notes

  • DecoderContext already carries server_version and time_zone: Option<&'static Tz>, both of which are needed during decoding. The factory function resolves the timezone before constructing the subscription, so time_zone will always be Some.
  • Cancel-on-drop, the guard against sending duplicate cancels, and the async tokio::spawn in Drop all come from the generic Subscription<T> for free.
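
For reference, the lifecycle behaviour being reused can be sketched as follows. This is a synchronous illustration only, not the crate's actual Subscription<T>: the CancelEncoder trait, the recording MessageBus, and the cancel string are hypothetical, and the async variant's tokio::spawn in Drop is left out.

```rust
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical hook: each payload type knows how to encode its own cancel.
trait CancelEncoder {
    fn cancel_message(request_id: i32) -> String;
}

// Hypothetical bus that records outgoing messages instead of sending them.
#[derive(Default)]
struct MessageBus {
    sent: Mutex<Vec<String>>,
}

impl MessageBus {
    fn send(&self, message: String) {
        self.sent.lock().unwrap().push(message);
    }
}

// Generic subscription owning the lifecycle state that the custom struct
// currently duplicates: request_id, message_bus, cancelled.
struct Subscription<T: CancelEncoder> {
    request_id: i32,
    message_bus: Arc<MessageBus>,
    cancelled: AtomicBool,
    _marker: PhantomData<T>,
}

impl<T: CancelEncoder> Subscription<T> {
    fn new(request_id: i32, message_bus: Arc<MessageBus>) -> Self {
        Self {
            request_id,
            message_bus,
            cancelled: AtomicBool::new(false),
            _marker: PhantomData,
        }
    }

    // swap returns the previous value, so only the first caller sends.
    fn cancel(&self) {
        if !self.cancelled.swap(true, Ordering::SeqCst) {
            self.message_bus.send(T::cancel_message(self.request_id));
        }
    }
}

// Cancel-on-drop: dropping the subscription cancels it unless already cancelled.
impl<T: CancelEncoder> Drop for Subscription<T> {
    fn drop(&mut self) {
        self.cancel();
    }
}

// Marker payload for the sketch; the real type is the enum of bar updates.
struct HistoricalBarUpdate;

impl CancelEncoder for HistoricalBarUpdate {
    fn cancel_message(request_id: i32) -> String {
        format!("cancel_historical_data:{request_id}")
    }
}
```

Because the guard lives in the generic type, calling cancel() explicitly and then dropping the subscription still produces a single cancel message, which is the behaviour the custom struct reimplements today.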

Context

Introduced in #427.
