Skip to content

Data Sources

Base Interface

interface

DataSource

Bases: ABC

Base class for all data sources.

Source code in src/quantrl_lab/data/interface.py
class DataSource(ABC):
    """Base class for all data sources."""

    @property
    @abstractmethod
    def source_name(self) -> str:
        """Return the name of this data source."""
        pass

    def connect(self) -> None:
        """
        Connect to the data source.

        Default implementation for sources that don't require
        connections. Override this method if your data source needs
        connection management.
        """
        pass

    def disconnect(self) -> None:
        """
        Disconnect from the data source.

        Default implementation for sources that don't require
        connections. Override this method if your data source needs
        connection management.
        """
        pass

    def is_connected(self) -> bool:
        """
        Check if the data source is currently connected.

        Returns:
            True for sources that don't require connections (always available).
            Override this method if your data source needs connection management.
        """
        return True

    def list_available_instruments(
        self,
        instrument_type: Optional[str] = None,
        market: Optional[str] = None,
        **kwargs: Any,
    ) -> List[str]:
        """
        Return a list of available instrument symbols or identifiers
        that this source can provide data for.

        Args:
            instrument_type: Optional filter by type (e.g., 'stock', 'future', 'option', 'crypto', 'forex').
            market: Optional filter by market (e.g., 'NASDAQ', 'NYSE', 'crypto_spot', 'crypto_futures').
            **kwargs: Provider-specific additional filter parameters.

        Returns:
            A list of string identifiers for the available instruments.

        Raises:
            NotImplementedError: If the data source doesn't support instrument discovery.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support instrument discovery. "
            "This data source may be designed for specific instruments only."
        )

    @property
    def supported_features(self) -> List[str]:
        """Return a list of supported features, detected structurally.

        Each capability protocol is runtime-checkable, so isinstance()
        reports whether this instance provides the protocol's methods.
        """
        features = []

        if isinstance(self, HistoricalDataCapable):
            features.append("historical_bars")
        if isinstance(self, NewsDataCapable):
            features.append("news")
        if isinstance(self, LiveDataCapable):
            features.append("live_data")
        if isinstance(self, StreamingCapable):
            features.append("streaming")
        if isinstance(self, ConnectionManaged):
            features.append("connection_managed")
        if isinstance(self, FundamentalDataCapable):
            features.append("fundamental_data")
        if isinstance(self, MacroDataCapable):
            features.append("macro_data")
        if isinstance(self, AnalystDataCapable):
            features.append("analyst_data")
        if isinstance(self, SectorDataCapable):
            features.append("sector_data")
        if isinstance(self, CompanyProfileCapable):
            features.append("company_profile")

        # Check if instrument discovery is implemented (method is overridden)
        if (
            hasattr(self.__class__, "list_available_instruments")
            and self.__class__.list_available_instruments is not DataSource.list_available_instruments
        ):
            features.append("instrument_discovery")

        return features

    def supports_feature(self, feature_name: str) -> bool:
        """Check if the data source supports a specific feature."""
        return feature_name in self.supported_features

    def __repr__(self) -> str:
        """
        Return a string representation of the data source.

        Returns:
            str: A string representation of the data source.
        """
        # Bug fix: is_connected is a method, not a property — without the
        # call the repr rendered a bound-method object instead of a bool.
        return f"<{self.__class__.__name__}(name='{self.source_name}', connected={self.is_connected()})>"

source_name abstractmethod property

Return the name of this data source.

supported_features property

Return a list of supported features.

connect()

Connect to the data source.

Default implementation for sources that don't require connections. Override this method if your data source needs connection management.

Source code in src/quantrl_lab/data/interface.py
def connect(self) -> None:
    """
    Connect to the data source.

    Default implementation for sources that don't require
    connections. Override this method if your data source needs
    connection management.
    """
    pass

disconnect()

Disconnect from the data source.

Default implementation for sources that don't require connections. Override this method if your data source needs connection management.

Source code in src/quantrl_lab/data/interface.py
def disconnect(self) -> None:
    """
    Disconnect from the data source.

    Default implementation for sources that don't require
    connections. Override this method if your data source needs
    connection management.
    """
    pass

is_connected()

Check if the data source is currently connected.

Returns:

Type Description
bool

True for sources that don't require connections (always available).

bool

Override this method if your data source needs connection management.

Source code in src/quantrl_lab/data/interface.py
def is_connected(self) -> bool:
    """
    Check if the data source is currently connected.

    Returns:
        True for sources that don't require connections (always available).
        Override this method if your data source needs connection management.
    """
    return True

list_available_instruments(instrument_type=None, market=None, **kwargs)

Return a list of available instrument symbols or identifiers that this source can provide data for.

Parameters:

Name Type Description Default
instrument_type Optional[str]

Optional filter by type (e.g., 'stock', 'future', 'option', 'crypto', 'forex').

None
market Optional[str]

Optional filter by market (e.g., 'NASDAQ', 'NYSE', 'crypto_spot', 'crypto_futures').

None
**kwargs Any

Provider-specific additional filter parameters.

{}

Returns:

Type Description
List[str]

A list of string identifiers for the available instruments.

Raises:

Type Description
NotImplementedError

If the data source doesn't support instrument discovery.

Source code in src/quantrl_lab/data/interface.py
def list_available_instruments(
    self,
    instrument_type: Optional[str] = None,
    market: Optional[str] = None,
    **kwargs: Any,
) -> List[str]:
    """
    Return a list of available instrument symbols or identifiers
    that this source can provide data for.

    Args:
        instrument_type: Optional filter by type (e.g., 'stock', 'future', 'option', 'crypto', 'forex').
        market: Optional filter by market (e.g., 'NASDAQ', 'NYSE', 'crypto_spot', 'crypto_futures').
        **kwargs: Provider-specific additional filter parameters.

    Returns:
        A list of string identifiers for the available instruments.

    Raises:
        NotImplementedError: If the data source doesn't support instrument discovery.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support instrument discovery. "
        "This data source may be designed for specific instruments only."
    )

supports_feature(feature_name)

Check if the data source supports a specific feature.

Source code in src/quantrl_lab/data/interface.py
def supports_feature(self, feature_name: str) -> bool:
    """Check if the data source supports a specific feature."""
    return feature_name in self.supported_features

__repr__()

Return a string representation of the data source.

Returns:

Name Type Description
str str

A string representation of the data source.

Source code in src/quantrl_lab/data/interface.py
def __repr__(self) -> str:
    """
    Return a string representation of the data source.

    Returns:
        str: A string representation of the data source.
    """
    return f"<{self.__class__.__name__}(name='{self.source_name}', connected={self.is_connected})>"

ConnectionManaged

Bases: Protocol

Protocol for data sources that require explicit connection management.

Sources implementing this protocol need to manage persistent connections, authentication sessions, or other stateful connections.

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class ConnectionManaged(Protocol):
    """
    Protocol for data sources that must explicitly manage their
    connection lifecycle.

    Implementations hold persistent connections, authentication
    sessions, or other stateful links that need to be opened and
    closed deliberately.
    """

    def connect(self) -> None:
        """Open the connection to the data source."""
        ...

    def disconnect(self) -> None:
        """Tear down the connection to the data source."""
        ...

    def is_connected(self) -> bool:
        """Report whether a live connection currently exists."""
        ...

connect()

Establish connection to the data source.

Source code in src/quantrl_lab/data/interface.py
def connect(self) -> None:
    """Establish connection to the data source."""
    ...

disconnect()

Close connection to the data source.

Source code in src/quantrl_lab/data/interface.py
def disconnect(self) -> None:
    """Close connection to the data source."""
    ...

is_connected()

Check if currently connected to the data source.

Source code in src/quantrl_lab/data/interface.py
def is_connected(self) -> bool:
    """Check if currently connected to the data source."""
    ...

HistoricalDataCapable

Bases: Protocol

Protocol for data sources that provide historical OHLCV data.

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class HistoricalDataCapable(Protocol):
    """Protocol for data sources that provide historical OHLCV data."""

    def get_historical_ohlcv_data(
        self,
        symbols: Union[str, List[str]],
        start: Optional[Union[str, datetime]] = None,
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
        **kwargs,
    ) -> pd.DataFrame:
        """Get historical OHLCV data."""
        ...

get_historical_ohlcv_data(symbols, start=None, end=None, timeframe='1d', **kwargs)

Get historical OHLCV data.

Source code in src/quantrl_lab/data/interface.py
def get_historical_ohlcv_data(
    self,
    symbols: Union[str, List[str]],
    start: Optional[Union[str, datetime]] = None,
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
    **kwargs,
) -> pd.DataFrame:
    """Get historical OHLCV data."""
    ...

NewsDataCapable

Bases: Protocol

Protocol for data sources that provide news data.

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class NewsDataCapable(Protocol):
    """Protocol for data sources that provide news data."""

    def get_news_data(
        self,
        symbols: Union[str, List[str]],
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """Get news for specified symbols and time range."""
        ...

get_news_data(symbols, start, end=None, **kwargs)

Get news for specified symbols and time range.

Source code in src/quantrl_lab/data/interface.py
def get_news_data(
    self,
    symbols: Union[str, List[str]],
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    **kwargs,
) -> pd.DataFrame:
    """Get news for specified symbols and time range."""
    ...

LiveDataCapable

Bases: Protocol

Protocol for data sources with real-time data capabilities.

It checks if the class has the following methods: - get_latest_quote - get_latest_trade

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class LiveDataCapable(Protocol):
    """
    Protocol for data sources with real-time data capabilities.

    It checks if the class has the following methods:
    - get_latest_quote
    - get_latest_trade
    """

    def get_latest_quote(self, symbols: Union[str, List[str]], **kwargs) -> pd.DataFrame:
        """Get latest market data."""
        ...

    def get_latest_trade(self, symbols: Union[str, List[str]], **kwargs) -> pd.DataFrame:
        """Get latest trade data."""
        ...

get_latest_quote(symbols, **kwargs)

Get latest market data.

Source code in src/quantrl_lab/data/interface.py
def get_latest_quote(self, symbols: Union[str, List[str]], **kwargs) -> pd.DataFrame:
    """Get latest market data."""
    ...

get_latest_trade(symbols, **kwargs)

Get latest trade data.

Source code in src/quantrl_lab/data/interface.py
def get_latest_trade(self, symbols: Union[str, List[str]], **kwargs) -> pd.DataFrame:
    """Get latest trade data."""
    ...

StreamingCapable

Bases: Protocol

Protocol for data sources with streaming capabilities.

It checks if the class has the following methods: - subscribe_to_updates - start_streaming - stop_streaming

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class StreamingCapable(Protocol):
    """
    Protocol for data sources with streaming capabilities.

    It checks if the class has the following methods:
    - subscribe_to_updates
    - start_streaming
    - stop_streaming
    """

    async def subscribe_to_updates(
        self,
        symbol: str,
        data_type: str = "trades",
        **kwargs,
    ):
        """Subscribe to real-time data updates."""
        ...

    async def start_streaming(self):
        """Start the data stream."""
        ...

    async def stop_streaming(self):
        """Stop the data stream."""
        ...

subscribe_to_updates(symbol, data_type='trades', **kwargs) async

Subscribe to real-time data updates.

Source code in src/quantrl_lab/data/interface.py
async def subscribe_to_updates(
    self,
    symbol: str,
    data_type: str = "trades",
    **kwargs,
):
    """Subscribe to real-time data updates."""
    ...

start_streaming() async

Start the data stream.

Source code in src/quantrl_lab/data/interface.py
async def start_streaming(self):
    """Start the data stream."""
    ...

stop_streaming() async

Stop the data stream.

Source code in src/quantrl_lab/data/interface.py
async def stop_streaming(self):
    """Stop the data stream."""
    ...

FundamentalDataCapable

Bases: Protocol

Protocol for data sources that provide fundamental data.

It checks if the class has the following methods: - get_fundamental_data

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class FundamentalDataCapable(Protocol):
    """
    Protocol for data sources that provide fundamental data.

    It checks if the class has the following methods:
    - get_fundamental_data
    """

    # NOTE(review): parameter is named `symbols` (plural) but annotated `str`,
    # while sibling protocols accept Union[str, List[str]] — confirm whether
    # multi-symbol input is intended here.
    def get_fundamental_data(self, symbols: str, metrics: List[str], **kwargs) -> pd.DataFrame:
        """Get fundamental data for specified symbols and metrics."""
        ...

get_fundamental_data(symbols, metrics, **kwargs)

Get fundamental data for specified symbols and metrics.

Source code in src/quantrl_lab/data/interface.py
def get_fundamental_data(self, symbols: str, metrics: List[str], **kwargs) -> pd.DataFrame:
    """Get fundamental data for specified symbols and metrics."""
    ...

MacroDataCapable

Bases: Protocol

Protocol for data sources that provide macroeconomic data.

It checks if the class has the following methods: - get_macro_data

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class MacroDataCapable(Protocol):
    """
    Protocol for data sources that provide macroeconomic data.

    It checks if the class has the following methods:
    - get_macro_data
    """

    def get_macro_data(
        self,
        indicators: Union[str, List[str]],
        start: Union[str, datetime],
        end: Union[str, datetime],
    ) -> pd.DataFrame:
        """Get macroeconomic data for specified indicators and time
        range."""
        ...

get_macro_data(indicators, start, end)

Get macroeconomic data for specified indicators and time range.

Source code in src/quantrl_lab/data/interface.py
def get_macro_data(
    self,
    indicators: Union[str, List[str]],
    start: Union[str, datetime],
    end: Union[str, datetime],
) -> pd.DataFrame:
    """Get macroeconomic data for specified indicators and time
    range."""
    ...

AnalystDataCapable

Bases: Protocol

Protocol for data sources that provide analyst ratings and grades data.

This includes analyst recommendations, upgrades/downgrades, price targets, and other research-based insights from financial analysts.

It checks if the class has the following methods: - get_historical_grades - get_historical_rating

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class AnalystDataCapable(Protocol):
    """
    Protocol for data sources that provide analyst ratings and grades
    data.

    This includes analyst recommendations, upgrades/downgrades, price targets,
    and other research-based insights from financial analysts.

    It checks if the class has the following methods:
    - get_historical_grades
    - get_historical_rating
    """

    def get_historical_grades(self, symbol: str, **kwargs: Any) -> pd.DataFrame:
        """
        Get historical analyst grades/recommendations for a symbol.

        Args:
            symbol: Stock symbol to fetch grades for
            **kwargs: Additional provider-specific parameters

        Returns:
            pd.DataFrame: Historical analyst grades data
        """
        ...

    def get_historical_rating(self, symbol: str, limit: int = 100, **kwargs: Any) -> pd.DataFrame:
        """
        Get historical analyst ratings for a symbol.

        Args:
            symbol: Stock symbol to fetch ratings for
            limit: Number of records to return (default: 100)
            **kwargs: Additional provider-specific parameters

        Returns:
            pd.DataFrame: Historical analyst ratings data
        """
        ...

get_historical_grades(symbol, **kwargs)

Get historical analyst grades/recommendations for a symbol.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch grades for

required
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Historical analyst grades data

Source code in src/quantrl_lab/data/interface.py
def get_historical_grades(self, symbol: str, **kwargs: Any) -> pd.DataFrame:
    """
    Get historical analyst grades/recommendations for a symbol.

    Args:
        symbol: Stock symbol to fetch grades for
        **kwargs: Additional provider-specific parameters

    Returns:
        pd.DataFrame: Historical analyst grades data
    """
    ...

get_historical_rating(symbol, limit=100, **kwargs)

Get historical analyst ratings for a symbol.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch ratings for

required
limit int

Number of records to return (default: 100)

100
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Historical analyst ratings data

Source code in src/quantrl_lab/data/interface.py
def get_historical_rating(self, symbol: str, limit: int = 100, **kwargs: Any) -> pd.DataFrame:
    """
    Get historical analyst ratings for a symbol.

    Args:
        symbol: Stock symbol to fetch ratings for
        limit: Number of records to return (default: 100)
        **kwargs: Additional provider-specific parameters

    Returns:
        pd.DataFrame: Historical analyst ratings data
    """
    ...

SectorDataCapable

Bases: Protocol

Protocol for data sources that provide sector and industry performance data.

This includes historical performance metrics for market sectors and industries, enabling sector rotation and market trend analysis.

It checks if the class has the following methods: - get_historical_sector_performance - get_historical_industry_performance

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class SectorDataCapable(Protocol):
    """
    Protocol for data sources that provide sector and industry
    performance data.

    This includes historical performance metrics for market sectors and
    industries, enabling sector rotation and market trend analysis.

    It checks if the class has the following methods:
    - get_historical_sector_performance
    - get_historical_industry_performance
    """

    def get_historical_sector_performance(self, sector: str, **kwargs: Any) -> pd.DataFrame:
        """
        Get historical performance data for a specific market sector.

        Args:
            sector: Market sector name (e.g., "Energy", "Technology", "Healthcare")
            **kwargs: Additional provider-specific parameters

        Returns:
            pd.DataFrame: Historical sector performance data
        """
        ...

    def get_historical_industry_performance(self, industry: str, **kwargs: Any) -> pd.DataFrame:
        """
        Get historical performance data for a specific industry.

        Args:
            industry: Industry name (e.g., "Biotechnology", "Software", "Banks")
            **kwargs: Additional provider-specific parameters

        Returns:
            pd.DataFrame: Historical industry performance data
        """
        ...

get_historical_sector_performance(sector, **kwargs)

Get historical performance data for a specific market sector.

Parameters:

Name Type Description Default
sector str

Market sector name (e.g., "Energy", "Technology", "Healthcare")

required
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Historical sector performance data

Source code in src/quantrl_lab/data/interface.py
def get_historical_sector_performance(self, sector: str, **kwargs: Any) -> pd.DataFrame:
    """
    Get historical performance data for a specific market sector.

    Args:
        sector: Market sector name (e.g., "Energy", "Technology", "Healthcare")
        **kwargs: Additional provider-specific parameters

    Returns:
        pd.DataFrame: Historical sector performance data
    """
    ...

get_historical_industry_performance(industry, **kwargs)

Get historical performance data for a specific industry.

Parameters:

Name Type Description Default
industry str

Industry name (e.g., "Biotechnology", "Software", "Banks")

required
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Historical industry performance data

Source code in src/quantrl_lab/data/interface.py
def get_historical_industry_performance(self, industry: str, **kwargs: Any) -> pd.DataFrame:
    """
    Get historical performance data for a specific industry.

    Args:
        industry: Industry name (e.g., "Biotechnology", "Software", "Banks")
        **kwargs: Additional provider-specific parameters

    Returns:
        pd.DataFrame: Historical industry performance data
    """
    ...

CompanyProfileCapable

Bases: Protocol

Protocol for data sources that provide company profile and metadata.

This includes company information such as sector/industry classification, executive information, key financial metrics, and company details.

It checks if the class has the following method: - get_company_profile

Source code in src/quantrl_lab/data/interface.py
@runtime_checkable
class CompanyProfileCapable(Protocol):
    """
    Protocol for data sources that provide company profile and metadata.

    This includes company information such as sector/industry classification,
    executive information, key financial metrics, and company details.

    It checks if the class has the following method:
    - get_company_profile
    """

    def get_company_profile(self, symbol: Union[str, List[str]], **kwargs: Any) -> pd.DataFrame:
        """
        Get company profile information including sector, industry, and
        key metrics.

        Args:
            symbol: Stock ticker symbol or list of symbols
            **kwargs: Additional provider-specific parameters

        Returns:
            pd.DataFrame: Company profile data with metadata
        """
        ...

get_company_profile(symbol, **kwargs)

Get company profile information including sector, industry, and key metrics.

Parameters:

Name Type Description Default
symbol Union[str, List[str]]

Stock ticker symbol or list of symbols

required
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Company profile data with metadata

Source code in src/quantrl_lab/data/interface.py
def get_company_profile(self, symbol: Union[str, List[str]], **kwargs: Any) -> pd.DataFrame:
    """
    Get company profile information including sector, industry, and
    key metrics.

    Args:
        symbol: Stock ticker symbol or list of symbols
        **kwargs: Additional provider-specific parameters

    Returns:
        pd.DataFrame: Company profile data with metadata
    """
    ...

Data Loaders

Alpaca

alpaca_loader

AlpacaDataLoader

Bases: DataSource, HistoricalDataCapable, LiveDataCapable, StreamingCapable, NewsDataCapable, ConnectionManaged

Alpaca implementation that provides market data from Alpaca APIs.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
class AlpacaDataLoader(
    DataSource,
    HistoricalDataCapable,
    LiveDataCapable,
    StreamingCapable,
    NewsDataCapable,
    ConnectionManaged,
):
    """Alpaca implementation that provides market data from Alpaca
    APIs.

    Wraps the alpaca-py SDK clients for historical bars / latest
    quotes and trades and for websocket streaming, and calls the
    Alpaca News REST endpoint directly (the SDK exposes no news
    client).
    """

    # News REST endpoint, queried directly with requests/aiohttp because
    # the alpaca-py SDK does not provide a news client.
    NEWS_API_BASE_URL = "https://data.alpaca.markets/v1beta1/news"
    # Newest-first ordering used by both news fetch paths.
    DEFAULT_NEWS_SORT = "desc"
    DEFAULT_NEWS_LIMIT = 50

    # Class-level singleton for the streaming client, shared by every
    # instance — presumably because only one websocket connection per
    # credential pair is wanted; TODO confirm against Alpaca's limits.
    _stock_stream_client_instance = None

    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        stock_historical_client: Optional[StockHistoricalDataClient] = None,
        stock_stream_client: Optional[StockDataStream] = None,
    ) -> None:
        """
        Initialize the loader with credentials and (optionally) injected clients.

        Args:
            api_key (str, optional): Alpaca API key. Falls back to the
                ALPACA_API_KEY environment variable when None.
            secret_key (str, optional): Alpaca secret key. Falls back to the
                ALPACA_SECRET_KEY environment variable when None.
            stock_historical_client (StockHistoricalDataClient, optional):
                Pre-built historical client, useful for testing/injection.
            stock_stream_client (StockDataStream, optional): Accepted for
                interface symmetry but currently ignored — the shared
                class-level singleton is always used instead.
        """
        # `or` returns the first truthy value, so env vars serve as fallback when args are None
        self.api_key = api_key or os.environ.get("ALPACA_API_KEY")
        self.secret_key = secret_key or os.environ.get("ALPACA_SECRET_KEY")

        if stock_historical_client is not None:
            self.stock_historical_client = stock_historical_client
        else:
            self.stock_historical_client = StockHistoricalDataClient(self.api_key, self.secret_key)

        # NOTE(review): an injected `stock_stream_client` argument is never
        # consulted here; the first instance constructed wires the shared
        # singleton with its own credentials and later instances reuse it.
        if AlpacaDataLoader._stock_stream_client_instance is None:
            AlpacaDataLoader._stock_stream_client_instance = StockDataStream(self.api_key, self.secret_key)
        self.stock_stream_client = AlpacaDataLoader._stock_stream_client_instance

        # Per-data-type handler registries; populated nowhere in this class
        # as written (subscribe_to_updates registers handlers directly with
        # the stream client instead).
        self.subscribers = {"quotes": [], "trades": [], "bars": []}
        # Symbols that have at least one active stream subscription.
        self._subscribed_symbols = set()

    @property
    def source_name(self) -> str:
        # Human-readable identifier required by the DataSource interface.
        return "Alpaca"

    def connect(self) -> None:
        """
        Connect to the historical data client of Alpaca.

        Reinitializes the stock historical client with current credentials.

        Raises:
            AuthenticationError: If API credentials are not provided.
        """
        if not self.api_key or not self.secret_key:
            raise AuthenticationError("Alpaca API credentials not provided")
        self.stock_historical_client = StockHistoricalDataClient(self.api_key, self.secret_key)

    def disconnect(self) -> None:
        """Disconnect from the historical data client."""
        # NOTE(review): assumes StockHistoricalDataClient exposes close() —
        # confirm against the installed alpaca-py version.
        if self.stock_historical_client:
            self.stock_historical_client.close()

    def is_connected(self) -> bool:
        """
        Check if the historical client is initialized and credentials
        are valid.

        Note: this only verifies that a client object and both credential
        strings exist; it does not perform a network round-trip.

        Returns:
            bool: True if the client is initialized with valid credentials, False otherwise.
        """
        try:
            return self.stock_historical_client is not None and (
                self.api_key is not None and self.secret_key is not None
            )
        except Exception:
            return False

    def list_available_instruments(
        self,
        instrument_type: Optional[str] = None,
        market: Optional[str] = None,
        **kwargs,
    ) -> List[str]:
        """Not implemented yet; implicitly returns None despite the
        List[str] annotation."""
        # TODO
        pass

    def get_historical_ohlcv_data(
        self,
        symbols: Union[str, List[str]],
        start: Optional[Union[str, datetime]] = None,
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Get historical OHLCV data from Alpaca.

        `end` is not compulsory and defaults to today if not provided.

        Args:
            symbols (Union[str, List[str]]): Stock symbol(s) to fetch data for.
            start (Union[str, datetime], optional): Start date for historical data.
            end (Union[str, datetime], optional): End date for historical data. Defaults to today.
            timeframe (str, optional): The bar timeframe (1d, 1h, 1m, etc.). Defaults to "1d".
            **kwargs: Additional arguments to pass to Alpaca API.

        Returns:
            pd.DataFrame: Raw OHLCV data.

        Raises:
            InvalidParametersError: If `start` is not provided (Alpaca
                requires an explicit start date).
        """
        if start is None:
            raise InvalidParametersError("Alpaca requires a 'start' date for historical data.")

        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
        symbol_list = normalize_symbols(symbols)

        logger.info(
            "Fetching historical data for {symbols} from {start} to {end} with timeframe {timeframe}",
            symbols=symbol_list,
            start=start_dt,
            end=end_dt,
            timeframe=timeframe,
        )

        # Translate the generic timeframe string ("1d", "1h", ...) into the
        # SDK's TimeFrame object via the shared mapping table.
        alpaca_timeframe = ALPACA_MAPPINGS.get_timeframe(timeframe)

        request_params = StockBarsRequest(
            symbol_or_symbols=symbol_list,
            timeframe=alpaca_timeframe,
            start=start_dt,
            end=end_dt,
            **kwargs,
        )

        bars = self.stock_historical_client.get_stock_bars(request_params)
        # reset_index() flattens the response frame's index into columns —
        # presumably a (symbol, timestamp) MultiIndex; verify against the SDK.
        bars_df = bars.df.reset_index()
        bars_df = standardize_ohlcv_columns(bars_df, ALPACA_MAPPINGS.ohlcv_columns)
        bars_df = add_date_column_from_timestamp(bars_df, timestamp_col="Timestamp")

        # "Symbol" exists only after column standardization; default to 1
        # for single-symbol frames without that column.
        num_symbols = len(set(bars_df["Symbol"])) if "Symbol" in bars_df.columns else 1
        log_dataframe_info(
            bars_df,
            f"Fetched OHLCV data for {num_symbols} symbol(s)",
            symbol=None,
        )

        return bars_df

    def get_latest_quote(self, symbol: str, **kwargs: Any) -> Dict:
        """
        Get the latest quote for a symbol from Alpaca.

        Args:
            symbol (str): Stock symbol to fetch quote for.
            **kwargs: Additional arguments such as feed type.
                NOTE(review): not currently forwarded to the request.

        Returns:
            Dict: Output dictionary.
        """
        request_params = StockLatestQuoteRequest(symbol_or_symbols=symbol)
        return self.stock_historical_client.get_stock_latest_quote(request_params)

    def get_latest_trade(self, symbol: str, **kwargs: Any) -> Dict:
        """
        Get the latest trade for a symbol from Alpaca.

        Args:
            symbol (str): Stock symbol to fetch trade for.
            **kwargs: Additional arguments such as feed type.
                NOTE(review): not currently forwarded to the request.

        Returns:
            Dict: Output dictionary.
        """
        request_params = StockLatestTradeRequest(symbol_or_symbols=symbol)
        return self.stock_historical_client.get_stock_latest_trade(request_params)

    async def _trade_handler(self, trade_data: Trade) -> None:
        """Process incoming trade data (currently just debug-logs it)."""
        logger.debug("Trade: {data}", data=trade_data)

    async def subscribe_to_updates(self, symbol: str, data_type: str = "trades") -> None:
        """
        Subscribe to real-time market data updates.

        Registers a handler with the shared stream client; unknown
        data types are logged as errors and otherwise ignored.

        Args:
            symbol (str): The stock symbol to subscribe to.
            data_type (str, optional): The type of data to subscribe to
                ('trades', 'quotes', 'bars'). Defaults to "trades".
        """
        if data_type == "trades":
            self.stock_stream_client.subscribe_trades(self._trade_handler, symbol)
        elif data_type == "quotes":

            # Inline handler: quotes are only debug-logged for now.
            async def quote_handler(data):
                logger.debug("Quote: {data}", data=data)

            self.stock_stream_client.subscribe_quotes(quote_handler, symbol)
        elif data_type == "bars":

            # Inline handler: bars are only debug-logged for now.
            async def bar_handler(data):
                logger.debug("Bar: {data}", data=data)

            self.stock_stream_client.subscribe_bars(bar_handler, symbol)
        else:
            logger.error("Unknown data type '{data_type}' for subscription", data_type=data_type)
            return

        # Track the symbol only after a successful subscription branch.
        self._subscribed_symbols.add(symbol)
        logger.success("Subscribed to {data_type} for {symbol}", data_type=data_type, symbol=symbol)

    async def start_streaming(self):
        """Initialize, subscribe, and run the data stream.

        Blocks until the stream stops; requires at least one prior
        subscribe_to_updates() call, otherwise returns immediately.
        """
        logger.info("Initializing stream...")
        try:
            if not self._subscribed_symbols:
                logger.warning("No symbols subscribed. Call subscribe_to_updates() first.")
                return
            # NOTE(review): _run_forever is a private alpaca-py API — may
            # break across SDK versions; confirm a public alternative.
            await self.stock_stream_client._run_forever()
        except KeyboardInterrupt:
            logger.info("Stream stopped by user.")
        except Exception as e:
            logger.exception("An error occurred while streaming: {e}", e=e)

    async def stop_streaming(self):
        """Stop the WebSocket connection and clean up resources."""
        logger.info("Stopping WebSocket connection...")
        await self.stock_stream_client.stop_ws()
        logger.success("WebSocket connection stopped")

    def get_news_data(
        self,
        symbols: Union[str, List[str]],
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        limit: int = 50,
        include_content: bool = False,
        verbose: bool = False,
        silent_errors: bool = False,
        **kwargs: Any,
    ) -> Union[pd.DataFrame, Dict]:
        """
        Get news for specified symbols from Alpaca News API.

        Paginates through the REST endpoint until either no items or no
        `next_page_token` is returned. On a request error, logging level
        depends on `silent_errors` and the items fetched so far are kept.

        Args:
            symbols (Union[str, List[str]]): Stock symbol(s) to fetch news for.
            start (Union[str, datetime]): Start date for news.
            end (Union[str, datetime], optional): End date for news. Defaults to today.
            limit (int, optional): Number of news items per request. Defaults to 50.
            include_content (bool, optional): Whether to include full article content. Defaults to False.
            verbose (bool, optional): Whether to log progress. Defaults to False.
            silent_errors (bool, optional): Whether to suppress connection errors. Defaults to False.
            **kwargs: Additional parameters.

        Returns:
            pd.DataFrame: News data. Empty DataFrame when nothing was
                fetched. (The Dict in the annotation is never produced by
                the paths visible here.)
        """
        symbol_list = normalize_symbols(symbols)
        symbols_str = ",".join(symbol_list)

        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
        start_str = format_date_to_string(start_dt)
        end_str = format_date_to_string(end_dt)

        # Alpaca's Python SDK does not include a News API client, so requests is used directly
        headers = {
            "accept": "application/json",
            "APCA-API-KEY-ID": self.api_key,
            "APCA-API-SECRET-KEY": self.secret_key,
        }

        params = {
            "symbols": symbols_str,
            "start": start_str,
            "end": end_str,
            # The News API expects a lowercase string boolean.
            "limit": limit,
            "include_content": str(include_content).lower(),
            "sort": self.DEFAULT_NEWS_SORT,
        }

        all_news = []
        page_token = None
        page_count = 0

        if verbose:
            logger.info(
                "Fetching news for {symbols} from {start} to {end} (limit={limit}, include_content={include})",
                symbols=symbols_str,
                start=start_str,
                end=end_str,
                limit=limit,
                include=include_content,
            )

        while True:
            # `params` is mutated in place: each page's token overwrites the
            # previous one before the next request.
            if page_token:
                params["page_token"] = page_token

            try:
                response = requests.get(self.NEWS_API_BASE_URL, headers=headers, params=params)
                response.raise_for_status()

                data = response.json()
                news_items = data.get("news", [])

                if not news_items:
                    break

                all_news.extend(news_items)
                page_count += 1

                if verbose:
                    logger.debug(
                        "Fetched page {page} (total_items={total})",
                        page=page_count,
                        total=len(all_news),
                    )

                page_token = data.get("next_page_token")
                if not page_token:
                    break

            except requests.exceptions.RequestException as e:
                # silent_errors only changes the log level; either way the
                # pagination loop stops and partial results are returned.
                if silent_errors:
                    logger.debug("Silent failure fetching news: {e}", e=e)
                else:
                    logger.error("Error fetching news: {e}", e=e)
                break

        if verbose:
            logger.success("Total news items fetched: {n}", n=len(all_news))

        if all_news:
            return pd.DataFrame(all_news)
        else:
            return pd.DataFrame()

    async def async_fetch_news(
        self,
        session: "aiohttp.ClientSession",
        symbol: str,
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        limit: int = 50,
        include_content: bool = False,
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async fetch of Alpaca news for a single symbol.

        Ports the pagination loop from get_news_data() using aiohttp instead
        of requests, keeping the event loop free during I/O waits.

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            symbol (str): Stock symbol to fetch news for.
            start (Union[str, datetime]): Start date for news.
            end (Union[str, datetime], optional): End date for news. Defaults to today.
            limit (int, optional): Number of news items per request. Defaults to 50.
            include_content (bool, optional): Whether to include full article content. Defaults to False.

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (symbol, DataFrame). DataFrame is empty on failure.
        """
        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
        start_str = format_date_to_string(start_dt)
        end_str = format_date_to_string(end_dt)

        headers = {
            "accept": "application/json",
            "APCA-API-KEY-ID": self.api_key,
            "APCA-API-SECRET-KEY": self.secret_key,
        }
        params = {
            "symbols": symbol,
            "start": start_str,
            "end": end_str,
            "limit": limit,
            "include_content": str(include_content).lower(),
            "sort": self.DEFAULT_NEWS_SORT,
        }

        all_news = []
        page_token = None

        try:
            while True:
                # Same in-place token mutation as in get_news_data().
                if page_token:
                    params["page_token"] = page_token

                async with session.get(self.NEWS_API_BASE_URL, headers=headers, params=params) as response:
                    response.raise_for_status()
                    # content_type=None skips aiohttp's mime-type check in
                    # case the endpoint returns a non-standard content type.
                    data = await response.json(content_type=None)

                news_items = data.get("news", [])
                if not news_items:
                    break
                all_news.extend(news_items)

                page_token = data.get("next_page_token")
                if not page_token:
                    break

        except Exception as e:
            # Broad catch by design: a failed symbol yields (symbol, empty/
            # partial DataFrame) rather than failing the whole gather.
            logger.error("async_fetch_news failed for {symbol}: {e}", symbol=symbol, e=e)

        df = pd.DataFrame(all_news) if all_news else pd.DataFrame()
        return symbol, df

connect()

Connect to the historical data client of Alpaca.

Reinitializes the stock historical client with current credentials.

Raises:

Type Description
AuthenticationError

If API credentials are not provided.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def connect(self) -> None:
    """
    Connect to the historical data client of Alpaca.

    Reinitializes the stock historical client with current credentials.

    Raises:
        AuthenticationError: If API credentials are not provided.
    """
    if not self.api_key or not self.secret_key:
        raise AuthenticationError("Alpaca API credentials not provided")
    self.stock_historical_client = StockHistoricalDataClient(self.api_key, self.secret_key)

disconnect()

Disconnect from the historical data client.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def disconnect(self) -> None:
    """Disconnect from the historical data client."""
    if self.stock_historical_client:
        self.stock_historical_client.close()

is_connected()

Check if the historical client is initialized and credentials are valid.

Returns:

Name Type Description
bool bool

True if the client is initialized with valid credentials, False otherwise.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def is_connected(self) -> bool:
    """
    Check if the historical client is initialized and credentials
    are valid.

    Returns:
        bool: True if the client is initialized with valid credentials, False otherwise.
    """
    try:
        return self.stock_historical_client is not None and (
            self.api_key is not None and self.secret_key is not None
        )
    except Exception:
        return False

get_historical_ohlcv_data(symbols, start=None, end=None, timeframe='1d', **kwargs)

Get historical OHLCV data from Alpaca.

end is not compulsory and defaults to today if not provided.

Parameters:

Name Type Description Default
symbols Union[str, List[str]]

Stock symbol(s) to fetch data for.

required
start Union[str, datetime]

Start date for historical data.

None
end Union[str, datetime]

End date for historical data. Defaults to today.

None
timeframe str

The bar timeframe (1d, 1h, 1m, etc.). Defaults to "1d".

'1d'
**kwargs Any

Additional arguments to pass to Alpaca API.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Raw OHLCV data.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def get_historical_ohlcv_data(
    self,
    symbols: Union[str, List[str]],
    start: Optional[Union[str, datetime]] = None,
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Get historical OHLCV data from Alpaca.

    `end` is not compulsory and defaults to today if not provided.

    Args:
        symbols (Union[str, List[str]]): Stock symbol(s) to fetch data for.
        start (Union[str, datetime], optional): Start date for historical data.
        end (Union[str, datetime], optional): End date for historical data. Defaults to today.
        timeframe (str, optional): The bar timeframe (1d, 1h, 1m, etc.). Defaults to "1d".
        **kwargs: Additional arguments to pass to Alpaca API.

    Returns:
        pd.DataFrame: Raw OHLCV data.
    """
    if start is None:
        raise InvalidParametersError("Alpaca requires a 'start' date for historical data.")

    start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
    symbol_list = normalize_symbols(symbols)

    logger.info(
        "Fetching historical data for {symbols} from {start} to {end} with timeframe {timeframe}",
        symbols=symbol_list,
        start=start_dt,
        end=end_dt,
        timeframe=timeframe,
    )

    alpaca_timeframe = ALPACA_MAPPINGS.get_timeframe(timeframe)

    request_params = StockBarsRequest(
        symbol_or_symbols=symbol_list,
        timeframe=alpaca_timeframe,
        start=start_dt,
        end=end_dt,
        **kwargs,
    )

    bars = self.stock_historical_client.get_stock_bars(request_params)
    bars_df = bars.df.reset_index()
    bars_df = standardize_ohlcv_columns(bars_df, ALPACA_MAPPINGS.ohlcv_columns)
    bars_df = add_date_column_from_timestamp(bars_df, timestamp_col="Timestamp")

    num_symbols = len(set(bars_df["Symbol"])) if "Symbol" in bars_df.columns else 1
    log_dataframe_info(
        bars_df,
        f"Fetched OHLCV data for {num_symbols} symbol(s)",
        symbol=None,
    )

    return bars_df

get_latest_quote(symbol, **kwargs)

Get the latest quote for a symbol from Alpaca.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch quote for.

required
**kwargs Any

Additional arguments such as feed type.

{}

Returns:

Name Type Description
Dict Dict

Output dictionary.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def get_latest_quote(self, symbol: str, **kwargs: Any) -> Dict:
    """
    Get the latest quote for a symbol from Alpaca.

    Args:
        symbol (str): Stock symbol to fetch quote for.
        **kwargs: Additional arguments such as feed type.

    Returns:
        Dict: Output dictionary.
    """
    request_params = StockLatestQuoteRequest(symbol_or_symbols=symbol)
    return self.stock_historical_client.get_stock_latest_quote(request_params)

get_latest_trade(symbol, **kwargs)

Get the latest trade for a symbol from Alpaca.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch trade for.

required
**kwargs Any

Additional arguments such as feed type.

{}

Returns:

Name Type Description
Dict Dict

Output dictionary.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def get_latest_trade(self, symbol: str, **kwargs: Any) -> Dict:
    """
    Get the latest trade for a symbol from Alpaca.

    Args:
        symbol (str): Stock symbol to fetch trade for.
        **kwargs: Additional arguments such as feed type.

    Returns:
        Dict: Output dictionary.
    """
    request_params = StockLatestTradeRequest(symbol_or_symbols=symbol)
    return self.stock_historical_client.get_stock_latest_trade(request_params)

subscribe_to_updates(symbol, data_type='trades') async

Subscribe to real-time market data updates.

Parameters:

Name Type Description Default
symbol str

The stock symbol to subscribe to.

required
data_type str

The type of data to subscribe to ('trades', 'quotes', 'bars'). Defaults to "trades".

'trades'
Source code in src/quantrl_lab/data/sources/alpaca_loader.py
async def subscribe_to_updates(self, symbol: str, data_type: str = "trades") -> None:
    """
    Subscribe to real-time market data updates.

    Args:
        symbol (str): The stock symbol to subscribe to.
        data_type (str, optional): The type of data to subscribe to
            ('trades', 'quotes', 'bars'). Defaults to "trades".
    """
    if data_type == "trades":
        self.stock_stream_client.subscribe_trades(self._trade_handler, symbol)
    elif data_type == "quotes":

        async def quote_handler(data):
            logger.debug("Quote: {data}", data=data)

        self.stock_stream_client.subscribe_quotes(quote_handler, symbol)
    elif data_type == "bars":

        async def bar_handler(data):
            logger.debug("Bar: {data}", data=data)

        self.stock_stream_client.subscribe_bars(bar_handler, symbol)
    else:
        logger.error("Unknown data type '{data_type}' for subscription", data_type=data_type)
        return

    self._subscribed_symbols.add(symbol)
    logger.success("Subscribed to {data_type} for {symbol}", data_type=data_type, symbol=symbol)

start_streaming() async

Initialize, subscribe, and run the data stream.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
async def start_streaming(self):
    """Initialize, subscribe, and run the data stream."""
    logger.info("Initializing stream...")
    try:
        if not self._subscribed_symbols:
            logger.warning("No symbols subscribed. Call subscribe_to_updates() first.")
            return
        await self.stock_stream_client._run_forever()
    except KeyboardInterrupt:
        logger.info("Stream stopped by user.")
    except Exception as e:
        logger.exception("An error occurred while streaming: {e}", e=e)

stop_streaming() async

Stop the WebSocket connection and clean up resources.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
async def stop_streaming(self):
    """Stop the WebSocket connection and clean up resources."""
    logger.info("Stopping WebSocket connection...")
    await self.stock_stream_client.stop_ws()
    logger.success("WebSocket connection stopped")

get_news_data(symbols, start, end=None, limit=50, include_content=False, verbose=False, silent_errors=False, **kwargs)

Get news for specified symbols from Alpaca News API.

Parameters:

Name Type Description Default
symbols Union[str, List[str]]

Stock symbol(s) to fetch news for.

required
start Union[str, datetime]

Start date for news.

required
end Union[str, datetime]

End date for news. Defaults to today.

None
limit int

Number of news items per request. Defaults to 50.

50
include_content bool

Whether to include full article content. Defaults to False.

False
verbose bool

Whether to log progress. Defaults to False.

False
silent_errors bool

Whether to suppress connection errors. Defaults to False.

False
**kwargs Any

Additional parameters.

{}

Returns:

Type Description
Union[DataFrame, Dict]

pd.DataFrame: News data.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
def get_news_data(
    self,
    symbols: Union[str, List[str]],
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    limit: int = 50,
    include_content: bool = False,
    verbose: bool = False,
    silent_errors: bool = False,
    **kwargs: Any,
) -> Union[pd.DataFrame, Dict]:
    """
    Get news for specified symbols from Alpaca News API.

    Args:
        symbols (Union[str, List[str]]): Stock symbol(s) to fetch news for.
        start (Union[str, datetime]): Start date for news.
        end (Union[str, datetime], optional): End date for news. Defaults to today.
        limit (int, optional): Number of news items per request. Defaults to 50.
        include_content (bool, optional): Whether to include full article content. Defaults to False.
        verbose (bool, optional): Whether to log progress. Defaults to False.
        silent_errors (bool, optional): Whether to suppress connection errors. Defaults to False.
        **kwargs: Additional parameters.

    Returns:
        pd.DataFrame: News data.
    """
    symbol_list = normalize_symbols(symbols)
    symbols_str = ",".join(symbol_list)

    start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
    start_str = format_date_to_string(start_dt)
    end_str = format_date_to_string(end_dt)

    # Alpaca's Python SDK does not include a News API client, so requests is used directly
    headers = {
        "accept": "application/json",
        "APCA-API-KEY-ID": self.api_key,
        "APCA-API-SECRET-KEY": self.secret_key,
    }

    params = {
        "symbols": symbols_str,
        "start": start_str,
        "end": end_str,
        "limit": limit,
        "include_content": str(include_content).lower(),
        "sort": self.DEFAULT_NEWS_SORT,
    }

    all_news = []
    page_token = None
    page_count = 0

    if verbose:
        logger.info(
            "Fetching news for {symbols} from {start} to {end} (limit={limit}, include_content={include})",
            symbols=symbols_str,
            start=start_str,
            end=end_str,
            limit=limit,
            include=include_content,
        )

    while True:
        if page_token:
            params["page_token"] = page_token

        try:
            response = requests.get(self.NEWS_API_BASE_URL, headers=headers, params=params)
            response.raise_for_status()

            data = response.json()
            news_items = data.get("news", [])

            if not news_items:
                break

            all_news.extend(news_items)
            page_count += 1

            if verbose:
                logger.debug(
                    "Fetched page {page} (total_items={total})",
                    page=page_count,
                    total=len(all_news),
                )

            page_token = data.get("next_page_token")
            if not page_token:
                break

        except requests.exceptions.RequestException as e:
            if silent_errors:
                logger.debug("Silent failure fetching news: {e}", e=e)
            else:
                logger.error("Error fetching news: {e}", e=e)
            break

    if verbose:
        logger.success("Total news items fetched: {n}", n=len(all_news))

    if all_news:
        return pd.DataFrame(all_news)
    else:
        return pd.DataFrame()

async_fetch_news(session, symbol, start, end=None, limit=50, include_content=False) async

Async fetch of Alpaca news for a single symbol.

Ports the pagination loop from get_news_data() using aiohttp instead of requests, keeping the event loop free during I/O waits.

Parameters:

Name Type Description Default
session ClientSession

Shared aiohttp session.

required
symbol str

Stock symbol to fetch news for.

required
start Union[str, datetime]

Start date for news.

required
end Union[str, datetime]

End date for news. Defaults to today.

None
limit int

Number of news items per request. Defaults to 50.

50
include_content bool

Whether to include full article content. Defaults to False.

False

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (symbol, DataFrame). DataFrame is empty on failure.

Source code in src/quantrl_lab/data/sources/alpaca_loader.py
async def async_fetch_news(
    self,
    session: "aiohttp.ClientSession",
    symbol: str,
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    limit: int = 50,
    include_content: bool = False,
) -> Tuple[str, pd.DataFrame]:
    """
    Async fetch of Alpaca news for a single symbol.

    Ports the pagination loop from get_news_data() using aiohttp instead
    of requests, keeping the event loop free during I/O waits.

    Args:
        session (aiohttp.ClientSession): Shared aiohttp session.
        symbol (str): Stock symbol to fetch news for.
        start (Union[str, datetime]): Start date for news.
        end (Union[str, datetime], optional): End date for news. Defaults to today.
        limit (int, optional): Number of news items per request. Defaults to 50.
        include_content (bool, optional): Whether to include full article content. Defaults to False.

    Returns:
        Tuple[str, pd.DataFrame]: Tuple of (symbol, DataFrame). DataFrame is empty on failure.
    """
    start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
    start_str = format_date_to_string(start_dt)
    end_str = format_date_to_string(end_dt)

    headers = {
        "accept": "application/json",
        "APCA-API-KEY-ID": self.api_key,
        "APCA-API-SECRET-KEY": self.secret_key,
    }
    params = {
        "symbols": symbol,
        "start": start_str,
        "end": end_str,
        "limit": limit,
        "include_content": str(include_content).lower(),
        "sort": self.DEFAULT_NEWS_SORT,
    }

    all_news = []
    page_token = None

    try:
        while True:
            if page_token:
                params["page_token"] = page_token

            async with session.get(self.NEWS_API_BASE_URL, headers=headers, params=params) as response:
                response.raise_for_status()
                data = await response.json(content_type=None)

            news_items = data.get("news", [])
            if not news_items:
                break
            all_news.extend(news_items)

            page_token = data.get("next_page_token")
            if not page_token:
                break

    except Exception as e:
        logger.error("async_fetch_news failed for {symbol}: {e}", symbol=symbol, e=e)

    df = pd.DataFrame(all_news) if all_news else pd.DataFrame()
    return symbol, df

YFinance

yfinance_loader

YFinanceDataLoader

Bases: DataSource, FundamentalDataCapable, HistoricalDataCapable

Yahoo Finance implementation that provides market data and fundamental data.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
class YFinanceDataLoader(DataSource, FundamentalDataCapable, HistoricalDataCapable):
    """Yahoo Finance implementation that provides market data and
    fundamental data."""

    def __init__(
        self,
        max_retries: int = 3,
        delay: int = 1,
    ):
        """
        Args:
            max_retries (int, optional): Number of fetch attempts before giving up. Defaults to 3.
            delay (int, optional): Seconds to sleep between retry attempts. Defaults to 1.
        """
        # Do not initialize ticker-related variables here to keep the object reusable
        self.max_retries = max_retries
        self.delay = delay

    @property
    def source_name(self) -> str:
        """Human-readable name of this data source."""
        return "Yahoo Finance"

    def connect(self) -> None:
        """yfinance doesn't require explicit connection - it uses HTTP requests."""
        pass

    def disconnect(self) -> None:
        """yfinance doesn't require explicit connection - it uses HTTP requests."""
        pass

    def is_connected(self) -> bool:
        """yfinance uses HTTP requests - assume connected if no network issues."""
        return True

    def list_available_instruments(
        self,
        instrument_type: Optional[str] = None,
        market: Optional[str] = None,
        **kwargs,
    ) -> List[str]:
        """
        Yahoo Finance does not expose an instrument-discovery API.

        Previously this method was an unimplemented stub that implicitly
        returned ``None``, violating the declared ``List[str]`` return type.
        It now warns and returns an empty list, consistent with the other
        loaders in this package.

        Args:
            instrument_type (str, optional): Ignored; accepted for interface compatibility.
            market (str, optional): Ignored; accepted for interface compatibility.
            **kwargs: Ignored.

        Returns:
            List[str]: Always an empty list.
        """
        # TODO: implement if/when an instrument listing becomes available
        logger.warning("Yahoo Finance does not support listing available instruments.")
        return []

    def get_fundamental_data(
        self,
        symbol: str,
        frequency: str = "quarterly",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Get all fundamental data for a symbol including income
        statement, cash flow, and balance sheet.

        Args:
            symbol (str): Stock symbol; only a single symbol is supported.
            frequency (str, optional): Frequency of data. Defaults to "quarterly".
            **kwargs: Additional yfinance parameters (currently unused).

        Returns:
            pd.DataFrame: DataFrame with raw fundamental data.
        """
        income_statement = self._get_income_statement(symbol, frequency=frequency)
        cash_flow = self._get_cash_flow(symbol, frequency=frequency)
        balance_sheet = self._get_balance_sheet(symbol, frequency=frequency)

        # Outer merges keep every reporting date even when one statement is missing it.
        df = income_statement.merge(cash_flow, on="Date", how="outer")
        df = df.merge(balance_sheet, on="Date", how="outer")

        df["Symbol"] = symbol

        essential_columns = [
            "Date",
            "Symbol",
        ] + financial_columns.get_all_statement_columns()
        # Keep only the columns yfinance actually returned for this symbol.
        available_columns = [col for col in essential_columns if col in df.columns]

        return df[available_columns]

    def _get_income_statement(self, symbol: str, frequency: str = "quarterly") -> pd.DataFrame:
        """
        Get income statement for a symbol.

        Args:
            symbol (str): Stock symbol; only a single symbol is supported.
            frequency (str, optional): Frequency of data. Defaults to "quarterly".

        Returns:
            pd.DataFrame: DataFrame with raw income statement data.
        """
        logger.info("Fetching income statement for {symbol}", symbol=symbol)
        ticker = yf.Ticker(symbol)
        # yfinance returns statements with dates as columns; transpose to rows.
        df = ticker.get_income_stmt(freq=frequency).T.reset_index(names="Date")
        df["Date"] = pd.to_datetime(df["Date"])
        return df

    def _get_cash_flow(self, symbol: str, frequency: str = "quarterly") -> pd.DataFrame:
        """
        Get cash flow statement for a symbol.

        Args:
            symbol (str): Stock symbol; only a single symbol is supported.
            frequency (str, optional): Frequency of data. Defaults to "quarterly".

        Returns:
            pd.DataFrame: DataFrame with raw cash flow data.
        """
        logger.info("Fetching cash flow statement for {symbol}", symbol=symbol)
        ticker = yf.Ticker(symbol)
        df = ticker.get_cashflow(freq=frequency).T.reset_index(names="Date")
        df["Date"] = pd.to_datetime(df["Date"])
        return df

    def _get_balance_sheet(self, symbol: str, frequency: str = "quarterly") -> pd.DataFrame:
        """
        Get balance sheet for a symbol.

        Args:
            symbol (str): Stock symbol; only a single symbol is supported.
            frequency (str, optional): Frequency of data. Defaults to "quarterly".

        Returns:
            pd.DataFrame: DataFrame with raw balance sheet data.
        """
        logger.info("Fetching balance sheet for {symbol}", symbol=symbol)
        ticker = yf.Ticker(symbol)
        df = ticker.get_balance_sheet(freq=frequency).T.reset_index(names="Date")
        df["Date"] = pd.to_datetime(df["Date"])
        return df

    def get_historical_ohlcv_data(
        self,
        symbols: Union[str, List[str]],
        start: Optional[Union[str, datetime]] = None,
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Get historical OHLCV data for a list of symbols.

        Args:
            symbols (Union[str, List[str]]): A single symbol or a list of symbols.
            start (Union[str, datetime], optional): Start date or datetime.
            end (Union[str, datetime], optional): End date or datetime.
            timeframe (str, optional): Bar interval. Defaults to "1d".
            **kwargs: Additional yfinance parameters, including 'period' (e.g., '1y', 'max').

        Returns:
            pd.DataFrame: Output dataframe with OHLCV data (raw). Empty on repeated failure.

        Raises:
            ValueError: If all elements in 'symbols' are not strings.
            TypeError: If 'symbols' is not a string or list of strings.
            InvalidParametersError: If the interval is invalid.
            ValueError: If start or end date is invalid.
            ValueError: If start date is not before end date.
            InvalidParametersError: If 1 min interval start date is not within 30 days from today.
        """
        symbol_list = normalize_symbols(symbols)

        if timeframe not in YFinanceInterval.values():
            raise InvalidParametersError(f"Invalid interval. Must be one of {YFinanceInterval.values()}.")

        # 'period' is an alternative to explicit start/end; pop it so it is
        # not forwarded a second time to ticker.history() via **kwargs.
        period = kwargs.pop("period", None)
        start_dt, end_dt = None, None

        if start is not None:
            # Explicit dates take precedence over 'period'.
            start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True, validate_order=True)

            # Yahoo Finance restricts 1m interval data to the last 30 days
            if timeframe == "1m" and start_dt < datetime.now() - timedelta(days=30):
                raise InvalidParametersError(
                    "For 1 min interval, the start date must be within 30 days from the current date."
                )
        elif period is None:
            logger.warning("Neither 'start' nor 'period' provided. Defaulting to period='1mo'")
            period = "1mo"

        for attempt in range(self.max_retries):
            try:
                result = pd.DataFrame()
                for symbol in symbol_list:
                    ticker = yf.Ticker(symbol)
                    if start_dt is not None:
                        data = ticker.history(start=start_dt, end=end_dt, interval=timeframe, **kwargs).assign(
                            Symbol=symbol
                        )
                    else:
                        data = ticker.history(period=period, interval=timeframe, **kwargs).assign(Symbol=symbol)
                    result = pd.concat([result, data])

                df_result = result.reset_index()
                log_dataframe_info(df_result, f"Fetched OHLCV data for {len(symbol_list)} symbol(s)")
                return df_result

            except Exception as e:
                if attempt < self.max_retries - 1:
                    logger.warning(
                        "Failed to fetch data for {symbols} (attempt {attempt}/{max_retries}): {error}",
                        symbols=symbol_list,
                        attempt=attempt + 1,
                        max_retries=self.max_retries,
                        error=str(e),
                    )
                    time.sleep(self.delay)
                else:
                    logger.error(
                        "Failed to fetch data for {symbols} after {max_retries} retries: {error}",
                        symbols=symbol_list,
                        max_retries=self.max_retries,
                        error=str(e),
                    )
                    return pd.DataFrame()

        # Unreachable when max_retries >= 1; guards against max_retries <= 0,
        # which previously made the method fall through and return None.
        return pd.DataFrame()

    def _fetch_single_symbol(
        self,
        symbol: str,
        start_dt: Optional[datetime],
        end_dt: Optional[datetime],
        timeframe: str,
        period: Optional[str] = None,
    ) -> pd.DataFrame:
        """
        Fetch OHLCV data for a single symbol (blocking). Used by
        async_fetch_ohlcv.

        Args:
            symbol (str): Stock symbol to fetch.
            start_dt (datetime, optional): Start datetime.
            end_dt (datetime, optional): End datetime.
            timeframe (str): Bar interval.
            period (str, optional): yfinance period string (e.g. '1mo'). Defaults to None.

        Returns:
            pd.DataFrame: OHLCV data with reset index.
        """
        ticker = yf.Ticker(symbol)
        if start_dt is not None:
            data = ticker.history(start=start_dt, end=end_dt, interval=timeframe).assign(Symbol=symbol)
        else:
            data = ticker.history(period=period or "1mo", interval=timeframe).assign(Symbol=symbol)
        return data.reset_index()

    async def async_fetch_ohlcv(
        self,
        symbol: str,
        start: Optional[Union[str, datetime]] = None,
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async wrapper around yfinance OHLCV fetch for a single symbol.

        Uses asyncio.to_thread() to run the blocking yfinance SDK call in a
        background thread, keeping the event loop free for concurrent fetches.

        Args:
            symbol (str): Stock symbol to fetch.
            start (Union[str, datetime], optional): Start date or datetime.
            end (Union[str, datetime], optional): End date or datetime.
            timeframe (str, optional): Bar interval. Defaults to "1d".

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (symbol, DataFrame). DataFrame is empty on failure.
        """
        start_dt, end_dt = None, None
        if start is not None:
            start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)

        try:
            df = await asyncio.to_thread(self._fetch_single_symbol, symbol, start_dt, end_dt, timeframe)
            if df.empty:
                logger.warning("async_fetch_ohlcv: empty result for {symbol}", symbol=symbol)
            return symbol, df
        except Exception as e:
            logger.error("async_fetch_ohlcv failed for {symbol}: {e}", symbol=symbol, e=e)
            return symbol, pd.DataFrame()

connect()

yfinance doesn't require explicit connection - it uses HTTP requests.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
def connect(self):
    """No-op: yfinance needs no persistent connection (plain HTTP requests)."""
    return None

disconnect()

yfinance doesn't require explicit connection - it uses HTTP requests.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
def disconnect(self):
    """No-op: yfinance holds no connection state to tear down."""
    return None

is_connected()

yfinance uses HTTP requests - assume connected if no network issues.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
def is_connected(self) -> bool:
    """Always True: yfinance issues stateless HTTP requests, so there is no
    connection handle whose liveness could be checked."""
    return True

get_fundamental_data(symbol, frequency='quarterly', **kwargs)

Get all fundamental data for a symbol including income statement, cash flow, and balance sheet.

Parameters:

Name Type Description Default
symbol str

Stock symbol; only a single symbol is supported.

required
frequency str

Frequency of data. Defaults to "quarterly".

'quarterly'
**kwargs Any

Additional yfinance parameters.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with raw fundamental data.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
def get_fundamental_data(
    self,
    symbol: str,
    frequency: str = "quarterly",
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Fetch the combined fundamental dataset (income statement, cash flow
    and balance sheet) for a single symbol, joined on reporting date.

    Args:
        symbol (str): Stock symbol; only a single symbol is supported.
        frequency (str, optional): Frequency of data. Defaults to "quarterly".
        **kwargs: Additional yfinance parameters.

    Returns:
        pd.DataFrame: DataFrame with raw fundamental data.
    """
    # Fetch the three statements up front, then join them on "Date";
    # outer joins keep dates present in only some of the statements.
    inc = self._get_income_statement(symbol, frequency=frequency)
    cf = self._get_cash_flow(symbol, frequency=frequency)
    bs = self._get_balance_sheet(symbol, frequency=frequency)

    merged = inc.merge(cf, on="Date", how="outer").merge(bs, on="Date", how="outer")
    merged["Symbol"] = symbol

    # Restrict the output to the canonical statement columns that yfinance
    # actually returned for this symbol.
    wanted = [
        "Date",
        "Symbol",
    ] + financial_columns.get_all_statement_columns()
    present = [col for col in wanted if col in merged.columns]

    return merged[present]

get_historical_ohlcv_data(symbols, start=None, end=None, timeframe='1d', **kwargs)

Get historical OHLCV data for a list of symbols.

Parameters:

Name Type Description Default
symbols Union[str, List[str]]

A single symbol or a list of symbols.

required
start Union[str, datetime]

Start date or datetime.

None
end Union[str, datetime]

End date or datetime.

None
timeframe str

Bar interval. Defaults to "1d".

'1d'
**kwargs Any

Additional yfinance parameters, including 'period' (e.g., '1y', 'max').

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Output dataframe with OHLCV data (raw).

Raises:

Type Description
ValueError

If all elements in 'symbols' are not strings.

TypeError

If 'symbols' is not a string or list of strings.

ValueError

If interval is invalid.

ValueError

If start or end date is invalid.

ValueError

If start date is not before end date.

ValueError

If 1 min interval start date is not within 30 days from today.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
def get_historical_ohlcv_data(
    self,
    symbols: Union[str, List[str]],
    start: Optional[Union[str, datetime]] = None,
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Get historical OHLCV data for a list of symbols.

    Args:
        symbols (Union[str, List[str]]): A single symbol or a list of symbols.
        start (Union[str, datetime], optional): Start date or datetime.
        end (Union[str, datetime], optional): End date or datetime.
        timeframe (str, optional): Bar interval. Defaults to "1d".
        **kwargs: Additional yfinance parameters, including 'period' (e.g., '1y', 'max').

    Returns:
        pd.DataFrame: Output dataframe with OHLCV data (raw). Empty on repeated failure.

    Raises:
        ValueError: If all elements in 'symbols' are not strings.
        TypeError: If 'symbols' is not a string or list of strings.
        InvalidParametersError: If the interval is invalid.
        ValueError: If start or end date is invalid.
        ValueError: If start date is not before end date.
        InvalidParametersError: If 1 min interval start date is not within 30 days from today.
    """
    symbol_list = normalize_symbols(symbols)

    if timeframe not in YFinanceInterval.values():
        raise InvalidParametersError(f"Invalid interval. Must be one of {YFinanceInterval.values()}.")

    # 'period' is an alternative to explicit start/end; pop it so it is not
    # forwarded a second time to ticker.history() via **kwargs.
    period = kwargs.pop("period", None)
    start_dt, end_dt = None, None

    if start is not None:
        # Explicit dates take precedence over 'period'.
        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True, validate_order=True)

        # Yahoo Finance restricts 1m interval data to the last 30 days
        if timeframe == "1m" and start_dt < datetime.now() - timedelta(days=30):
            raise InvalidParametersError(
                "For 1 min interval, the start date must be within 30 days from the current date."
            )
    elif period is None:
        logger.warning("Neither 'start' nor 'period' provided. Defaulting to period='1mo'")
        period = "1mo"

    for attempt in range(self.max_retries):
        try:
            result = pd.DataFrame()
            for symbol in symbol_list:
                ticker = yf.Ticker(symbol)
                if start_dt is not None:
                    data = ticker.history(start=start_dt, end=end_dt, interval=timeframe, **kwargs).assign(
                        Symbol=symbol
                    )
                else:
                    data = ticker.history(period=period, interval=timeframe, **kwargs).assign(Symbol=symbol)
                # Accumulate per-symbol frames; the date index is reset once at the end.
                result = pd.concat([result, data])

            df_result = result.reset_index()
            log_dataframe_info(df_result, f"Fetched OHLCV data for {len(symbol_list)} symbol(s)")
            return df_result

        except Exception as e:
            if attempt < self.max_retries - 1:
                # Transient failure: log and retry after a short delay.
                logger.warning(
                    "Failed to fetch data for {symbols} (attempt {attempt}/{max_retries}): {error}",
                    symbols=symbol_list,
                    attempt=attempt + 1,
                    max_retries=self.max_retries,
                    error=str(e),
                )
                time.sleep(self.delay)
            else:
                # Final attempt failed: give up and return an empty frame.
                logger.error(
                    "Failed to fetch data for {symbols} after {max_retries} retries: {error}",
                    symbols=symbol_list,
                    max_retries=self.max_retries,
                    error=str(e),
                )
                return pd.DataFrame()

async_fetch_ohlcv(symbol, start=None, end=None, timeframe='1d') async

Async wrapper around yfinance OHLCV fetch for a single symbol.

Uses asyncio.to_thread() to run the blocking yfinance SDK call in a background thread, keeping the event loop free for concurrent fetches.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch.

required
start Union[str, datetime]

Start date or datetime.

None
end Union[str, datetime]

End date or datetime.

None
timeframe str

Bar interval. Defaults to "1d".

'1d'

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (symbol, DataFrame). DataFrame is empty on failure.

Source code in src/quantrl_lab/data/sources/yfinance_loader.py
async def async_fetch_ohlcv(
    self,
    symbol: str,
    start: Optional[Union[str, datetime]] = None,
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
) -> Tuple[str, pd.DataFrame]:
    """
    Fetch OHLCV bars for one symbol without blocking the event loop.

    The synchronous yfinance call is executed inside asyncio.to_thread(),
    so multiple symbols can be fetched concurrently.

    Args:
        symbol (str): Stock symbol to fetch.
        start (Union[str, datetime], optional): Start date or datetime.
        end (Union[str, datetime], optional): End date or datetime.
        timeframe (str, optional): Bar interval. Defaults to "1d".

    Returns:
        Tuple[str, pd.DataFrame]: Tuple of (symbol, DataFrame). DataFrame is empty on failure.
    """
    if start is None:
        begin_at, stop_at = None, None
    else:
        begin_at, stop_at = normalize_date_range(start, end, default_end_to_now=True)

    try:
        frame = await asyncio.to_thread(self._fetch_single_symbol, symbol, begin_at, stop_at, timeframe)
    except Exception as e:
        logger.error("async_fetch_ohlcv failed for {symbol}: {e}", symbol=symbol, e=e)
        return symbol, pd.DataFrame()

    if frame.empty:
        logger.warning("async_fetch_ohlcv: empty result for {symbol}", symbol=symbol)
    return symbol, frame

Alpha Vantage

alpha_vantage_loader

AlphaVantageDataLoader

Bases: DataSource, FundamentalDataCapable, HistoricalDataCapable, MacroDataCapable, NewsDataCapable

Alpha Vantage implementation that provides various datasets.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
class AlphaVantageDataLoader(
    DataSource,
    FundamentalDataCapable,
    HistoricalDataCapable,
    MacroDataCapable,
    NewsDataCapable,
):
    """Alpha Vantage implementation that provides various datasets."""

    # Maximum request attempts before _make_api_request gives up.
    DEFAULT_MAX_RETRIES = 3
    # Base retry delay in seconds; scaled up on rate-limit responses.
    DEFAULT_RETRY_DELAY = 5
    # Minimum spacing between consecutive API requests, in seconds.
    DEFAULT_RATE_LIMIT_DELAY = 1.2
    # OHLCV columns coerced to numeric dtypes after download.
    NUMERIC_COLUMNS = ["Open", "High", "Low", "Close", "Volume", "Adj_close"]
    def __init__(
        self,
        api_key: str = None,
        max_retries: int = DEFAULT_MAX_RETRIES,
        delay: int = DEFAULT_RETRY_DELAY,
        rate_limit_delay: float = DEFAULT_RATE_LIMIT_DELAY,
    ):
        self.api_key = api_key or os.environ.get("ALPHA_VANTAGE_API_KEY")
        self.max_retries = max_retries
        self.delay = delay
        self.rate_limit_delay = rate_limit_delay
        self._last_request_time: float = 0

    @property
    def source_name(self) -> str:
        """Return the name of this data source."""
        return "Alpha Vantage"

    def connect(self):
        """Alpha Vantage doesn't require explicit connection - it uses HTTP requests."""
        pass

    def disconnect(self):
        """Alpha Vantage doesn't require explicit connection - it uses HTTP requests."""
        pass

    def is_connected(self) -> bool:
        """Alpha Vantage uses HTTP requests - assume connected if no network issues."""
        return True

    def list_available_instruments(
        self,
        instrument_type: Optional[str] = None,
        market: Optional[str] = None,
        **kwargs,
    ) -> List[str]:
        """
        Placeholder: Alpha Vantage exposes no instrument-listing endpoint.

        The filter arguments are accepted for interface compatibility but are
        ignored; an empty list is always returned after logging a warning.
        """
        logger.warning("Alpha Vantage does not support listing available instruments.")
        return []

    def get_historical_ohlcv_data(
        self,
        symbols: str,
        start: Optional[Union[str, datetime]] = None,
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Get historical OHLCV data from Alpha Vantage.

        Args:
            symbols (str): Stock symbol to fetch data for.
            start (Union[str, datetime], optional): Start date for filtering.
                If None, no start filtering is applied. Defaults to None.
            end (Union[str, datetime], optional): End date for filtering.
                If None, no end filtering is applied. Defaults to None.
            timeframe (str, optional): Time interval - "1d" (daily), or intraday
                ("1min", "5min", "15min", "30min", "60min"). Defaults to "1d".
            **kwargs: Additional parameters. 'adjusted' (bool) enables split/dividend
                adjustment for daily data (premium). 'outputsize' (str) is "compact" or "full"
                (premium). 'month' (str, "YYYY-MM") fetches historical intraday month (premium).

        Returns:
            pd.DataFrame: OHLCV data, optionally filtered by date range.

        Note:
            outputsize='full' and historical intraday 'month' parameter require a premium API key.
            Rate limit: 25 requests/day, 1 request/second burst limit on the free tier.
        """
        # Pop so 'adjusted' is not forwarded to the API as a raw query param.
        adjusted = kwargs.pop("adjusted", False)

        parsed_start_date = None
        parsed_end_date = None

        # Normalize whichever bounds were supplied; a one-sided range only
        # parses the given bound (order validation needs both).
        if start is not None or end is not None:
            if start is not None and end is not None:
                parsed_start_date, parsed_end_date = normalize_date_range(
                    start, end, default_end_to_now=False, validate_order=True
                )
            elif start is not None:
                from quantrl_lab.data.utils import normalize_date

                parsed_start_date = normalize_date(start)
            elif end is not None:
                from quantrl_lab.data.utils import normalize_date

                parsed_end_date = normalize_date(end)

        if parsed_start_date or parsed_end_date:
            if parsed_start_date and parsed_end_date:
                logger.info(
                    "Fetching {timeframe} data for {symbol} from {start} to {end}",
                    timeframe=timeframe,
                    symbol=symbols,
                    start=parsed_start_date.date(),
                    end=parsed_end_date.date(),
                )
            elif parsed_start_date:
                logger.info(
                    "Fetching {timeframe} data for {symbol} from {start} onwards",
                    timeframe=timeframe,
                    symbol=symbols,
                    start=parsed_start_date.date(),
                )
            else:
                logger.info(
                    "Fetching {timeframe} data for {symbol} up to {end}",
                    timeframe=timeframe,
                    symbol=symbols,
                    end=parsed_end_date.date(),
                )
        else:
            logger.info(
                "Fetching {timeframe} data for {symbol} (all available data)",
                timeframe=timeframe,
                symbol=symbols,
            )

        if timeframe == "1d":
            # Default to compact to avoid premium-only "full" output size
            if "outputsize" not in kwargs:
                kwargs["outputsize"] = "compact"
                logger.debug(
                    "Defaulting to outputsize='compact' (last 100 data points). "
                    "Use outputsize='full' with premium API key for 20+ years of data."
                )

            if adjusted:
                raw_data = self._get_daily_adjusted_data(symbols, **kwargs)
                logger.debug(f"Using adjusted daily data for {symbols}")
            else:
                raw_data = self._get_daily_data(symbols, **kwargs)
                logger.debug(f"Using raw daily data for {symbols}")

            # Daily endpoints share one payload key regardless of adjustment.
            time_series_key = "Time Series (Daily)"

        elif timeframe in ["1min", "5min", "15min", "30min", "60min"]:
            if adjusted:
                logger.warning("Adjusted prices not available for intraday data, using raw prices")

            if "month" in kwargs:
                logger.info(f"Fetching {timeframe} intraday data for {symbols} for month: {kwargs['month']}")
            else:
                logger.info(
                    f"Fetching {timeframe} intraday data for {symbols} (recent data - typically last 15-30 days)"
                )
                logger.debug("For historical intraday data, specify 'month=\"YYYY-MM\"' in kwargs")

            raw_data = self._get_intraday_data(symbols, interval=timeframe, **kwargs)
            # Intraday payload key embeds the interval, e.g. "Time Series (5min)".
            time_series_key = f"Time Series ({timeframe})"
        else:
            raise InvalidParametersError(
                f"Unsupported timeframe: {timeframe}. Use '1d' or intraday intervals like "
                "'1min', '5min', '15min', '30min', '60min'"
            )

        if not raw_data:
            logger.error(f"Failed to fetch data for {symbols}")
            return pd.DataFrame()

        if time_series_key not in raw_data:
            logger.error(
                f"Expected key '{time_series_key}' not found in API response for {symbols}. "
                "This may be due to rate limits, invalid symbol, or no data available."
            )
            available_keys = list(raw_data.keys())
            logger.debug(f"Available keys in response: {available_keys}")
            return pd.DataFrame()

        time_series = raw_data[time_series_key]

        if not time_series:
            logger.warning(f"Empty time series data for {symbols}")
            return pd.DataFrame()

        # Payload is a dict keyed by timestamp; orient="index" makes the
        # timestamps the DataFrame index.
        df = pd.DataFrame.from_dict(time_series, orient="index")

        column_mapping = ALPHA_VANTAGE_COLUMN_MAPPER.get_mapping(timeframe, adjusted)
        df = df.rename(columns=column_mapping)

        # Deduplicate mapped names and drop any unexpected columns.
        expected_columns = list(set(column_mapping.values()))
        df = df[df.columns.intersection(expected_columns)]

        # Dividend/split columns only exist for the adjusted daily endpoint.
        numeric_columns = self.NUMERIC_COLUMNS.copy()
        if "Dividend" in df.columns:
            numeric_columns.append("Dividend")
        if "Split_coeff" in df.columns:
            numeric_columns.append("Split_coeff")

        df = convert_columns_to_numeric(df, columns=numeric_columns, errors="coerce")

        df.index = pd.to_datetime(df.index)
        df.index.name = "Date"

        # Alpha Vantage returns newest first, so sort ascending
        df = df.sort_index()

        # Date bounds are inclusive on both ends.
        if parsed_start_date is not None:
            df = df[df.index >= parsed_start_date]
        if parsed_end_date is not None:
            df = df[df.index <= parsed_end_date]

        if df.empty:
            if parsed_start_date or parsed_end_date:
                logger.warning(
                    "No data found for {symbol} matching the specified date criteria",
                    symbol=symbols,
                )
            else:
                logger.warning("No data found for {symbol}", symbol=symbols)
        else:
            data_type = (
                "adjusted daily"
                if (timeframe == "1d" and adjusted)
                else (f"{timeframe} intraday" if timeframe != "1d" else "daily")
            )
            log_dataframe_info(df, f"Retrieved {data_type} records", symbol=symbols)

        # Expose Date as a plain string column rather than a DatetimeIndex.
        df.reset_index(inplace=True)
        df["Date"] = df["Date"].dt.strftime("%Y-%m-%d")

        return df

    def get_fundamental_data(
        self, symbol: str, metrics: List[Union[FundamentalMetric, str]], **kwargs: Any
    ) -> Union[pd.DataFrame, Dict]:
        """
        Get fundamental data for a single symbol by combining multiple
        Alpha Vantage API calls.

        Args:
            symbol (str): Stock symbol to fetch data for.
            metrics (List[Union[FundamentalMetric, str]]): List of FundamentalMetric enums or strings.
            **kwargs: Accepted for interface compatibility; no additional
                parameters are currently consumed.

        Returns:
            Union[pd.DataFrame, Dict]: Dict mapping each requested metric name to its
                fetched data, or None where the fetch failed or the metric is unknown.
        """
        results: Dict[str, Any] = {}

        # Map metrics to private methods - use enum objects as keys consistently
        metric_methods = {
            FundamentalMetric.COMPANY_OVERVIEW: self._get_company_overview,
            # Remark: etf profile not useful in our context
            # FundamentalMetric.ETF_PROFILE: self._get_etf_profile,
            FundamentalMetric.DIVIDENDS: self._get_dividend_data,
            FundamentalMetric.SPLITS: self._get_splits_data,
            FundamentalMetric.INCOME_STATEMENT: self._get_income_statement_data,
            FundamentalMetric.BALANCE_SHEET: self._get_balance_sheet_data,
            FundamentalMetric.CASH_FLOW: self._get_cash_flow_data,
            FundamentalMetric.EARNINGS: self._get_earnings_data,
        }

        logger.info(f"Fetching fundamental data for {symbol}")

        for metric in metrics:
            # Normalize string inputs to enum members; unrecognized strings are
            # recorded under their original (string) key since no enum exists.
            if isinstance(metric, str):
                try:
                    metric_enum = FundamentalMetric(metric.lower())
                except ValueError:
                    logger.warning(f"Unknown metric '{metric}' for symbol {symbol}")
                    results[metric] = None
                    continue
            else:
                metric_enum = metric

            method = metric_methods.get(metric_enum)
            if method is None:
                logger.warning(f"Unsupported metric '{metric_enum}' for symbol {symbol}")
                results[metric_enum.value] = None
                continue

            data = method(symbol)
            if data:
                results[metric_enum.value] = data
                logger.success(f"Successfully fetched {metric_enum.value} for {symbol}")
            else:
                logger.warning(f"Failed to fetch {metric_enum.value} for {symbol}")
                results[metric_enum.value] = None

        return results

    def get_news_data(
        self,
        symbols: Union[str, List[str]],
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        limit: int = 50,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Fetch news data for given symbols from Alpha Vantage.

        Retrieves news articles related to the specified symbols within the given date range.
        Supports additional parameters like 'sort' and 'topics' to customize the news data.

        Args:
            symbols (Union[str, List[str]]): Symbols to fetch news for.
            start (Union[str, datetime]): Start datetime for news data.
            end (Union[str, datetime], optional): End datetime for news. Defaults to None (current time).
            limit (int, optional): Maximum number of news items to fetch. Defaults to 50.
            **kwargs: Additional parameters for the API request, such as 'sort' or 'topics'.

        Returns:
            pd.DataFrame: DataFrame containing news data for the specified symbols.
        """
        # Convert bounds to the API's expected timestamp format.
        time_from = format_av_datetime(start)

        if end is None:
            end = datetime.now()
        time_to = format_av_datetime(end)

        # Multiple symbols are sent as one comma-joined 'tickers' query param.
        if isinstance(symbols, str):
            tickers = symbols
        else:
            tickers = ",".join(symbols)

        logger.info(f"Fetching news for {tickers} from {time_from} to {time_to}")

        params = {
            "tickers": tickers,
            "time_from": time_from,
            "time_to": time_to,
            "limit": str(limit),
        }

        # 'sort'/'topics' are popped individually only so they can be logged;
        # params.update(kwargs) below forwards any remaining extras anyway.
        if "sort" in kwargs:
            params["sort"] = kwargs.pop("sort")
            logger.debug(f"Using sort order: {params['sort']}")

        if "topics" in kwargs:
            params["topics"] = kwargs.pop("topics")
            logger.debug(f"Using topics from kwargs: {params['topics']}")

        params.update(kwargs)

        # NEWS_SENTIMENT takes 'tickers' as a query param, so symbol is empty.
        news_data = self._make_api_request("NEWS_SENTIMENT", symbol="", **params)

        news_df = pd.DataFrame(news_data["feed"]) if news_data and "feed" in news_data else pd.DataFrame()

        if news_df.empty:
            logger.warning(
                f"No news data retrieved for {tickers}. This may be due to rate limits or no data available."
            )
            return news_df

        if "time_published" in news_df.columns:
            news_df.rename(columns={"time_published": "created_at"}, inplace=True)
        else:
            logger.error(f"Expected 'time_published' column not found in news data for {tickers}")
            logger.debug(f"Available columns: {list(news_df.columns)}")
            return pd.DataFrame()

        try:
            news_df["created_at"] = pd.to_datetime(news_df["created_at"], format="%Y%m%dT%H%M%S")
            news_df["Date"] = news_df["created_at"].dt.date
        except Exception as e:
            logger.error(f"Failed to parse news timestamps for {tickers}: {e}")
            return pd.DataFrame()

        if "ticker_sentiment" in news_df.columns:
            try:
                # NOTE(review): for multi-symbol requests 'tickers' is a
                # comma-joined string, which will not equal any single entry's
                # 'ticker' field, so sentiment resolves to None — confirm
                # whether per-symbol extraction is intended here.
                news_df["sentiment_score"] = (
                    news_df["ticker_sentiment"].apply(lambda x: self._find_ticker_sentiment(x, tickers)).astype(float)
                )
            except Exception as e:
                logger.warning(f"Failed to extract sentiment scores for {tickers}: {e}")

        logger.success(f"Retrieved {len(news_df)} news items for {tickers}")

        return news_df

    def _find_ticker_sentiment(self, sentiment_list: List[Dict], ticker_symbol: str) -> Optional[float]:
        """
        Find the sentiment score for a specific ticker in the sentiment
        list.

        Args:
            sentiment_list (List[Dict]): A list of dictionaries containing sentiments
                for different tickers.
            ticker_symbol (str): The ticker symbol to search for (e.g., 'AAPL').

        Returns:
            Optional[float]: The sentiment score for the specified ticker, or None if not found.
        """
        if not isinstance(sentiment_list, list):
            return None

        for item in sentiment_list:
            if item.get("ticker") == ticker_symbol:
                return item["ticker_sentiment_score"]
        return None

    def get_macro_data(
        self,
        indicators: Union[str, List[str], Dict[str, Dict]],
        start: Union[str, datetime],
        end: Union[str, datetime],
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """
        Get macroeconomic data for specified indicators.

        Accepts a single indicator name, a list of names, or a dict mapping each
        indicator to its own parameter overrides, e.g.:
        {"real_gdp": {"interval": "quarterly"}, "treasury_yield": {"interval": "monthly", "maturity": "10year"}}

        Args:
            indicators (Union[str, List[str], Dict[str, Dict]]): Indicator(s) to fetch data for.
            start (Union[str, datetime]): Start date.
            end (Union[str, datetime]): End date.
            **kwargs: Additional parameters for the API request.

        Returns:
            Dict[str, Any]: Mapping from indicator name to the fetched data.

        Note:
            NOTE(review): 'start' and 'end' are accepted but not forwarded to
            the underlying fetch — confirm whether date filtering is intended.
        """
        # Advanced form: caller already supplied per-indicator parameters.
        if isinstance(indicators, dict):
            return self._get_macro_data_with_params(indicators, **kwargs)

        # Normalize a bare indicator into a list, then give every indicator an
        # empty override dict so both forms share one code path.
        if isinstance(indicators, (str, MacroIndicator)):
            indicators = [indicators]
        per_indicator_overrides = {name: {} for name in indicators}
        return self._get_macro_data_with_params(per_indicator_overrides, **kwargs)

    def _get_macro_data_with_params(
        self, indicator_params: Dict[Union[str, MacroIndicator], Dict], **global_kwargs
    ) -> Dict[str, Any]:
        """
        Fetch macro data for multiple indicators with per-indicator
        parameter overrides.

        Args:
            indicator_params (Dict[Union[str, MacroIndicator], Dict]): Mapping from indicator
                to its specific keyword arguments.
            **global_kwargs: Global parameters applied to all indicators unless overridden.

        Returns:
            Dict[str, Any]: Mapping from indicator name to fetched data (or None on failure).
        """
        results = {}

        # Dispatch table: one private fetch method per supported indicator.
        indicator_methods = {
            MacroIndicator.REAL_GDP: self._get_real_gdp_data,
            MacroIndicator.REAL_GDP_PER_CAPITA: self._get_real_gdp_per_capita_data,
            MacroIndicator.TREASURY_YIELD: self._get_treasury_yield_data,
            MacroIndicator.FEDERAL_FUNDS_RATE: self._get_federal_funds_rate_data,
            MacroIndicator.CPI: self._get_cpi_data,
            MacroIndicator.INFLATION: self._get_inflation_data,
            MacroIndicator.RETAIL_SALES: self._get_retail_sales_data,
            MacroIndicator.DURABLE_GOODS: self._get_durable_goods_data,
            MacroIndicator.UNEMPLOYMENT_RATE: self._get_unemployment_rate_data,
            MacroIndicator.NON_FARM_PAYROLL: self._get_non_farm_payroll_data,
        }

        logger.info(f"Fetching macro data for indicators: {list(indicator_params.keys())}")

        for indicator, ind_kwargs in indicator_params.items():
            # Normalize string indicators to enum members; unknown strings are
            # recorded under their original (string) key.
            if isinstance(indicator, str):
                try:
                    indicator_enum = MacroIndicator(indicator.lower())
                except ValueError:
                    logger.warning(f"Unknown macro indicator '{indicator}'")
                    results[indicator] = None
                    continue
            else:
                indicator_enum = indicator

            if indicator_enum in indicator_methods:
                method = indicator_methods[indicator_enum]

                # Per-indicator kwargs take precedence over global ones.
                merged_kwargs = {**global_kwargs, **ind_kwargs}

                try:
                    # Validate and apply defaults for interval/maturity before
                    # calling the endpoint-specific method.
                    method_kwargs = self._get_method_specific_kwargs(indicator_enum, merged_kwargs)

                    data = method(**method_kwargs)
                    if data:
                        results[indicator_enum.value] = data
                        logger.success(f"Successfully fetched {indicator_enum.value} data")
                    else:
                        logger.warning(f"Failed to fetch {indicator_enum.value} data")
                        results[indicator_enum.value] = None

                except ValueError as e:
                    logger.error(f"Parameter validation error for {indicator_enum.value}: {e}")
                    results[indicator_enum.value] = None
                except Exception as e:
                    logger.error(f"Error fetching {indicator_enum.value} data: {e}")
                    results[indicator_enum.value] = None
            else:
                logger.warning(f"Unsupported macro indicator '{indicator_enum}'")
                results[indicator_enum.value] = None

        return results

    def _get_method_specific_kwargs(self, indicator: MacroIndicator, kwargs: Dict) -> Dict:
        """
        Get method-specific parameters for macroeconomic indicators.

        Args:
            indicator (MacroIndicator): Enum to filter kwargs for.
            kwargs (Dict): Additional parameters for the API request.

        Returns:
            Dict: Filtered kwargs for the specific indicator method.

        Raises:
            InvalidParametersError: If the interval or maturity parameters are invalid.
                NOTE(review): the caller catches ValueError — assumes
                InvalidParametersError subclasses it; confirm.
        """
        # Per-indicator schema: which params apply, their valid values, and defaults.
        indicator_config = {
            MacroIndicator.REAL_GDP: {
                "params": ["interval"],
                "valid_intervals": ["quarterly", "annual"],
                "default_interval": "annual",
            },
            MacroIndicator.REAL_GDP_PER_CAPITA: {
                "params": [],
            },
            MacroIndicator.TREASURY_YIELD: {
                "params": ["interval", "maturity"],
                "valid_intervals": ["daily", "weekly", "monthly"],
                "valid_maturities": [
                    "3month",
                    "2year",
                    "5year",
                    "7year",
                    "10year",
                    "30year",
                ],
                "default_interval": "monthly",
                "default_maturity": "10year",
            },
            MacroIndicator.FEDERAL_FUNDS_RATE: {
                "params": ["interval"],
                "valid_intervals": ["daily", "weekly", "monthly"],
                "default_interval": "monthly",
            },
            MacroIndicator.CPI: {
                "params": ["interval"],
                "valid_intervals": ["semiannual", "monthly"],
                "default_interval": "monthly",
            },
            MacroIndicator.INFLATION: {"params": []},
            MacroIndicator.RETAIL_SALES: {"params": []},
            MacroIndicator.DURABLE_GOODS: {"params": []},
            MacroIndicator.UNEMPLOYMENT_RATE: {"params": []},
            MacroIndicator.NON_FARM_PAYROLL: {"params": []},
        }

        # Unknown indicators fall back to "no special params".
        config = indicator_config.get(indicator, {"params": []})
        filtered_kwargs = {}

        if "interval" in config.get("params", []):
            interval = kwargs.get("interval", config.get("default_interval"))
            valid_intervals = config.get("valid_intervals", [])

            if interval and valid_intervals and interval not in valid_intervals:
                raise InvalidParametersError(
                    f"Invalid interval '{interval}' for {indicator.value}. Valid options: {valid_intervals}"
                )

            if interval:
                filtered_kwargs["interval"] = interval

        if "maturity" in config.get("params", []):
            maturity = kwargs.get("maturity", config.get("default_maturity"))
            valid_maturities = config.get("valid_maturities", [])

            if maturity and valid_maturities and maturity not in valid_maturities:
                raise InvalidParametersError(
                    f"Invalid maturity '{maturity}' for {indicator.value}. Valid options: {valid_maturities}"
                )

            if maturity:
                filtered_kwargs["maturity"] = maturity

        # Forward any remaining caller-supplied kwargs unchanged.
        for key, value in kwargs.items():
            if key not in ["interval", "maturity"] and key not in filtered_kwargs:
                filtered_kwargs[key] = value

        return filtered_kwargs

    def _make_api_request(self, function: str, symbol: str = "", **params) -> Optional[Dict[str, Any]]:
        """
        Centralized private method for making Alpha Vantage API
        requests.

        Args:
            function (str): Alpha Vantage function name (e.g., 'TIME_SERIES_DAILY').
            symbol (str, optional): Stock symbol. Omitted for macro/news endpoints. Defaults to "".
            **params: Additional query parameters for the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON response, or None if all retries are exhausted.
        """
        # Client-side throttle: space requests at least rate_limit_delay apart.
        elapsed = time.time() - self._last_request_time
        if elapsed < self.rate_limit_delay:
            sleep_time = self.rate_limit_delay - elapsed
            time.sleep(sleep_time)

        url_params = {
            "function": function,
            "apikey": self.api_key,
            **params,
        }

        # 'symbol' is only included for endpoints that take one.
        if symbol:
            url_params["symbol"] = symbol

        for attempt in range(self.max_retries):
            try:
                self._last_request_time = time.time()
                response = requests.get(ALPHA_VANTAGE_API_BASE, params=url_params, timeout=30)

                if response.status_code == 200:
                    data = response.json()

                    # Alpha Vantage signals many errors inside a 200 body
                    # rather than via HTTP status codes.
                    if "Error Message" in data:
                        error_msg = f"API Error: {data['Error Message']}"
                        if symbol:
                            error_msg += f" for {symbol}"
                        logger.error(error_msg)
                        return None

                    # "Information" payloads are logged but still returned
                    # as-is for the caller to inspect.
                    if "Information" in data:
                        logger.warning(f"API Information message: {data['Information']}")
                        return data

                    # Rate-limit "Note" responses get exponential backoff.
                    if "Note" in data and "API call frequency" in data.get("Note", ""):
                        warning_msg = "Rate limit hit"
                        if symbol:
                            warning_msg += f" for {symbol}"
                        logger.warning(f"{warning_msg}, retrying...")

                        if attempt < self.max_retries - 1:
                            wait_time = self.delay * (2**attempt)
                            time.sleep(wait_time)
                            continue
                        return None

                    success_msg = f"Successfully fetched {function} data"
                    if symbol:
                        success_msg += f" for {symbol}"
                    logger.info(success_msg)
                    return data

                elif response.status_code == 429:
                    if attempt < self.max_retries - 1:
                        wait_time = self.delay * (2**attempt)
                        logger.warning(f"Rate limited, waiting {wait_time}s before retry...")
                        time.sleep(wait_time)
                        continue

                # Any HTTPError raised here is a RequestException subclass,
                # so it is handled by the except clause below.
                response.raise_for_status()

            except requests.exceptions.Timeout:
                timeout_msg = f"Timeout (attempt {attempt + 1})"
                if symbol:
                    timeout_msg = f"Timeout for {symbol} (attempt {attempt + 1})"
                logger.warning(timeout_msg)
            except requests.exceptions.ConnectionError:
                conn_msg = f"Connection error (attempt {attempt + 1})"
                if symbol:
                    conn_msg = f"Connection error for {symbol} (attempt {attempt + 1})"
                logger.warning(conn_msg)
            except requests.exceptions.RequestException as e:
                req_msg = f"Request error: {e} (attempt {attempt + 1})"
                if symbol:
                    req_msg = f"Request error for {symbol}: {e} (attempt {attempt + 1})"
                logger.warning(req_msg)

            # Linear backoff between attempts after network-level failures.
            if attempt < self.max_retries - 1:
                time.sleep(self.delay * (attempt + 1))

        error_msg = f"Failed to fetch {function} data after {self.max_retries} attempts"
        if symbol:
            error_msg = f"Failed to fetch {function} data for {symbol} after {self.max_retries} attempts"
        logger.error(error_msg)
        return None

    def _get_company_overview(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the OVERVIEW endpoint (key statistics and company info).

        Args:
            symbol (str): Stock symbol to fetch data for.

        Returns:
            Optional[Dict[str, Any]]: Parsed response dict, or None if the request fails.
        """
        return self._make_api_request("OVERVIEW", symbol=symbol)

    def _get_etf_profile(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the ETF_PROFILE endpoint (holdings and profile info).

        Note: ETF profile is not typically used in our context, but included for completeness.

        Args:
            symbol (str): ETF symbol to fetch data for.

        Returns:
            Optional[Dict[str, Any]]: Parsed response dict, or None if the request fails.
        """
        return self._make_api_request("ETF_PROFILE", symbol=symbol)

    def _get_dividend_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the DIVIDENDS endpoint (dividend payment history).

        Args:
            symbol (str): Stock symbol to fetch dividend data for.

        Returns:
            Optional[Dict[str, Any]]: Parsed response dict, or None if the request fails.
        """
        return self._make_api_request("DIVIDENDS", symbol=symbol)

    def _get_splits_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the SPLITS endpoint (stock split history).

        Note: not useful in our context, but included for completeness.

        Args:
            symbol (str): Stock symbol to fetch split data for.

        Returns:
            Optional[Dict[str, Any]]: Parsed response dict, or None if the request fails.
        """
        return self._make_api_request("SPLITS", symbol=symbol)

    def _get_income_statement_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the INCOME_STATEMENT endpoint.

        Args:
            symbol (str): Stock symbol to fetch income statement for.

        Returns:
            Optional[Dict[str, Any]]: Parsed income statement dict, or None if the request fails.
        """
        return self._make_api_request("INCOME_STATEMENT", symbol=symbol)

    def _get_balance_sheet_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the BALANCE_SHEET endpoint.

        Args:
            symbol (str): Stock symbol to fetch balance sheet for.

        Returns:
            Optional[Dict[str, Any]]: Parsed balance sheet dict, or None if the request fails.
        """
        return self._make_api_request("BALANCE_SHEET", symbol=symbol)

    def _get_cash_flow_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the CASH_FLOW endpoint (cash flow statement).

        Args:
            symbol (str): Stock symbol to fetch cash flow statement for.

        Returns:
            Optional[Dict[str, Any]]: Parsed cash flow dict, or None if the request fails.
        """
        return self._make_api_request("CASH_FLOW", symbol=symbol)

    def _get_earnings_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Request the EARNINGS endpoint.

        Args:
            symbol (str): Stock symbol to fetch earnings data for.

        Returns:
            Optional[Dict[str, Any]]: Parsed earnings dict, or None if the request fails.
        """
        return self._make_api_request("EARNINGS", symbol=symbol)

    def _get_intraday_data(
        self,
        symbol: str,
        interval: str = "5min",
        outputsize: str = "full",
        month: Optional[str] = None,
        **kwargs,
    ) -> Optional[Dict[str, Any]]:
        """Request the TIME_SERIES_INTRADAY endpoint.

        Args:
            symbol (str): Stock symbol to fetch data for.
            interval (str, optional): Time interval (1min, 5min, 15min, 30min, 60min).
                Defaults to "5min".
            outputsize (str, optional): 'compact' or 'full'. Defaults to "full".
            month (str, optional): Month in YYYY-MM format for historical intraday data.
                Defaults to None.
            **kwargs: Additional Alpha Vantage API parameters; these override the
                named parameters on key collision.

        Returns:
            Optional[Dict[str, Any]]: Intraday data, or None if the request fails.
        """
        request_params = dict(interval=interval, outputsize=outputsize)
        if month:
            request_params["month"] = month
        # Caller-supplied extras win over the named parameters above.
        request_params.update(kwargs)
        return self._make_api_request("TIME_SERIES_INTRADAY", symbol, **request_params)

    def _get_daily_data(self, symbol: str, outputsize: str = "full", **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve the daily time series from Alpha Vantage.

        Args:
            symbol (str): Stock symbol to fetch data for.
            outputsize (str, optional): 'compact' (last 100 days) or 'full'
                (20+ years of data). Defaults to "full".
            **kwargs: Additional Alpha Vantage API parameters; these override
                'outputsize' on key collision.

        Returns:
            Optional[Dict[str, Any]]: Daily OHLCV data, or None if request fails.
        """
        request_params = {"outputsize": outputsize, **kwargs}
        return self._make_api_request("TIME_SERIES_DAILY", symbol, **request_params)

    def _get_daily_adjusted_data(self, symbol: str, outputsize: str = "full", **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve the dividend/split-adjusted daily time series from Alpha Vantage.

        Requires a premium API key.

        Args:
            symbol (str): Stock symbol to fetch data for.
            outputsize (str, optional): 'compact' (last 100 days) or 'full'
                (20+ years of data). Defaults to "full".
            **kwargs: Additional Alpha Vantage API parameters; these override
                'outputsize' on key collision.

        Returns:
            Optional[Dict[str, Any]]: Daily adjusted OHLCV data, or None if request fails.
        """
        request_params = {"outputsize": outputsize, **kwargs}
        return self._make_api_request("TIME_SERIES_DAILY_ADJUSTED", symbol, **request_params)

    def _get_real_gdp_data(self, interval: str = "annual", **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch real GDP data from Alpha Vantage.

        Args:
            interval (str, optional): Data frequency. Available options are "quarterly" and
                "annual". Defaults to "annual".
            **kwargs: Additional parameters for the API request.

        Returns:
            Optional[Dict[str, Any]]: Real GDP data in dictionary format, or None if request fails.

        Raises:
            ValueError: If the interval is not one of the valid options.
        """
        if interval not in ["quarterly", "annual"]:
            raise InvalidParametersError(f"Invalid interval '{interval}'. Use 'quarterly' or 'annual'.")

        params = {"interval": interval}
        params.update(kwargs)

        return self._make_api_request("REAL_GDP", symbol="", **params)

    def _get_real_gdp_per_capita_data(self, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve real GDP per capita data from Alpha Vantage.

        Args:
            **kwargs: Extra query parameters forwarded to the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON payload, or None on failure.
        """
        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        response = self._make_api_request("REAL_GDP_PER_CAPITA", symbol="", **kwargs)
        return response

    def _get_treasury_yield_data(
        self, interval: str = "monthly", maturity: str = "10year", **kwargs
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch treasury yield data from Alpha Vantage.

        Args:
            interval (str, optional): Data frequency, one of "daily", "weekly",
                or "monthly". Defaults to "monthly".
            maturity (str, optional): Bond maturity, one of "3month", "2year",
                "5year", "7year", "10year", or "30year". Defaults to "10year".
            **kwargs: Additional parameters for the API request.

        Returns:
            Optional[Dict[str, Any]]: Treasury yield data in dictionary format,
                or None if request fails.

        Raises:
            InvalidParametersError: If the interval or maturity parameters are invalid.
        """
        valid_intervals = ["daily", "weekly", "monthly"]
        if interval not in valid_intervals:
            raise InvalidParametersError(f"Invalid interval '{interval}'. Use one of: {valid_intervals}")

        valid_maturities = ["3month", "2year", "5year", "7year", "10year", "30year"]
        if maturity not in valid_maturities:
            raise InvalidParametersError(f"Invalid maturity '{maturity}'. Use one of: {valid_maturities}")

        params = {"interval": interval, "maturity": maturity}
        params.update(kwargs)

        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        return self._make_api_request("TREASURY_YIELD", symbol="", **params)

    def _get_federal_funds_rate_data(self, interval: str = "monthly", **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch federal funds rate data from Alpha Vantage.

        Args:
            interval (str, optional): Data frequency, one of "daily", "weekly",
                or "monthly". Defaults to "monthly".
            **kwargs: Additional parameters for the API request.

        Returns:
            Optional[Dict[str, Any]]: Federal funds rate data in dictionary format,
                or None if request fails.

        Raises:
            InvalidParametersError: If the interval is not one of the valid options.
        """
        valid_intervals = ["daily", "weekly", "monthly"]
        if interval not in valid_intervals:
            raise InvalidParametersError(f"Invalid interval '{interval}'. Use one of: {valid_intervals}")

        params = {"interval": interval}
        params.update(kwargs)

        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        return self._make_api_request("FEDERAL_FUNDS_RATE", symbol="", **params)

    def _get_cpi_data(self, interval: str = "monthly", **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch Consumer Price Index (CPI) data from Alpha Vantage.

        Args:
            interval (str, optional): Data frequency, one of "semiannual" or
                "monthly". Defaults to "monthly".
            **kwargs: Additional parameters for the API request.

        Returns:
            Optional[Dict[str, Any]]: CPI data in dictionary format, or None if request fails.

        Raises:
            InvalidParametersError: If the interval is not one of the valid options.
        """
        valid_intervals = ["semiannual", "monthly"]
        if interval not in valid_intervals:
            raise InvalidParametersError(f"Invalid interval '{interval}'. Use one of: {valid_intervals}")

        params = {"interval": interval}
        params.update(kwargs)

        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        return self._make_api_request("CPI", symbol="", **params)

    def _get_inflation_data(self, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve inflation data from Alpha Vantage.

        Args:
            **kwargs: Extra query parameters forwarded to the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON payload, or None on failure.
        """
        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        response = self._make_api_request("INFLATION", symbol="", **kwargs)
        return response

    def _get_retail_sales_data(self, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve retail sales data from Alpha Vantage.

        Args:
            **kwargs: Extra query parameters forwarded to the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON payload, or None on failure.
        """
        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        response = self._make_api_request("RETAIL_SALES", symbol="", **kwargs)
        return response

    def _get_durable_goods_data(self, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve durable goods data from Alpha Vantage.

        Args:
            **kwargs: Extra query parameters forwarded to the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON payload, or None on failure.
        """
        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        response = self._make_api_request("DURABLE_GOODS", symbol="", **kwargs)
        return response

    def _get_unemployment_rate_data(self, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve unemployment rate data from Alpha Vantage.

        Args:
            **kwargs: Extra query parameters forwarded to the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON payload, or None on failure.
        """
        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        response = self._make_api_request("UNEMPLOYMENT", symbol="", **kwargs)
        return response

    def _get_non_farm_payroll_data(self, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Retrieve non-farm payroll data from Alpha Vantage.

        Args:
            **kwargs: Extra query parameters forwarded to the API request.

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON payload, or None on failure.
        """
        # Economic-indicator endpoints are not symbol-scoped; an empty symbol is passed.
        response = self._make_api_request("NONFARM_PAYROLL", symbol="", **kwargs)
        return response

connect()

Alpha Vantage doesn't require explicit connection - it uses HTTP requests.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def connect(self):
    """
    No-op connect.

    Alpha Vantage doesn't require explicit connection - it uses HTTP requests,
    so there is no session to open.
    """
    pass

disconnect()

Alpha Vantage doesn't require explicit connection - it uses HTTP requests.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def disconnect(self):
    """
    No-op disconnect.

    Alpha Vantage doesn't require explicit connection - it uses HTTP requests,
    so there is no session to close.
    """
    pass

is_connected()

Alpha Vantage uses HTTP requests - assume connected if no network issues.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def is_connected(self) -> bool:
    """
    Always True: Alpha Vantage uses stateless HTTP requests, so the source is
    assumed available barring network issues.
    """
    return True

list_available_instruments(instrument_type=None, market=None, **kwargs)

Alpha Vantage does not provide a direct API to list all available instruments.

This method is a placeholder.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def list_available_instruments(
    self,
    instrument_type: Optional[str] = None,
    market: Optional[str] = None,
    **kwargs,
) -> List[str]:
    """
    Alpha Vantage does not provide a direct API to list all
    available instruments.

    This method is a placeholder: it logs a warning and returns an empty list.
    The filter arguments are accepted for interface compatibility but ignored.

    Args:
        instrument_type (str, optional): Ignored.
        market (str, optional): Ignored.
        **kwargs: Ignored.

    Returns:
        List[str]: Always an empty list.
    """
    logger.warning("Alpha Vantage does not support listing available instruments.")
    return []

get_historical_ohlcv_data(symbols, start=None, end=None, timeframe='1d', **kwargs)

Get historical OHLCV data from Alpha Vantage.

Parameters:

Name Type Description Default
symbols str

Stock symbol to fetch data for.

required
start Union[str, datetime]

Start date for filtering. If None, no start filtering is applied. Defaults to None.

None
end Union[str, datetime]

End date for filtering. If None, no end filtering is applied. Defaults to None.

None
timeframe str

Time interval - "1d" (daily), or intraday ("1min", "5min", "15min", "30min", "60min"). Defaults to "1d".

'1d'
**kwargs Any

Additional parameters. 'adjusted' (bool) enables split/dividend adjustment for daily data (premium). 'outputsize' (str) is "compact" or "full" (premium). 'month' (str, "YYYY-MM") fetches historical intraday month (premium).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: OHLCV data, optionally filtered by date range.

Note

outputsize='full' and historical intraday 'month' parameter require a premium API key. Rate limit: 25 requests/day, 1 request/second burst limit on the free tier.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def get_historical_ohlcv_data(
    self,
    symbols: str,
    start: Optional[Union[str, datetime]] = None,
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Get historical OHLCV data from Alpha Vantage.

    Args:
        symbols (str): Stock symbol to fetch data for.
        start (Union[str, datetime], optional): Start date for filtering.
            If None, no start filtering is applied. Defaults to None.
        end (Union[str, datetime], optional): End date for filtering.
            If None, no end filtering is applied. Defaults to None.
        timeframe (str, optional): Time interval - "1d" (daily), or intraday
            ("1min", "5min", "15min", "30min", "60min"). Defaults to "1d".
        **kwargs: Additional parameters. 'adjusted' (bool) enables split/dividend
            adjustment for daily data (premium). 'outputsize' (str) is "compact" or "full"
            (premium). 'month' (str, "YYYY-MM") fetches historical intraday month (premium).

    Returns:
        pd.DataFrame: OHLCV data, optionally filtered by date range. The 'Date'
            column is returned as a "%Y-%m-%d" formatted string.

    Raises:
        InvalidParametersError: If `timeframe` is not a supported value.

    Note:
        outputsize='full' and historical intraday 'month' parameter require a premium API key.
        Rate limit: 25 requests/day, 1 request/second burst limit on the free tier.
    """
    # 'adjusted' is consumed here (not forwarded to the API); it only selects
    # between the raw and adjusted daily endpoints below.
    adjusted = kwargs.pop("adjusted", False)

    parsed_start_date = None
    parsed_end_date = None

    # Normalize whichever bounds were supplied; when both are given the
    # helper also validates start <= end.
    if start is not None or end is not None:
        if start is not None and end is not None:
            parsed_start_date, parsed_end_date = normalize_date_range(
                start, end, default_end_to_now=False, validate_order=True
            )
        elif start is not None:
            from quantrl_lab.data.utils import normalize_date

            parsed_start_date = normalize_date(start)
        elif end is not None:
            from quantrl_lab.data.utils import normalize_date

            parsed_end_date = normalize_date(end)

    # Log the exact fetch scope (both bounds / one bound / unbounded).
    if parsed_start_date or parsed_end_date:
        if parsed_start_date and parsed_end_date:
            logger.info(
                "Fetching {timeframe} data for {symbol} from {start} to {end}",
                timeframe=timeframe,
                symbol=symbols,
                start=parsed_start_date.date(),
                end=parsed_end_date.date(),
            )
        elif parsed_start_date:
            logger.info(
                "Fetching {timeframe} data for {symbol} from {start} onwards",
                timeframe=timeframe,
                symbol=symbols,
                start=parsed_start_date.date(),
            )
        else:
            logger.info(
                "Fetching {timeframe} data for {symbol} up to {end}",
                timeframe=timeframe,
                symbol=symbols,
                end=parsed_end_date.date(),
            )
    else:
        logger.info(
            "Fetching {timeframe} data for {symbol} (all available data)",
            timeframe=timeframe,
            symbol=symbols,
        )

    if timeframe == "1d":
        # Default to compact to avoid premium-only "full" output size
        if "outputsize" not in kwargs:
            kwargs["outputsize"] = "compact"
            logger.debug(
                "Defaulting to outputsize='compact' (last 100 data points). "
                "Use outputsize='full' with premium API key for 20+ years of data."
            )

        if adjusted:
            raw_data = self._get_daily_adjusted_data(symbols, **kwargs)
            logger.debug(f"Using adjusted daily data for {symbols}")
        else:
            raw_data = self._get_daily_data(symbols, **kwargs)
            logger.debug(f"Using raw daily data for {symbols}")

        time_series_key = "Time Series (Daily)"

    elif timeframe in ["1min", "5min", "15min", "30min", "60min"]:
        if adjusted:
            logger.warning("Adjusted prices not available for intraday data, using raw prices")

        if "month" in kwargs:
            logger.info(f"Fetching {timeframe} intraday data for {symbols} for month: {kwargs['month']}")
        else:
            logger.info(
                f"Fetching {timeframe} intraday data for {symbols} (recent data - typically last 15-30 days)"
            )
            logger.debug("For historical intraday data, specify 'month=\"YYYY-MM\"' in kwargs")

        raw_data = self._get_intraday_data(symbols, interval=timeframe, **kwargs)
        time_series_key = f"Time Series ({timeframe})"
    else:
        raise InvalidParametersError(
            f"Unsupported timeframe: {timeframe}. Use '1d' or intraday intervals like "
            "'1min', '5min', '15min', '30min', '60min'"
        )

    if not raw_data:
        logger.error(f"Failed to fetch data for {symbols}")
        return pd.DataFrame()

    # A missing time-series key usually means a rate-limit notice or an error
    # payload was returned instead of price data.
    if time_series_key not in raw_data:
        logger.error(
            f"Expected key '{time_series_key}' not found in API response for {symbols}. "
            "This may be due to rate limits, invalid symbol, or no data available."
        )
        available_keys = list(raw_data.keys())
        logger.debug(f"Available keys in response: {available_keys}")
        return pd.DataFrame()

    time_series = raw_data[time_series_key]

    if not time_series:
        logger.warning(f"Empty time series data for {symbols}")
        return pd.DataFrame()

    # Timestamps become the index; vendor column names are mapped to the
    # project's canonical OHLCV names, then extra columns are dropped.
    df = pd.DataFrame.from_dict(time_series, orient="index")

    column_mapping = ALPHA_VANTAGE_COLUMN_MAPPER.get_mapping(timeframe, adjusted)
    df = df.rename(columns=column_mapping)

    expected_columns = list(set(column_mapping.values()))
    df = df[df.columns.intersection(expected_columns)]

    numeric_columns = self.NUMERIC_COLUMNS.copy()
    if "Dividend" in df.columns:
        numeric_columns.append("Dividend")
    if "Split_coeff" in df.columns:
        numeric_columns.append("Split_coeff")

    df = convert_columns_to_numeric(df, columns=numeric_columns, errors="coerce")

    df.index = pd.to_datetime(df.index)
    df.index.name = "Date"

    # Alpha Vantage returns newest first, so sort ascending
    df = df.sort_index()

    # Date filtering happens client-side after the full payload is fetched.
    if parsed_start_date is not None:
        df = df[df.index >= parsed_start_date]
    if parsed_end_date is not None:
        df = df[df.index <= parsed_end_date]

    if df.empty:
        if parsed_start_date or parsed_end_date:
            logger.warning(
                "No data found for {symbol} matching the specified date criteria",
                symbol=symbols,
            )
        else:
            logger.warning("No data found for {symbol}", symbol=symbols)
    else:
        data_type = (
            "adjusted daily"
            if (timeframe == "1d" and adjusted)
            else (f"{timeframe} intraday" if timeframe != "1d" else "daily")
        )
        log_dataframe_info(df, f"Retrieved {data_type} records", symbol=symbols)

    df.reset_index(inplace=True)
    # NOTE(review): intraday rows are also reduced to date-only strings here,
    # dropping time-of-day — confirm this is intended for intraday timeframes.
    df["Date"] = df["Date"].dt.strftime("%Y-%m-%d")

    return df

get_fundamental_data(symbol, metrics, **kwargs)

Get fundamental data for a single symbol by combining multiple Alpha Vantage API calls.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch data for.

required
metrics List[Union[FundamentalMetric, str]]

List of FundamentalMetric enums or strings.

required
**kwargs Any

Additional parameters. 'return_format' (str) is 'dict' or 'dataframe'. Defaults to 'dict'.

{}

Returns:

Type Description
Union[DataFrame, Dict]

Union[pd.DataFrame, Dict]: Dict with combined fundamental data.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def get_fundamental_data(
    self, symbol: str, metrics: List[Union[FundamentalMetric, str]], **kwargs: Any
) -> Union[pd.DataFrame, Dict]:
    """
    Get fundamental data for a single symbol by combining multiple
    Alpha Vantage API calls.

    One API request is made per requested metric; failed or unsupported
    metrics map to None in the result instead of raising.

    Args:
        symbol (str): Stock symbol to fetch data for.
        metrics (List[Union[FundamentalMetric, str]]): List of FundamentalMetric enums or strings.
        **kwargs: Accepted for interface compatibility; not read by this
            implementation.

    Returns:
        Union[pd.DataFrame, Dict]: Dict keyed by metric value with the combined
            fundamental data (None for metrics that failed or are unknown).
    """
    results = {}

    # Map metrics to private methods - use enum objects as keys consistently
    metric_methods = {
        FundamentalMetric.COMPANY_OVERVIEW: self._get_company_overview,
        # Remark: etf profile not useful in our context
        # FundamentalMetric.ETF_PROFILE: self._get_etf_profile,
        FundamentalMetric.DIVIDENDS: self._get_dividend_data,
        FundamentalMetric.SPLITS: self._get_splits_data,
        FundamentalMetric.INCOME_STATEMENT: self._get_income_statement_data,
        FundamentalMetric.BALANCE_SHEET: self._get_balance_sheet_data,
        FundamentalMetric.CASH_FLOW: self._get_cash_flow_data,
        FundamentalMetric.EARNINGS: self._get_earnings_data,
    }

    logger.info(f"Fetching fundamental data for {symbol}")

    for metric in metrics:
        # Strings are coerced to the enum (case-insensitive via .lower()).
        if isinstance(metric, str):
            try:
                metric_enum = FundamentalMetric(metric.lower())
            except ValueError:
                logger.warning(f"Unknown metric '{metric}' for symbol {symbol}")
                # NOTE(review): unknown string metrics are keyed by the raw
                # string here, while valid metrics are keyed by enum .value —
                # confirm this asymmetry is intended.
                results[metric] = None
                continue
        else:
            metric_enum = metric

        if metric_enum in metric_methods:
            method = metric_methods[metric_enum]
            data = method(symbol)

            if data:
                results[metric_enum.value] = data
                logger.success(f"Successfully fetched {metric_enum.value} for {symbol}")
            else:
                logger.warning(f"Failed to fetch {metric_enum.value} for {symbol}")
                results[metric_enum.value] = None
        else:
            # Valid enum member with no handler (e.g. ETF_PROFILE above).
            logger.warning(f"Unsupported metric '{metric_enum}' for symbol {symbol}")
            results[metric_enum.value] = None

    return results

get_news_data(symbols, start, end=None, limit=50, **kwargs)

Fetch news data for given symbols from Alpha Vantage.

Retrieves news articles related to the specified symbols within the given date range. Supports additional parameters like 'sort' and 'topics' to customize the news data.

Parameters:

Name Type Description Default
symbols Union[str, List[str]]

Symbols to fetch news for.

required
start Union[str, datetime]

Start datetime for news data.

required
end Union[str, datetime]

End datetime for news. Defaults to None (current time).

None
limit int

Maximum number of news items to fetch. Defaults to 50.

50
**kwargs Any

Additional parameters for the API request, such as 'sort' or 'topics'.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing news data for the specified symbols.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def get_news_data(
    self,
    symbols: Union[str, List[str]],
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    limit: int = 50,
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Fetch news data for given symbols from Alpha Vantage.

    Retrieves news articles related to the specified symbols within the given date range.
    Supports additional parameters like 'sort' and 'topics' to customize the news data.

    Args:
        symbols (Union[str, List[str]]): Symbols to fetch news for.
        start (Union[str, datetime]): Start datetime for news data.
        end (Union[str, datetime], optional): End datetime for news. Defaults to None (current time).
        limit (int, optional): Maximum number of news items to fetch. Defaults to 50.
        **kwargs: Additional parameters for the API request, such as 'sort' or 'topics'.

    Returns:
        pd.DataFrame: DataFrame containing news data for the specified symbols;
            empty on failure. 'time_published' is renamed to 'created_at' and a
            date-only 'Date' column is added.
    """
    # Alpha Vantage expects its own compact datetime format (YYYYMMDDTHHMM).
    time_from = format_av_datetime(start)

    if end is None:
        end = datetime.now()
    time_to = format_av_datetime(end)

    # Multiple symbols are sent as a single comma-separated 'tickers' value.
    if isinstance(symbols, str):
        tickers = symbols
    else:
        tickers = ",".join(symbols)

    logger.info(f"Fetching news for {tickers} from {time_from} to {time_to}")

    params = {
        "tickers": tickers,
        "time_from": time_from,
        "time_to": time_to,
        "limit": str(limit),
    }

    # 'sort' and 'topics' are popped individually only so they can be logged;
    # the params.update(kwargs) below forwards any remaining extras.
    if "sort" in kwargs:
        params["sort"] = kwargs.pop("sort")
        logger.debug(f"Using sort order: {params['sort']}")

    if "topics" in kwargs:
        params["topics"] = kwargs.pop("topics")
        logger.debug(f"Using topics from kwargs: {params['topics']}")

    params.update(kwargs)

    news_data = self._make_api_request("NEWS_SENTIMENT", symbol="", **params)

    news_df = pd.DataFrame(news_data["feed"]) if news_data and "feed" in news_data else pd.DataFrame()

    if news_df.empty:
        logger.warning(
            f"No news data retrieved for {tickers}. This may be due to rate limits or no data available."
        )
        return news_df

    if "time_published" in news_df.columns:
        news_df.rename(columns={"time_published": "created_at"}, inplace=True)
    else:
        logger.error(f"Expected 'time_published' column not found in news data for {tickers}")
        logger.debug(f"Available columns: {list(news_df.columns)}")
        return pd.DataFrame()

    try:
        # Vendor timestamps arrive as e.g. "20240101T093000".
        news_df["created_at"] = pd.to_datetime(news_df["created_at"], format="%Y%m%dT%H%M%S")
        news_df["Date"] = news_df["created_at"].dt.date
    except Exception as e:
        logger.error(f"Failed to parse news timestamps for {tickers}: {e}")
        return pd.DataFrame()

    if "ticker_sentiment" in news_df.columns:
        try:
            # NOTE(review): the full comma-joined 'tickers' string is passed to
            # _find_ticker_sentiment — confirm the helper handles multi-symbol
            # strings, not just a single ticker.
            news_df["sentiment_score"] = (
                news_df["ticker_sentiment"].apply(lambda x: self._find_ticker_sentiment(x, tickers)).astype(float)
            )
        except Exception as e:
            logger.warning(f"Failed to extract sentiment scores for {tickers}: {e}")

    logger.success(f"Retrieved {len(news_df)} news items for {tickers}")

    return news_df

get_macro_data(indicators, start, end, **kwargs)

Get macroeconomic data for specified indicators.

Supports both standard indicator names and advanced dictionary format where each indicator can have its own parameters, e.g.: {"real_gdp": {"interval": "quarterly"}, "treasury_yield": {"interval": "monthly", "maturity": "10year"}}

Parameters:

Name Type Description Default
indicators Union[str, List[str], Dict[str, Dict]]

Indicator(s) to fetch data for.

required
start Union[str, datetime]

Start date.

required
end Union[str, datetime]

End date.

required
**kwargs Any

Additional parameters for the API request.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary containing macroeconomic data for the specified indicators. Each key is the indicator name, and the value is the fetched data.

Source code in src/quantrl_lab/data/sources/alpha_vantage_loader.py
def get_macro_data(
    self,
    indicators: Union[str, List[str], Dict[str, Dict]],
    start: Union[str, datetime],
    end: Union[str, datetime],
    **kwargs: Any,
) -> Dict[str, Any]:
    """
    Get macroeconomic data for specified indicators.

    Supports both standard indicator names and advanced dictionary format where each
    indicator can have its own parameters, e.g.:
    {"real_gdp": {"interval": "quarterly"}, "treasury_yield": {"interval": "monthly", "maturity": "10year"}}

    Args:
        indicators (Union[str, List[str], Dict[str, Dict]]): Indicator(s) to fetch data for.
        start (Union[str, datetime]): Start date.
        end (Union[str, datetime]): End date.
        **kwargs: Additional parameters for the API request.

    Returns:
        Dict[str, Any]: Dictionary containing macroeconomic data for the specified indicators.
            Each key is the indicator name, and the value is the fetched data.
    """
    # Advanced form: each indicator already carries its own parameter dict.
    if isinstance(indicators, dict):
        return self._get_macro_data_with_params(indicators, **kwargs)

    # Simple form: normalize a lone indicator into a list, then attach an
    # empty parameter dict to each entry.
    if isinstance(indicators, (str, MacroIndicator)):
        indicators = [indicators]
    per_indicator_params = {indicator: {} for indicator in indicators}
    return self._get_macro_data_with_params(per_indicator_params, **kwargs)

Financial Modeling Prep (FMP)

fmp_loader

FMPDataSource

Bases: DataSource, HistoricalDataCapable, AnalystDataCapable, SectorDataCapable, CompanyProfileCapable

Financial Modeling Prep data source for historical stock data and analyst insights.

Supports both end-of-day (daily) and intraday data. Intraday timeframes: 5min, 15min, 30min, 1hour, 4hour Daily timeframe: 1d

Implements the following protocols: - HistoricalDataCapable: OHLCV data (daily and intraday) - AnalystDataCapable: Analyst grades and ratings data - SectorDataCapable: Historical sector and industry performance data - CompanyProfileCapable: Company profile and metadata

Source code in src/quantrl_lab/data/sources/fmp_loader.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
class FMPDataSource(
    DataSource,
    HistoricalDataCapable,
    AnalystDataCapable,
    SectorDataCapable,
    CompanyProfileCapable,
):
    """
    Financial Modeling Prep data source for historical stock data and
    analyst insights.

    Supports both end-of-day (daily) and intraday data.
    Intraday timeframes: 5min, 15min, 30min, 1hour, 4hour
    Daily timeframe: 1d

    Implements the following protocols:
    - HistoricalDataCapable: OHLCV data (daily and intraday)
    - AnalystDataCapable: Analyst grades and ratings data
    - SectorDataCapable: Historical sector and industry performance data
    - CompanyProfileCapable: Company profile and metadata
    """

    BASE_URL = "https://financialmodelingprep.com/stable"
    RATE_LIMIT_SLEEP = 1  # seconds between requests (FMP free-tier friendly)
    INTRADAY_TIMEFRAMES = {"5min", "15min", "30min", "1hour", "4hour"}

    # FMP response keys -> standardized column names. Shared by the daily,
    # intraday, and async price fetchers so the mappings cannot drift apart.
    _OHLCV_COLUMN_MAPPING = {
        "date": "Timestamp",
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume",
    }

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize FMP data source.

        Args:
            api_key (str, optional): FMP API key. If not provided, will try to read from
                FMP_API_KEY environment variable.

        Raises:
            AuthenticationError: If no API key is available.
        """
        self.api_key = api_key or os.environ.get("FMP_API_KEY")
        if not self.api_key:
            raise AuthenticationError("FMP API key must be provided or set in FMP_API_KEY environment variable")

        self._request_wrapper = HTTPRequestWrapper(
            max_retries=3,
            retry_strategy=RetryStrategy.EXPONENTIAL,
            base_delay=1.0,
            rate_limit_delay=self.RATE_LIMIT_SLEEP,
            timeout=30.0,
        )

    @property
    def source_name(self) -> str:
        """Return the name of this data source."""
        return "FinancialModelingPrep"

    def _make_request(self, endpoint: str, params: Dict[str, Any]) -> Any:
        """
        Make an HTTP request to the FMP API with retry logic.

        Args:
            endpoint (str): API endpoint path.
            params (Dict[str, Any]): Query parameters. The API key is added
                to this dict before sending.

        Returns:
            Any: JSON response data.

        Raises:
            requests.HTTPError: If the request fails after retries.
        """
        params["apikey"] = self.api_key
        url = f"{self.BASE_URL}/{endpoint}"

        return self._request_wrapper.make_request(
            url=url,
            method="GET",
            params=params,
            raise_on_error=True,
        )

    def _standardize_price_frame(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame:
        """
        Apply the shared OHLCV column mapping and cleanup to a raw FMP
        price DataFrame.

        Args:
            df (pd.DataFrame): Raw frame built from an FMP price endpoint.
            symbol (str): Symbol the data belongs to (attached to the frame).

        Returns:
            pd.DataFrame: Standardized, sorted, numeric OHLCV frame.
        """
        return standardize_ohlcv_dataframe(
            df,
            column_mapping=self._OHLCV_COLUMN_MAPPING,
            symbol=symbol,
            timestamp_col="Timestamp",
            add_date=True,
            sort_data=True,
            convert_numeric=True,
        )

    @staticmethod
    def _sort_by_date(df: pd.DataFrame) -> pd.DataFrame:
        """
        Parse and sort the 'date' column in place if present.

        Args:
            df (pd.DataFrame): Frame that may contain a 'date' column.

        Returns:
            pd.DataFrame: The same frame, sorted chronologically when a
                'date' column exists (no-op otherwise).
        """
        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            df.sort_values("date", inplace=True)
        return df

    def _get_intraday_data(
        self,
        symbol: str,
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]],
        timeframe: str,
        nonadjusted: bool = False,
    ) -> pd.DataFrame:
        """
        Get intraday OHLCV data from FMP historical-chart endpoint.

        Args:
            symbol (str): Stock symbol to fetch data for.
            start (Union[str, datetime]): Start date for historical data.
            end (Union[str, datetime], optional): End date for historical data.
            timeframe (str): Intraday timeframe (5min, 15min, 30min, 1hour, 4hour).
            nonadjusted (bool, optional): If True, returns unadjusted prices. Defaults to False.

        Returns:
            pd.DataFrame: Intraday OHLCV data with standardized column names.
        """
        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
        start_str = format_date_to_string(start_dt)
        end_str = format_date_to_string(end_dt)

        logger.info(
            "Fetching {timeframe} intraday data for {symbol} from {start} to {end}",
            timeframe=timeframe,
            symbol=symbol,
            start=start_str,
            end=end_str,
        )

        endpoint = f"historical-chart/{timeframe}"
        params = {
            "symbol": symbol,
            "from": start_str,
            "to": end_str,
            # FMP expects a lowercase string literal ("true"/"false").
            "nonadjusted": str(nonadjusted).lower(),
        }

        data = self._make_request(endpoint, params)

        df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=symbol)
        if df.empty:
            return df

        df = self._standardize_price_frame(df, symbol)

        log_dataframe_info(df, f"Fetched {timeframe} intraday data", symbol=symbol)
        return df

    def get_historical_ohlcv_data(
        self,
        symbols: Union[str, List[str]],
        start: Optional[Union[str, datetime]] = None,
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Get historical OHLCV data from FMP (daily or intraday).

        Args:
            symbols (Union[str, List[str]]): Stock symbol(s) to fetch data for.
                FMP only supports single symbols; extras are dropped with a warning.
            start (Union[str, datetime], optional): Start date for historical data.
            end (Union[str, datetime], optional): End date for historical data.
            timeframe (str, optional): Timeframe - "1d" for daily, or intraday:
                "5min", "15min", "30min", "1hour", "4hour". Defaults to "1d".
                Any other value falls back to daily data with a warning.
            **kwargs: Additional arguments including 'nonadjusted' (bool) for intraday data.

        Returns:
            pd.DataFrame: OHLCV data with standardized column names.

        Raises:
            InvalidParametersError: If 'start' is not provided.
        """
        if start is None:
            raise InvalidParametersError("FMP requires a 'start' date for historical data.")

        # FMP only supports single symbols
        symbol = get_single_symbol(symbols, warn_on_multiple=True)

        if timeframe in self.INTRADAY_TIMEFRAMES:
            nonadjusted = kwargs.get("nonadjusted", False)
            return self._get_intraday_data(symbol, start, end, timeframe, nonadjusted)

        if timeframe != "1d":
            logger.warning(f"Timeframe {timeframe} not supported by FMP. Using daily (1d) data.")

        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
        start_str = format_date_to_string(start_dt)
        end_str = format_date_to_string(end_dt)

        logger.info(
            "Fetching EOD data for {symbol} from {start} to {end}",
            symbol=symbol,
            start=start_str,
            end=end_str,
        )

        endpoint = "historical-price-eod/full"
        params = {
            "symbol": symbol,
            "from": start_str,
            "to": end_str,
        }

        data = self._make_request(endpoint, params)

        df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=symbol)
        if df.empty:
            return df

        df = self._standardize_price_frame(df, symbol)

        log_dataframe_info(df, "Fetched EOD data", symbol=symbol)
        return df

    def get_historical_grades(self, symbol: str) -> pd.DataFrame:
        """
        Get historical analyst grades for a symbol.

        Args:
            symbol (str): Stock symbol to fetch data for.

        Returns:
            pd.DataFrame: Historical grades data, sorted by date when
                available. Empty frame if the API returns nothing.
        """
        endpoint = "grades-historical"
        params = {"symbol": symbol}

        data = self._make_request(endpoint, params)

        if not data or not isinstance(data, list):
            logger.warning(f"No historical grades found for symbol: {symbol}")
            return pd.DataFrame()

        df = pd.DataFrame(data)

        if df.empty:
            logger.warning(f"Empty grades dataset returned for symbol: {symbol}")
            return pd.DataFrame()

        df = self._sort_by_date(df)

        logger.success(
            "Fetched {n} historical grades for {symbol}",
            n=len(df),
            symbol=symbol,
        )

        return df

    def get_historical_rating(self, symbol: str, limit: int = 100) -> pd.DataFrame:
        """
        Get historical ratings for a symbol.

        Args:
            symbol (str): Stock symbol to fetch data for.
            limit (int, optional): Number of records to return. Defaults to 100.

        Returns:
            pd.DataFrame: Historical ratings data, sorted by date when
                available. Empty frame if the API returns nothing.
        """
        endpoint = "ratings-historical"
        params = {"symbol": symbol, "limit": limit}

        data = self._make_request(endpoint, params)

        if not data or not isinstance(data, list):
            logger.warning(f"No historical ratings found for symbol: {symbol}")
            return pd.DataFrame()

        df = pd.DataFrame(data)

        if df.empty:
            logger.warning(f"Empty ratings dataset returned for symbol: {symbol}")
            return pd.DataFrame()

        df = self._sort_by_date(df)

        logger.success(
            "Fetched {n} historical ratings for {symbol}",
            n=len(df),
            symbol=symbol,
        )

        return df

    def get_historical_sector_performance(
        self, sector: str, start: Optional[str] = None, end: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Get historical performance data for a specific market sector.

        This endpoint provides historical performance metrics for market sectors,
        allowing analysis of sector trends and performance over time.

        Args:
            sector (str): Market sector name (e.g., "Energy", "Technology", "Healthcare",
                "Financials", "Consumer Cyclical", "Industrials", "Basic Materials",
                "Consumer Defensive", "Real Estate", "Utilities", "Communication Services").
            start (str, optional): Start date in 'YYYY-MM-DD' format. Defaults to API default.
            end (str, optional): End date in 'YYYY-MM-DD' format. Defaults to API default.

        Returns:
            pd.DataFrame: Historical sector performance data with columns including date,
                sector, and performance metrics.

        Raises:
            InvalidParametersError: If sector is not a non-empty string.

        Example:
            >>> source = FMPDataSource()
            >>> df = source.get_historical_sector_performance("Energy", start="2024-01-01", end="2024-12-31")
            >>> print(df.head())
        """
        if not sector or not isinstance(sector, str):
            raise InvalidParametersError("Sector must be a non-empty string")

        logger.info("Fetching historical performance for sector: {sector}", sector=sector)

        endpoint = "historical-sector-performance"
        params = {"sector": sector}

        # Only forward the date bounds when supplied so the API defaults apply.
        if start:
            params["from"] = start
        if end:
            params["to"] = end

        data = self._make_request(endpoint, params)

        if not data or not isinstance(data, list):
            logger.warning(f"No historical sector performance data found for sector: {sector}")
            return pd.DataFrame()

        df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=sector)

        if df.empty:
            logger.warning(f"Empty sector performance dataset returned for sector: {sector}")
            return pd.DataFrame()

        df = self._sort_by_date(df)

        log_dataframe_info(df, "Fetched sector performance", symbol=sector)

        logger.success(
            "Fetched {n} records of historical sector performance for {sector}",
            n=len(df),
            sector=sector,
        )

        return df

    def get_historical_industry_performance(
        self, industry: str, start: Optional[str] = None, end: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Get historical performance data for a specific industry.

        This endpoint provides historical performance metrics for industries,
        enabling long-term trend analysis and industry evolution tracking.

        Args:
            industry (str): Industry name (e.g., "Biotechnology", "Software", "Banks",
                "Oil & Gas", "Semiconductors", "Insurance", "Auto Manufacturers",
                "Pharmaceuticals", "Consumer Electronics", "Aerospace & Defense").
            start (str, optional): Start date in 'YYYY-MM-DD' format. Defaults to API default.
            end (str, optional): End date in 'YYYY-MM-DD' format. Defaults to API default.

        Returns:
            pd.DataFrame: Historical industry performance data with columns including date,
                industry, and performance metrics.

        Raises:
            InvalidParametersError: If industry is not a non-empty string.

        Example:
            >>> source = FMPDataSource()
            >>> df = source.get_historical_industry_performance("Biotechnology", start="2024-01-01", end="2024-12-31")
            >>> print(df.head())
        """
        if not industry or not isinstance(industry, str):
            raise InvalidParametersError("Industry must be a non-empty string")

        logger.info("Fetching historical performance for industry: {industry}", industry=industry)

        endpoint = "historical-industry-performance"
        params = {"industry": industry}

        # Only forward the date bounds when supplied so the API defaults apply.
        if start:
            params["from"] = start
        if end:
            params["to"] = end

        data = self._make_request(endpoint, params)

        if not data or not isinstance(data, list):
            logger.warning(f"No historical industry performance data found for industry: {industry}")
            return pd.DataFrame()

        df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=industry)

        if df.empty:
            logger.warning(f"Empty industry performance dataset returned for industry: {industry}")
            return pd.DataFrame()

        df = self._sort_by_date(df)

        log_dataframe_info(df, "Fetched industry performance", symbol=industry)

        logger.success(
            "Fetched {n} records of historical industry performance for {industry}",
            n=len(df),
            industry=industry,
        )

        return df

    def get_company_profile(self, symbol: Union[str, List[str]]) -> pd.DataFrame:
        """
        Get company profile information including sector, industry, and
        key metrics.

        This endpoint provides comprehensive company information including business
        description, sector/industry classification, executive information, and
        key financial metrics.

        Args:
            symbol (Union[str, List[str]]): Stock ticker symbol (e.g., "AAPL", "MSFT") or
                list of symbols (only first symbol will be used if list is provided).

        Returns:
            pd.DataFrame: Company profile data with columns including symbol, companyName,
                sector, industry, description, ceo, website, exchange, mktCap, price, beta,
                volAvg, currency, ipoDate, address, fullTimeEmployees, and asset type flags.

        Raises:
            InvalidParametersError: If symbol is invalid.

        Example:
            >>> source = FMPDataSource()
            >>> profile = source.get_company_profile("AAPL")
            >>> print(f"Sector: {profile.iloc[0].get('sector')}")
            >>> print(f"Industry: {profile.iloc[0].get('industry')}")
            >>> print(f"CEO: {profile.iloc[0].get('ceo')}")

        Use Cases:
            - Get sector/industry classification for stocks
            - Screen stocks by sector or industry
            - Retrieve company metadata for analysis
            - Build company information datasets
        """
        symbols = normalize_symbols(symbol)
        validate_symbols(symbols)
        symbol = get_single_symbol(symbols)

        if not symbol or not isinstance(symbol, str):
            raise InvalidParametersError("Symbol must be a non-empty string")

        logger.info("Fetching company profile for: {symbol}", symbol=symbol)

        endpoint = "profile"
        params = {"symbol": symbol}

        data = self._make_request(endpoint, params)

        if not data or not isinstance(data, list):
            logger.warning(f"No company profile data found for symbol: {symbol}")
            return pd.DataFrame()

        df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=symbol)

        if df.empty:
            logger.warning(f"Empty company profile dataset returned for symbol: {symbol}")
            return pd.DataFrame()

        # df is guaranteed non-empty here (early return above), so read row 0 directly.
        company_name = df.iloc[0].get("companyName", "Unknown")
        sector = df.iloc[0].get("sector", "N/A")
        industry = df.iloc[0].get("industry", "N/A")

        logger.success(
            "Fetched company profile for {symbol}: {name} ({sector} - {industry})",
            symbol=symbol,
            name=company_name,
            sector=sector,
            industry=industry,
        )

        return df

    def _get_async_wrapper(self) -> AsyncHTTPRequestWrapper:
        """
        Create an async request wrapper configured for FMP rate limits.

        NOTE(review): a new wrapper is created per fetch call, so the
        `concurrency` cap limits each fetch independently rather than being
        shared across concurrent fetches — confirm whether a single shared
        instance is intended before changing this.
        """
        return AsyncHTTPRequestWrapper(
            max_retries=3,
            base_delay=1.0,
            concurrency=5,  # Conservative for FMP free tier
            timeout=30.0,
        )

    async def _async_request(
        self,
        session: "aiohttp.ClientSession",
        endpoint: str,
        params: Dict[str, Any],
        wrapper: AsyncHTTPRequestWrapper,
    ) -> Any:
        """
        Async equivalent of _make_request().

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            endpoint (str): API endpoint path.
            params (Dict[str, Any]): Query parameters (apikey is added automatically).
            wrapper (AsyncHTTPRequestWrapper): Configured async request wrapper.

        Returns:
            Any: JSON response data.
        """
        # Copy before adding the key so the caller's dict is never mutated.
        params = {**params, "apikey": self.api_key}
        url = f"{self.BASE_URL}/{endpoint}"
        return await wrapper.make_request(session, url, params=params)

    async def async_fetch_ohlcv(
        self,
        session: "aiohttp.ClientSession",
        symbol: str,
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async fetch of EOD OHLCV data for a single symbol.

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            symbol (str): Stock symbol to fetch.
            start (Union[str, datetime]): Start date for historical data.
            end (Union[str, datetime], optional): End date for historical data.
            timeframe (str, optional): Timeframe string. Defaults to "1d".
                Currently only EOD data is fetched; other values are ignored.

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (symbol, df).
        """
        wrapper = self._get_async_wrapper()
        start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
        params = {
            "symbol": symbol,
            "from": format_date_to_string(start_dt),
            "to": format_date_to_string(end_dt),
        }
        data = await self._async_request(session, "historical-price-eod/full", params, wrapper)
        df = convert_to_dataframe_safe(data or [], symbol=symbol)
        if not df.empty:
            df = self._standardize_price_frame(df, symbol)
        return symbol, df

    async def async_fetch_ratings(
        self,
        session: "aiohttp.ClientSession",
        symbol: str,
        limit: int = 500,
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async fetch of historical analyst ratings.

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            symbol (str): Stock symbol to fetch.
            limit (int, optional): Maximum number of records to return. Defaults to 500.

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (symbol, df).
        """
        wrapper = self._get_async_wrapper()
        data = await self._async_request(session, "ratings-historical", {"symbol": symbol, "limit": limit}, wrapper)
        if not data or not isinstance(data, list):
            return symbol, pd.DataFrame()
        df = self._sort_by_date(pd.DataFrame(data))
        return symbol, df

    async def async_fetch_company_profile(
        self,
        session: "aiohttp.ClientSession",
        symbol: str,
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async fetch of company profile (sector, industry).

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            symbol (str): Stock symbol to fetch.

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (symbol, df).
        """
        wrapper = self._get_async_wrapper()
        data = await self._async_request(session, "profile", {"symbol": symbol}, wrapper)
        df = convert_to_dataframe_safe(data or [], symbol=symbol)
        return symbol, df

    async def async_fetch_sector_perf(
        self,
        session: "aiohttp.ClientSession",
        sector: str,
        start: str,
        end: str,
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async fetch of historical sector performance.

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            sector (str): Market sector name.
            start (str): Start date in 'YYYY-MM-DD' format.
            end (str): End date in 'YYYY-MM-DD' format.

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (sector, df).
        """
        wrapper = self._get_async_wrapper()
        params = {"sector": sector, "from": start, "to": end}
        data = await self._async_request(session, "historical-sector-performance", params, wrapper)
        df = self._sort_by_date(convert_to_dataframe_safe(data or [], symbol=sector))
        return sector, df

    async def async_fetch_industry_perf(
        self,
        session: "aiohttp.ClientSession",
        industry: str,
        start: str,
        end: str,
    ) -> Tuple[str, pd.DataFrame]:
        """
        Async fetch of historical industry performance.

        Args:
            session (aiohttp.ClientSession): Shared aiohttp session.
            industry (str): Industry name.
            start (str): Start date in 'YYYY-MM-DD' format.
            end (str): End date in 'YYYY-MM-DD' format.

        Returns:
            Tuple[str, pd.DataFrame]: Tuple of (industry, df).
        """
        wrapper = self._get_async_wrapper()
        params = {"industry": industry, "from": start, "to": end}
        data = await self._async_request(session, "historical-industry-performance", params, wrapper)
        df = self._sort_by_date(convert_to_dataframe_safe(data or [], symbol=industry))
        return industry, df

__init__(api_key=None)

Initialize FMP data source.

Parameters:

Name Type Description Default
api_key str

FMP API key. If not provided, will try to read from FMP_API_KEY environment variable.

None
Source code in src/quantrl_lab/data/sources/fmp_loader.py
def __init__(self, api_key: Optional[str] = None):
    """
    Initialize FMP data source.

    Args:
        api_key (str, optional): FMP API key. If not provided, will try to read from
            FMP_API_KEY environment variable.
    """
    self.api_key = api_key or os.environ.get("FMP_API_KEY")
    if not self.api_key:
        raise AuthenticationError("FMP API key must be provided or set in FMP_API_KEY environment variable")

    self._request_wrapper = HTTPRequestWrapper(
        max_retries=3,
        retry_strategy=RetryStrategy.EXPONENTIAL,
        base_delay=1.0,
        rate_limit_delay=self.RATE_LIMIT_SLEEP,
        timeout=30.0,
    )

get_historical_ohlcv_data(symbols, start=None, end=None, timeframe='1d', **kwargs)

Get historical OHLCV data from FMP (daily or intraday).

Parameters:

Name Type Description Default
symbols Union[str, List[str]]

Stock symbol(s) to fetch data for.

required
start Union[str, datetime]

Start date for historical data.

None
end Union[str, datetime]

End date for historical data.

None
timeframe str

Timeframe - "1d" for daily, or intraday: "5min", "15min", "30min", "1hour", "4hour". Defaults to "1d".

'1d'
**kwargs Any

Additional arguments including 'nonadjusted' (bool) for intraday data.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: OHLCV data with standardized column names.

Raises:

Type Description
InvalidParametersError

If 'start' is not provided. (Unsupported timeframes do not raise; they fall back to daily data with a warning.)

Source code in src/quantrl_lab/data/sources/fmp_loader.py
def get_historical_ohlcv_data(
    self,
    symbols: Union[str, List[str]],
    start: Optional[Union[str, datetime]] = None,
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Get historical OHLCV data from FMP (daily or intraday).

    Args:
        symbols (Union[str, List[str]]): Stock symbol(s) to fetch data for.
        start (Union[str, datetime], optional): Start date for historical data.
        end (Union[str, datetime], optional): End date for historical data.
        timeframe (str, optional): Timeframe - "1d" for daily, or intraday:
            "5min", "15min", "30min", "1hour", "4hour". Defaults to "1d".
        **kwargs: Additional arguments including 'nonadjusted' (bool) for intraday data.

    Returns:
        pd.DataFrame: OHLCV data with standardized column names.

    Raises:
        ValueError: If timeframe is not supported.
    """
    if start is None:
        raise InvalidParametersError("FMP requires a 'start' date for historical data.")

    # FMP only supports single symbols
    symbol = get_single_symbol(symbols, warn_on_multiple=True)

    if timeframe in self.INTRADAY_TIMEFRAMES:
        nonadjusted = kwargs.get("nonadjusted", False)
        return self._get_intraday_data(symbol, start, end, timeframe, nonadjusted)

    if timeframe != "1d":
        logger.warning(f"Timeframe {timeframe} not supported by FMP. Using daily (1d) data.")

    start_dt, end_dt = normalize_date_range(start, end, default_end_to_now=True)
    start_str = format_date_to_string(start_dt)
    end_str = format_date_to_string(end_dt)

    logger.info(
        "Fetching EOD data for {symbol} from {start} to {end}",
        symbol=symbol,
        start=start_str,
        end=end_str,
    )

    endpoint = "historical-price-eod/full"
    params = {
        "symbol": symbol,
        "from": start_str,
        "to": end_str,
    }

    data = self._make_request(endpoint, params)

    df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=symbol)
    if df.empty:
        return df

    column_mapping = {
        "date": "Timestamp",
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume",
    }

    df = standardize_ohlcv_dataframe(
        df,
        column_mapping=column_mapping,
        symbol=symbol,
        timestamp_col="Timestamp",
        add_date=True,
        sort_data=True,
        convert_numeric=True,
    )

    log_dataframe_info(df, "Fetched EOD data", symbol=symbol)
    return df

get_historical_grades(symbol)

Get historical analyst grades for a symbol.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch data for.

required

Returns:

Type Description
DataFrame

pd.DataFrame: Historical grades data.

Source code in src/quantrl_lab/data/sources/fmp_loader.py
def get_historical_grades(self, symbol: str) -> pd.DataFrame:
    """
    Get historical analyst grades for a symbol.

    Args:
        symbol (str): Stock symbol to fetch data for.

    Returns:
        pd.DataFrame: Historical grades data.
    """
    endpoint = "grades-historical"
    params = {"symbol": symbol}

    data = self._make_request(endpoint, params)

    if not data or not isinstance(data, list):
        logger.warning(f"No historical grades found for symbol: {symbol}")
        return pd.DataFrame()

    df = pd.DataFrame(data)

    if df.empty:
        logger.warning(f"Empty grades dataset returned for symbol: {symbol}")
        return pd.DataFrame()

    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
        df.sort_values("date", inplace=True)

    logger.success(
        "Fetched {n} historical grades for {symbol}",
        n=len(df),
        symbol=symbol,
    )

    return df

get_historical_rating(symbol, limit=100)

Get historical ratings for a symbol.

Parameters:

Name Type Description Default
symbol str

Stock symbol to fetch data for.

required
limit int

Number of records to return. Defaults to 100.

100

Returns:

Type Description
DataFrame

pd.DataFrame: Historical ratings data.

Source code in src/quantrl_lab/data/sources/fmp_loader.py
def get_historical_rating(self, symbol: str, limit: int = 100) -> pd.DataFrame:
    """
    Get historical ratings for a symbol.

    Args:
        symbol (str): Stock symbol to fetch data for.
        limit (int, optional): Number of records to return. Defaults to 100.

    Returns:
        pd.DataFrame: Historical ratings data.
    """
    endpoint = "ratings-historical"
    params = {"symbol": symbol, "limit": limit}

    data = self._make_request(endpoint, params)

    if not data or not isinstance(data, list):
        logger.warning(f"No historical ratings found for symbol: {symbol}")
        return pd.DataFrame()

    df = pd.DataFrame(data)

    if df.empty:
        logger.warning(f"Empty ratings dataset returned for symbol: {symbol}")
        return pd.DataFrame()

    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
        df.sort_values("date", inplace=True)

    logger.success(
        "Fetched {n} historical ratings for {symbol}",
        n=len(df),
        symbol=symbol,
    )

    return df

get_historical_sector_performance(sector, start=None, end=None)

Get historical performance data for a specific market sector.

This endpoint provides historical performance metrics for market sectors, allowing analysis of sector trends and performance over time.

Parameters:

Name Type Description Default
sector str

Market sector name (e.g., "Energy", "Technology", "Healthcare", "Financials", "Consumer Cyclical", "Industrials", "Basic Materials", "Consumer Defensive", "Real Estate", "Utilities", "Communication Services").

required
start str

Start date in 'YYYY-MM-DD' format. Defaults to API default.

None
end str

End date in 'YYYY-MM-DD' format. Defaults to API default.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Historical sector performance data with columns including date, sector, and performance metrics.

Raises:

Type Description
InvalidParametersError

If sector is not a non-empty string.

Example

>>> source = FMPDataSource()
>>> df = source.get_historical_sector_performance("Energy", start="2024-01-01", end="2024-12-31")
>>> print(df.head())

Source code in src/quantrl_lab/data/sources/fmp_loader.py
def get_historical_sector_performance(
    self, sector: str, start: Optional[str] = None, end: Optional[str] = None
) -> pd.DataFrame:
    """
    Get historical performance data for a specific market sector.

    This endpoint provides historical performance metrics for market sectors,
    allowing analysis of sector trends and performance over time.

    Args:
        sector (str): Market sector name (e.g., "Energy", "Technology", "Healthcare",
            "Financials", "Consumer Cyclical", "Industrials", "Basic Materials",
            "Consumer Defensive", "Real Estate", "Utilities", "Communication Services").
        start (Optional[str]): Start date in 'YYYY-MM-DD' format. Defaults to API default.
        end (Optional[str]): End date in 'YYYY-MM-DD' format. Defaults to API default.

    Returns:
        pd.DataFrame: Historical sector performance data with columns including date,
            sector, and performance metrics. Empty DataFrame when no data is returned.

    Raises:
        InvalidParametersError: If sector is not a non-empty string.

    Example:
        >>> source = FMPDataSource()
        >>> df = source.get_historical_sector_performance("Energy", start="2024-01-01", end="2024-12-31")
        >>> print(df.head())
    """
    if not sector or not isinstance(sector, str):
        raise InvalidParametersError("Sector must be a non-empty string")

    logger.info("Fetching historical performance for sector: {sector}", sector=sector)

    endpoint = "historical-sector-performance"
    params = {"sector": sector}

    # Only forward date bounds the caller supplied; otherwise the API
    # applies its own defaults.
    if start:
        params["from"] = start
    if end:
        params["to"] = end

    data = self._make_request(endpoint, params)

    if not data or not isinstance(data, list):
        logger.warning(f"No historical sector performance data found for sector: {sector}")
        return pd.DataFrame()

    df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=sector)

    if df.empty:
        logger.warning(f"Empty sector performance dataset returned for sector: {sector}")
        return pd.DataFrame()

    # Normalize the date column and order rows chronologically when present.
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
        df.sort_values("date", inplace=True)

    log_dataframe_info(df, "Fetched sector performance", symbol=sector)

    logger.success(
        "Fetched {n} records of historical sector performance for {sector}",
        n=len(df),
        sector=sector,
    )

    return df

get_historical_industry_performance(industry, start=None, end=None)

Get historical performance data for a specific industry.

This endpoint provides historical performance metrics for industries, enabling long-term trend analysis and industry evolution tracking.

Parameters:

Name Type Description Default
industry str

Industry name (e.g., "Biotechnology", "Software", "Banks", "Oil & Gas", "Semiconductors", "Insurance", "Auto Manufacturers", "Pharmaceuticals", "Consumer Electronics", "Aerospace & Defense").

required
start str

Start date in 'YYYY-MM-DD' format. Defaults to API default.

None
end str

End date in 'YYYY-MM-DD' format. Defaults to API default.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Historical industry performance data with columns including date, industry, and performance metrics.

Raises:

Type Description
InvalidParametersError

If industry is not a non-empty string.

Example

source = FMPDataSource() df = source.get_historical_industry_performance("Biotechnology", start="2024-01-01", end="2024-12-31") print(df.head())

Source code in src/quantrl_lab/data/sources/fmp_loader.py
def get_historical_industry_performance(
    self, industry: str, start: Optional[str] = None, end: Optional[str] = None
) -> pd.DataFrame:
    """
    Get historical performance data for a specific industry.

    This endpoint provides historical performance metrics for industries,
    enabling long-term trend analysis and industry evolution tracking.

    Args:
        industry (str): Industry name (e.g., "Biotechnology", "Software", "Banks",
            "Oil & Gas", "Semiconductors", "Insurance", "Auto Manufacturers",
            "Pharmaceuticals", "Consumer Electronics", "Aerospace & Defense").
        start (Optional[str]): Start date in 'YYYY-MM-DD' format. Defaults to API default.
        end (Optional[str]): End date in 'YYYY-MM-DD' format. Defaults to API default.

    Returns:
        pd.DataFrame: Historical industry performance data with columns including date,
            industry, and performance metrics. Empty DataFrame when no data is returned.

    Raises:
        InvalidParametersError: If industry is not a non-empty string.

    Example:
        >>> source = FMPDataSource()
        >>> df = source.get_historical_industry_performance("Biotechnology", start="2024-01-01", end="2024-12-31")
        >>> print(df.head())
    """
    if not industry or not isinstance(industry, str):
        raise InvalidParametersError("Industry must be a non-empty string")

    logger.info("Fetching historical performance for industry: {industry}", industry=industry)

    endpoint = "historical-industry-performance"
    params = {"industry": industry}

    # Only forward date bounds the caller supplied; otherwise the API
    # applies its own defaults.
    if start:
        params["from"] = start
    if end:
        params["to"] = end

    data = self._make_request(endpoint, params)

    if not data or not isinstance(data, list):
        logger.warning(f"No historical industry performance data found for industry: {industry}")
        return pd.DataFrame()

    df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=industry)

    if df.empty:
        logger.warning(f"Empty industry performance dataset returned for industry: {industry}")
        return pd.DataFrame()

    # Normalize the date column and order rows chronologically when present.
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
        df.sort_values("date", inplace=True)

    log_dataframe_info(df, "Fetched industry performance", symbol=industry)

    logger.success(
        "Fetched {n} records of historical industry performance for {industry}",
        n=len(df),
        industry=industry,
    )

    return df

get_company_profile(symbol)

Get company profile information including sector, industry, and key metrics.

This endpoint provides comprehensive company information including business description, sector/industry classification, executive information, and key financial metrics.

Parameters:

Name Type Description Default
symbol Union[str, List[str]]

Stock ticker symbol (e.g., "AAPL", "MSFT") or list of symbols (only first symbol will be used if list is provided).

required

Returns:

Type Description
DataFrame

pd.DataFrame: Company profile data with columns including symbol, companyName, sector, industry, description, ceo, website, exchange, mktCap, price, beta, volAvg, currency, ipoDate, address, fullTimeEmployees, and asset type flags.

Raises:

Type Description
InvalidParametersError

If the resolved symbol is not a non-empty string.

Example

source = FMPDataSource() profile = source.get_company_profile("AAPL") print(f"Sector: {profile.iloc[0].get('sector')}") print(f"Industry: {profile.iloc[0].get('industry')}") print(f"CEO: {profile.iloc[0].get('ceo')}")

Use Cases
  • Get sector/industry classification for stocks
  • Screen stocks by sector or industry
  • Retrieve company metadata for analysis
  • Build company information datasets
Source code in src/quantrl_lab/data/sources/fmp_loader.py
def get_company_profile(self, symbol: Union[str, List[str]]) -> pd.DataFrame:
    """
    Get company profile information including sector, industry, and
    key metrics.

    This endpoint provides comprehensive company information including business
    description, sector/industry classification, executive information, and
    key financial metrics.

    Args:
        symbol (Union[str, List[str]]): Stock ticker symbol (e.g., "AAPL", "MSFT") or
            list of symbols (only first symbol will be used if list is provided).

    Returns:
        pd.DataFrame: Company profile data with columns including symbol, companyName,
            sector, industry, description, ceo, website, exchange, mktCap, price, beta,
            volAvg, currency, ipoDate, address, fullTimeEmployees, and asset type flags.
            Empty DataFrame when no data is returned.

    Raises:
        InvalidParametersError: If the resolved symbol is not a non-empty string.

    Example:
        >>> source = FMPDataSource()
        >>> profile = source.get_company_profile("AAPL")
        >>> print(f"Sector: {profile.iloc[0].get('sector')}")
        >>> print(f"Industry: {profile.iloc[0].get('industry')}")
        >>> print(f"CEO: {profile.iloc[0].get('ceo')}")

    Use Cases:
        - Get sector/industry classification for stocks
        - Screen stocks by sector or industry
        - Retrieve company metadata for analysis
        - Build company information datasets
    """
    # Accept either a single symbol or a list; only the first entry is used.
    symbols = normalize_symbols(symbol)
    validate_symbols(symbols)
    symbol = get_single_symbol(symbols)

    if not symbol or not isinstance(symbol, str):
        raise InvalidParametersError("Symbol must be a non-empty string")

    logger.info("Fetching company profile for: {symbol}", symbol=symbol)

    endpoint = "profile"
    params = {"symbol": symbol}

    data = self._make_request(endpoint, params)

    if not data or not isinstance(data, list):
        logger.warning(f"No company profile data found for symbol: {symbol}")
        return pd.DataFrame()

    df = convert_to_dataframe_safe(data, expected_min_rows=0, symbol=symbol)

    if df.empty:
        logger.warning(f"Empty company profile dataset returned for symbol: {symbol}")
        return pd.DataFrame()

    # df is guaranteed non-empty here (early return above), so the previous
    # redundant `if not df.empty:` guard has been removed.
    company_name = df.iloc[0].get("companyName", "Unknown")
    sector = df.iloc[0].get("sector", "N/A")
    industry = df.iloc[0].get("industry", "N/A")

    logger.success(
        "Fetched company profile for {symbol}: {name} ({sector} - {industry})",
        symbol=symbol,
        name=company_name,
        sector=sector,
        industry=industry,
    )

    return df

async_fetch_ohlcv(session, symbol, start, end=None, timeframe='1d') async

Async fetch of EOD OHLCV data for a single symbol.

Parameters:

Name Type Description Default
session ClientSession

Shared aiohttp session.

required
symbol str

Stock symbol to fetch.

required
start Union[str, datetime]

Start date for historical data.

required
end Union[str, datetime]

End date for historical data.

None
timeframe str

Timeframe string. Defaults to "1d".

'1d'

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (symbol, df).

Source code in src/quantrl_lab/data/sources/fmp_loader.py
async def async_fetch_ohlcv(
    self,
    session: "aiohttp.ClientSession",
    symbol: str,
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
) -> Tuple[str, pd.DataFrame]:
    """
    Asynchronously fetch end-of-day OHLCV history for a single symbol.

    Args:
        session (aiohttp.ClientSession): Shared aiohttp session.
        symbol (str): Stock symbol to fetch.
        start (Union[str, datetime]): Start of the historical window.
        end (Union[str, datetime], optional): End of the historical window.
        timeframe (str, optional): Timeframe string. Defaults to "1d".

    Returns:
        Tuple[str, pd.DataFrame]: The symbol paired with its OHLCV frame.
    """
    async_wrapper = self._get_async_wrapper()
    range_start, range_end = normalize_date_range(start, end, default_end_to_now=True)
    query = {
        "symbol": symbol,
        "from": format_date_to_string(range_start),
        "to": format_date_to_string(range_end),
    }
    raw = await self._async_request(session, "historical-price-eod/full", query, async_wrapper)
    frame = convert_to_dataframe_safe(raw or [], symbol=symbol)
    # Nothing to standardize for an empty response.
    if frame.empty:
        return symbol, frame
    rename_map = {
        "date": "Timestamp",
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume",
    }
    frame = standardize_ohlcv_dataframe(
        frame,
        column_mapping=rename_map,
        symbol=symbol,
        timestamp_col="Timestamp",
        add_date=True,
        sort_data=True,
        convert_numeric=True,
    )
    return symbol, frame

async_fetch_ratings(session, symbol, limit=500) async

Async fetch of historical analyst ratings.

Parameters:

Name Type Description Default
session ClientSession

Shared aiohttp session.

required
symbol str

Stock symbol to fetch.

required
limit int

Maximum number of records to return. Defaults to 500.

500

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (symbol, df).

Source code in src/quantrl_lab/data/sources/fmp_loader.py
async def async_fetch_ratings(
    self,
    session: "aiohttp.ClientSession",
    symbol: str,
    limit: int = 500,
) -> Tuple[str, pd.DataFrame]:
    """
    Asynchronously fetch historical analyst ratings for a single symbol.

    Args:
        session (aiohttp.ClientSession): Shared aiohttp session.
        symbol (str): Stock symbol to fetch.
        limit (int, optional): Maximum number of records to return. Defaults to 500.

    Returns:
        Tuple[str, pd.DataFrame]: The symbol paired with its ratings frame.
    """
    async_wrapper = self._get_async_wrapper()
    query = {"symbol": symbol, "limit": limit}
    payload = await self._async_request(session, "ratings-historical", query, async_wrapper)
    if not isinstance(payload, list) or not payload:
        return symbol, pd.DataFrame()
    ratings = pd.DataFrame(payload)
    if "date" in ratings.columns:
        ratings["date"] = pd.to_datetime(ratings["date"])
        ratings = ratings.sort_values("date")
    return symbol, ratings

async_fetch_company_profile(session, symbol) async

Async fetch of company profile (sector, industry).

Parameters:

Name Type Description Default
session ClientSession

Shared aiohttp session.

required
symbol str

Stock symbol to fetch.

required

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (symbol, df).

Source code in src/quantrl_lab/data/sources/fmp_loader.py
async def async_fetch_company_profile(
    self,
    session: "aiohttp.ClientSession",
    symbol: str,
) -> Tuple[str, pd.DataFrame]:
    """
    Asynchronously fetch the company profile (sector, industry) for a symbol.

    Args:
        session (aiohttp.ClientSession): Shared aiohttp session.
        symbol (str): Stock symbol to fetch.

    Returns:
        Tuple[str, pd.DataFrame]: The symbol paired with its profile frame.
    """
    async_wrapper = self._get_async_wrapper()
    payload = await self._async_request(session, "profile", {"symbol": symbol}, async_wrapper)
    profile = convert_to_dataframe_safe(payload or [], symbol=symbol)
    return symbol, profile

async_fetch_sector_perf(session, sector, start, end) async

Async fetch of historical sector performance.

Parameters:

Name Type Description Default
session ClientSession

Shared aiohttp session.

required
sector str

Market sector name.

required
start str

Start date in 'YYYY-MM-DD' format.

required
end str

End date in 'YYYY-MM-DD' format.

required

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (sector, df).

Source code in src/quantrl_lab/data/sources/fmp_loader.py
async def async_fetch_sector_perf(
    self,
    session: "aiohttp.ClientSession",
    sector: str,
    start: str,
    end: str,
) -> Tuple[str, pd.DataFrame]:
    """
    Asynchronously fetch historical performance for one market sector.

    Args:
        session (aiohttp.ClientSession): Shared aiohttp session.
        sector (str): Market sector name.
        start (str): Start date in 'YYYY-MM-DD' format.
        end (str): End date in 'YYYY-MM-DD' format.

    Returns:
        Tuple[str, pd.DataFrame]: The sector paired with its performance frame.
    """
    async_wrapper = self._get_async_wrapper()
    query = {"sector": sector, "from": start, "to": end}
    payload = await self._async_request(session, "historical-sector-performance", query, async_wrapper)
    perf = convert_to_dataframe_safe(payload or [], symbol=sector)
    if "date" in perf.columns and not perf.empty:
        perf["date"] = pd.to_datetime(perf["date"])
        perf = perf.sort_values("date")
    return sector, perf

async_fetch_industry_perf(session, industry, start, end) async

Async fetch of historical industry performance.

Parameters:

Name Type Description Default
session ClientSession

Shared aiohttp session.

required
industry str

Industry name.

required
start str

Start date in 'YYYY-MM-DD' format.

required
end str

End date in 'YYYY-MM-DD' format.

required

Returns:

Type Description
Tuple[str, DataFrame]

Tuple[str, pd.DataFrame]: Tuple of (industry, df).

Source code in src/quantrl_lab/data/sources/fmp_loader.py
async def async_fetch_industry_perf(
    self,
    session: "aiohttp.ClientSession",
    industry: str,
    start: str,
    end: str,
) -> Tuple[str, pd.DataFrame]:
    """
    Asynchronously fetch historical performance for one industry.

    Args:
        session (aiohttp.ClientSession): Shared aiohttp session.
        industry (str): Industry name.
        start (str): Start date in 'YYYY-MM-DD' format.
        end (str): End date in 'YYYY-MM-DD' format.

    Returns:
        Tuple[str, pd.DataFrame]: The industry paired with its performance frame.
    """
    async_wrapper = self._get_async_wrapper()
    query = {"industry": industry, "from": start, "to": end}
    payload = await self._async_request(session, "historical-industry-performance", query, async_wrapper)
    perf = convert_to_dataframe_safe(payload or [], symbol=industry)
    if "date" in perf.columns and not perf.empty:
        perf["date"] = pd.to_datetime(perf["date"])
        perf = perf.sort_values("date")
    return industry, perf

Registry

source_registry

DataSourceRegistry

Registry for managing multiple data sources with factory pattern.

Supports lazy initialization, multiple sources per type, and dynamic source discovery by capability.

Example

registry = DataSourceRegistry() registry.register_source("alpaca_backup", lambda: AlpacaDataLoader())

Use primary source

data = registry.get_historical_ohlcv_data(...)

Or get specific source

backup = registry.get_source("alpaca_backup") data = backup.get_historical_ohlcv_data(...)

Source code in src/quantrl_lab/data/source_registry.py
class DataSourceRegistry:
    """
    Registry for managing multiple data sources with factory pattern.

    Supports lazy initialization, multiple sources per type, and dynamic
    source discovery by capability.

    Example:
        >>> registry = DataSourceRegistry()
        >>> registry.register_source("alpaca_backup", lambda: AlpacaDataLoader())
        >>>
        >>> # Use primary source
        >>> data = registry.get_historical_ohlcv_data(...)
        >>>
        >>> # Or get specific source
        >>> backup = registry.get_source("alpaca_backup")
        >>> data = backup.get_historical_ohlcv_data(...)
    """

    # Default source configurations: logical role name -> data source class.
    DEFAULT_SOURCES = {
        "primary_source": AlpacaDataLoader,
        "news_source": AlpacaDataLoader,
        "fundamental_source": FMPDataSource,
    }

    def __init__(self, sources: Optional[Dict[str, type]] = None, **kwargs: Any) -> None:
        """
        Initialize with configured data sources.

        Args:
            sources: Dictionary mapping source names to data source classes
            **kwargs: Individual source overrides (e.g., primary_source=YFinanceDataLoader)

        Example:
            >>> # Use defaults
            >>> registry = DataSourceRegistry()
            >>>
            >>> # Override primary source
            >>> registry = DataSourceRegistry(primary_source=YFinanceDataLoader)
            >>>
            >>> # Custom sources dict
            >>> registry = DataSourceRegistry(sources={
            ...     "primary_source": AlpacaDataLoader,
            ...     "backup_source": YFinanceDataLoader
            ... })
        """
        # Internal storage
        self._factories: Dict[str, Callable] = {}  # name -> factory function
        self._sources: Dict[str, Any] = {}  # name -> instantiated source (lazy)

        # Register default sources
        self._register_defaults()

        # Override with provided sources dict (backward compatibility)
        if sources:
            for name, source_class in sources.items():
                if source_class is not None:
                    self.register_source(name, self._make_factory(source_class), override=True)

        # Override with kwargs (backward compatibility).
        # Applied after `sources`, so kwargs entries win for duplicate names.
        for name, source_class in kwargs.items():
            if source_class is not None:
                self.register_source(name, self._make_factory(source_class), override=True)

    def _register_defaults(self) -> None:
        """Register default data sources."""
        for name, source_class in self.DEFAULT_SOURCES.items():
            if source_class is not None:
                self.register_source(name, self._make_factory(source_class))

    @staticmethod
    def _make_factory(source_class: type) -> Callable:
        """
        Create a factory function for a source class.

        Args:
            source_class: Data source class to instantiate

        Returns:
            Factory function that creates source instances
        """

        # Each factory closes over its own source_class argument (a function
        # parameter), so the classic late-binding closure pitfall does not apply.
        def factory(**init_kwargs):
            return source_class(**init_kwargs)

        return factory

    def register_source(self, name: str, factory: Callable, override: bool = False) -> None:
        """
        Register a data source factory.

        Args:
            name: Unique name for this source (e.g., "alpaca_primary", "yfinance_backup")
            factory: Callable that returns a data source instance
            override: If True, replace existing registration

        Raises:
            ValueError: If source already registered and override=False

        Example:
            >>> registry.register_source("custom", lambda: YFinanceDataLoader())
            >>> registry.register_source("primary_source", lambda: AlpacaDataLoader(), override=True)
        """
        if name in self._factories and not override:
            raise ValueError(f"Source '{name}' already registered. Use override=True to replace.")
        self._factories[name] = factory

    def get_source(self, name: str, **init_kwargs: Any) -> Any:
        """
        Get or create a data source instance (lazy initialization).

        Args:
            name: Source name to retrieve
            **init_kwargs: Initialization arguments for the source (if not yet created)

        Returns:
            Data source instance

        Raises:
            KeyError: If no factory registered for this name

        Example:
            >>> source = registry.get_source("primary_source")
            >>> data = source.get_historical_ohlcv_data(...)
        """
        # Lazy initialization - create on first access.
        # NOTE: init_kwargs only take effect on the call that creates the
        # instance; later calls return the cached object unchanged.
        if name not in self._sources:
            if name not in self._factories:
                raise KeyError(f"No factory registered for source '{name}'")
            self._sources[name] = self._factories[name](**init_kwargs)
        return self._sources[name]

    def list_sources_by_capability(self, capability: str) -> List[str]:
        """
        Find all registered sources supporting a capability.

        Args:
            capability: Feature name (e.g., "historical_bars", "news", "streaming")

        Returns:
            List of source names that support the capability

        Example:
            >>> sources = registry.list_sources_by_capability("historical_bars")
            >>> print(sources)  # ["primary_source", "backup_source"]
        """
        results = []
        for name in self._factories:
            try:
                source = self.get_source(name)
                if source.supports_feature(capability):
                    results.append(name)
            except Exception:
                # Best-effort discovery: skip sources that fail to instantiate
                # (this also swallows errors raised by supports_feature itself).
                continue
        return results

    def list_all_sources(self) -> List[str]:
        """
        List all registered source names.

        Returns:
            List of all registered source names
        """
        return list(self._factories.keys())

    # Backward compatibility: lazy properties for common sources
    @property
    def primary_source(self) -> Any:
        """
        Get primary data source (lazy initialization).

        Returns:
            Primary source instance
        """
        return self.get_source("primary_source")

    @property
    def news_source(self) -> Any:
        """
        Get news data source (lazy initialization).

        Returns:
            News source instance
        """
        return self.get_source("news_source")

    # Existing methods preserved for backward compatibility
    def get_historical_ohlcv_data(
        self,
        symbols: Union[str, List[str]],
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        timeframe: str = "1d",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Fetch historical OHLCV data from the primary data source.

        Args:
            symbols: Stock symbol(s) to fetch data for.
            start: Start date for the data.
            end: End date for the data. Defaults to None.
            timeframe: Timeframe for the data. Defaults to "1d".
            **kwargs: Additional parameters forwarded to the primary source.

        Returns:
            pd.DataFrame: Historical OHLCV data.

        Raises:
            RuntimeError: If primary_source doesn't implement HistoricalDataCapable protocol.
        """
        if not isinstance(self.primary_source, HistoricalDataCapable):
            raise RuntimeError(
                f"{self.primary_source.__class__.__name__} doesn't support historical data. "
                f"Please configure a data source that implements HistoricalDataCapable."
            )

        # Use primary source to fetch historical data
        return self.primary_source.get_historical_ohlcv_data(
            symbols=symbols,
            start=start,
            end=end,
            timeframe=timeframe,
            **kwargs,
        )

    def get_news_data(
        self,
        symbols: str,
        start: Union[str, datetime],
        end: Optional[Union[str, datetime]] = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Get news data for a symbol or list of symbols.

        Args:
            symbols: Stock symbol(s)
            start: Start date or timestamp
            end: End date or timestamp. Defaults to None.
            **kwargs: Additional parameters passed to the news source

        Returns:
            pd.DataFrame: raw news data

        Raises:
            RuntimeError: If news_source doesn't implement NewsDataCapable protocol.
        """
        if not isinstance(self.news_source, NewsDataCapable):
            raise RuntimeError(
                f"{self.news_source.__class__.__name__} doesn't support news data. "
                f"Please configure a data source that implements NewsDataCapable."
            )

        return self.news_source.get_news_data(symbols=symbols, start=start, end=end, **kwargs)
primary_source property

Get primary data source (lazy initialization).

Returns:

Type Description
Any

Primary source instance

news_source property

Get news data source (lazy initialization).

Returns:

Type Description
Any

News source instance

__init__(sources=None, **kwargs)

Initialize with configured data sources.

Parameters:

Name Type Description Default
sources Optional[Dict[str, type]]

Dictionary mapping source names to data source classes

None
**kwargs Any

Individual source overrides (e.g., primary_source=YFinanceDataLoader)

{}
Example
Use defaults

registry = DataSourceRegistry()

Override primary source

registry = DataSourceRegistry(primary_source=YFinanceDataLoader)

Custom sources dict

registry = DataSourceRegistry(sources={ ... "primary_source": AlpacaDataLoader, ... "backup_source": YFinanceDataLoader ... })

Source code in src/quantrl_lab/data/source_registry.py
def __init__(self, sources: Optional[Dict[str, type]] = None, **kwargs: Any) -> None:
    """
    Initialize with configured data sources.

    Args:
        sources: Dictionary mapping source names to data source classes
        **kwargs: Individual source overrides (e.g., primary_source=YFinanceDataLoader)

    Example:
        >>> # Use defaults
        >>> registry = DataSourceRegistry()
        >>>
        >>> # Override primary source
        >>> registry = DataSourceRegistry(primary_source=YFinanceDataLoader)
        >>>
        >>> # Custom sources dict
        >>> registry = DataSourceRegistry(sources={
        ...     "primary_source": AlpacaDataLoader,
        ...     "backup_source": YFinanceDataLoader
        ... })
    """
    # Internal storage
    self._factories: Dict[str, Callable] = {}  # name -> factory function
    self._sources: Dict[str, Any] = {}  # name -> instantiated source (lazy)

    # Register default sources
    self._register_defaults()

    # Override with provided sources dict (backward compatibility)
    if sources:
        for name, source_class in sources.items():
            if source_class is not None:
                self.register_source(name, self._make_factory(source_class), override=True)

    # Override with kwargs (backward compatibility)
    for name, source_class in kwargs.items():
        if source_class is not None:
            self.register_source(name, self._make_factory(source_class), override=True)

register_source(name, factory, override=False)

Register a data source factory.

Parameters:

Name Type Description Default
name str

Unique name for this source (e.g., "alpaca_primary", "yfinance_backup")

required
factory Callable

Callable that returns a data source instance

required
override bool

If True, replace existing registration

False

Raises:

Type Description
ValueError

If source already registered and override=False

Example

registry.register_source("custom", lambda: YFinanceDataLoader()) registry.register_source("primary_source", lambda: AlpacaDataLoader(), override=True)

Source code in src/quantrl_lab/data/source_registry.py
def register_source(self, name: str, factory: Callable, override: bool = False) -> None:
    """
    Register a factory that builds a data source on demand.

    Args:
        name: Unique name for this source (e.g., "alpaca_primary", "yfinance_backup")
        factory: Callable that returns a data source instance
        override: If True, replace existing registration

    Raises:
        ValueError: If source already registered and override=False

    Example:
        >>> registry.register_source("custom", lambda: YFinanceDataLoader())
        >>> registry.register_source("primary_source", lambda: AlpacaDataLoader(), override=True)
    """
    already_registered = name in self._factories
    if already_registered and not override:
        raise ValueError(f"Source '{name}' already registered. Use override=True to replace.")
    self._factories[name] = factory

get_source(name, **init_kwargs)

Get or create a data source instance (lazy initialization).

Parameters:

Name Type Description Default
name str

Source name to retrieve

required
**init_kwargs Any

Initialization arguments for the source (if not yet created)

{}

Returns:

Type Description
Any

Data source instance

Raises:

Type Description
KeyError

If no factory registered for this name

Example

source = registry.get_source("primary_source") data = source.get_historical_ohlcv_data(...)

Source code in src/quantrl_lab/data/source_registry.py
def get_source(self, name: str, **init_kwargs: Any) -> Any:
    """
    Return the named data source, creating it lazily on first access.

    Args:
        name: Source name to retrieve
        **init_kwargs: Initialization arguments forwarded to the factory
            (only used when the instance has not been created yet)

    Returns:
        Data source instance

    Raises:
        KeyError: If no factory registered for this name

    Example:
        >>> source = registry.get_source("primary_source")
        >>> data = source.get_historical_ohlcv_data(...)
    """
    # Fast path: return the cached instance if one already exists.
    try:
        return self._sources[name]
    except KeyError:
        pass
    if name not in self._factories:
        raise KeyError(f"No factory registered for source '{name}'")
    # Lazy initialization: build on first access and cache for reuse.
    instance = self._factories[name](**init_kwargs)
    self._sources[name] = instance
    return instance

list_sources_by_capability(capability)

Find all registered sources supporting a capability.

Parameters:

Name Type Description Default
capability str

Feature name (e.g., "historical_bars", "news", "streaming")

required

Returns:

Type Description
List[str]

List of source names that support the capability

Example

sources = registry.list_sources_by_capability("historical_bars")
print(sources)  # ["primary_source", "backup_source"]

Source code in src/quantrl_lab/data/source_registry.py
def list_sources_by_capability(self, capability: str) -> List[str]:
    """
    Return the names of all registered sources that support a capability.

    Args:
        capability: Feature name (e.g., "historical_bars", "news", "streaming")

    Returns:
        List of source names that support the capability

    Example:
        >>> sources = registry.list_sources_by_capability("historical_bars")
        >>> print(sources)  # ["primary_source", "backup_source"]
    """
    matching: List[str] = []
    for source_name in self._factories:
        try:
            candidate = self.get_source(source_name)
            if candidate.supports_feature(capability):
                matching.append(source_name)
        except Exception:
            # Best-effort discovery: a source that fails to instantiate
            # (or to answer the capability query) is simply skipped.
            continue
    return matching

list_all_sources()

List all registered source names.

Returns:

Type Description
List[str]

List of all registered source names

Source code in src/quantrl_lab/data/source_registry.py
def list_all_sources(self) -> List[str]:
    """
    Return the names of every registered source factory.

    Returns:
        List of all registered source names
    """
    return [source_name for source_name in self._factories]

get_historical_ohlcv_data(symbols, start, end=None, timeframe='1d', **kwargs)

Fetch historical OHLCV data from the primary data source.

Parameters:

Name Type Description Default
symbols Union[str, List[str]]

Stock symbol(s) to fetch data for.

required
start Union[str, datetime]

Start date for the data.

required
end Optional[Union[str, datetime]]

End date for the data. Defaults to None.

None
timeframe str

Timeframe for the data. Defaults to "1d".

'1d'

Returns:

Type Description
DataFrame

pd.DataFrame: Historical OHLCV data.

Raises:

Type Description
RuntimeError

If primary_source doesn't implement HistoricalDataCapable protocol.

Source code in src/quantrl_lab/data/source_registry.py
def get_historical_ohlcv_data(
    self,
    symbols: Union[str, List[str]],
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    timeframe: str = "1d",
    **kwargs,
) -> pd.DataFrame:
    """
    Fetch historical OHLCV data via the configured primary data source.

    Args:
        symbols: Stock symbol(s) to fetch data for.
        start: Start date for the data.
        end: End date for the data. Defaults to None.
        timeframe: Timeframe for the data. Defaults to "1d".

    Returns:
        pd.DataFrame: Historical OHLCV data.

    Raises:
        RuntimeError: If primary_source doesn't implement HistoricalDataCapable protocol.
    """
    primary = self.primary_source
    # Guard: the registry only delegates to sources that advertise the capability.
    if not isinstance(primary, HistoricalDataCapable):
        raise RuntimeError(
            f"{primary.__class__.__name__} doesn't support historical data. "
            f"Please configure a data source that implements HistoricalDataCapable."
        )

    # Delegate the fetch entirely to the primary source.
    return primary.get_historical_ohlcv_data(
        symbols=symbols,
        start=start,
        end=end,
        timeframe=timeframe,
        **kwargs,
    )

get_news_data(symbols, start, end=None, **kwargs)

Get news data for a symbol or list of symbols.

Parameters:

Name Type Description Default
symbols str

Stock symbol(s)

required
start Union[str, datetime]

Start date or timestamp

required
end Optional[Union[str, datetime]]

End date or timestamp. Defaults to None.

None
**kwargs Any

Additional parameters passed to the news source

{}

Returns:

Type Description
DataFrame

pd.DataFrame: raw news data

Raises:

Type Description
RuntimeError

If news_source doesn't implement NewsDataCapable protocol.

Source code in src/quantrl_lab/data/source_registry.py
def get_news_data(
    self,
    symbols: str,
    start: Union[str, datetime],
    end: Optional[Union[str, datetime]] = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """
    Fetch raw news data for a symbol or list of symbols via the news source.

    Args:
        symbols: Stock symbol(s)
        start: Start date or timestamp
        end: End date or timestamp. Defaults to None.
        **kwargs: Additional parameters passed to the news source

    Returns:
        pd.DataFrame: raw news data

    Raises:
        RuntimeError: If news_source doesn't implement NewsDataCapable protocol.
    """
    news_provider = self.news_source
    # Guard: only delegate to sources that advertise news capability.
    if not isinstance(news_provider, NewsDataCapable):
        raise RuntimeError(
            f"{news_provider.__class__.__name__} doesn't support news data. "
            f"Please configure a data source that implements NewsDataCapable."
        )

    return news_provider.get_news_data(symbols=symbols, start=start, end=end, **kwargs)