Skip to content

Data Processing API

DataProcessor

Source code in src/quantrl_lab/data/processing/processor.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
class DataProcessor:
    """Feature-engineering facade over OHLCV data plus optional auxiliary datasets.

    Combines technical indicators, news sentiment, analyst/context data,
    numeric conversion and column cleanup into a single processing pipeline,
    with optional train/test splitting.
    """

    @staticmethod
    def load_indicators(file_path: str) -> List[Union[str, Dict]]:
        """
        Load indicator configuration from a YAML or JSON file.

        Args:
            file_path: Path to the configuration file (.yaml, .yml, or .json)

        Returns:
            List[Union[str, Dict]]: List of indicator configurations

        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file format is unsupported or invalid
        """
        # Fail fast before attempting to open/parse anything.
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Configuration file not found: {file_path}")

        ext = os.path.splitext(file_path)[1].lower()

        try:
            with open(file_path, "r") as f:
                if ext in [".yaml", ".yml"]:
                    config_data = yaml.safe_load(f)
                elif ext == ".json":
                    config_data = json.load(f)
                else:
                    raise ValueError(f"Unsupported configuration format: {ext}. Use .yaml or .json")

            # Validate structure - expect a list or a dict with an 'indicators' key
            if isinstance(config_data, list):
                return config_data
            elif isinstance(config_data, dict) and "indicators" in config_data:
                return config_data["indicators"]
            else:
                raise ValueError("Invalid config structure. Expected a list or a dict with 'indicators' key.")

        except Exception as e:
            # Chain the original exception (`from e`) so parse/format errors
            # remain debuggable instead of being flattened into a bare message.
            raise ValueError(f"Failed to load indicator config from {file_path}: {e}") from e

    def __init__(self, ohlcv_data: pd.DataFrame, **kwargs):
        """Initialize the processor with OHLCV data and optional auxiliary sources.

        Args:
            ohlcv_data (pd.DataFrame): Required price/volume data.
            **kwargs: Optional data sources: news_data, fundamental_data,
                macro_data, calendar_event_data, analyst_grades, analyst_ratings,
                sector_performance, industry_performance, sentiment_config,
                sentiment_provider.

        Raises:
            ValueError: If 'ohlcv_data' is None.
        """
        if ohlcv_data is None:
            raise ValueError("Required parameter 'ohlcv_data' is missing.")

        self.ohlcv_data = ohlcv_data  # minimal required data

        # === Optional data sources ===
        self.news_data = kwargs.get("news_data", None)
        self.fundamental_data = kwargs.get("fundamental_data", None)
        self.macro_data = kwargs.get("macro_data", None)
        self.calendar_event_data = kwargs.get("calendar_event_data", None)

        # === Analyst & Context Data ===
        self.analyst_grades = kwargs.get("analyst_grades", None)
        self.analyst_ratings = kwargs.get("analyst_ratings", None)
        self.sector_performance = kwargs.get("sector_performance", None)
        self.industry_performance = kwargs.get("industry_performance", None)

        # === Sentiment configuration and provider ===
        self.sentiment_config = kwargs.get("sentiment_config", SentimentConfig())
        self.sentiment_provider = kwargs.get("sentiment_provider")

        if self.sentiment_provider is None and self.news_data is not None:
            # Default to HuggingFaceProvider if news data is present but no provider given
            self.sentiment_provider = HuggingFaceProvider()

    def append_technical_indicators(
        self,
        df: pd.DataFrame,
        indicators: Optional[List[Union[str, Dict]]] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Add technical indicators to existing OHLCV DataFrame.

        Args:
            df (pd.DataFrame): raw OHLCV data
            indicators (Optional[List[Union[str, Dict]]], optional): Defaults to None.

        Raises:
            ValueError: if input DataFrame is empty
            ValueError: if required columns are missing

        Returns:
            pd.DataFrame: DataFrame with added technical indicators
        """
        # Return original if no indicators specified
        if not indicators:
            return df.copy()

        try:
            generator = TechnicalFeatureGenerator(indicators)
            return generator.generate(df, **kwargs)
        except ValueError:
            # Validation errors propagate unchanged; bare `raise` keeps the
            # original traceback intact.
            raise
        except Exception as e:
            # Best-effort fallback: log the failure and return the input untouched.
            console.print(f"[red]❌ Failed to append technical indicators: {e}[/red]")
            return df.copy()

    def append_news_sentiment_data(self, df: pd.DataFrame, fillna_strategy: str = "neutral") -> pd.DataFrame:
        """
        Append news sentiment data to the OHLCV DataFrame.

        Args:
            df (pd.DataFrame): Input OHLCV DataFrame.
            fillna_strategy (str, optional): Strategy for handling missing sentiment scores. Defaults to "neutral".

        Raises:
            ValueError: If the input DataFrame is empty or if the strategy is unsupported.

        Returns:
            pd.DataFrame: DataFrame with appended news sentiment data.
        """
        # No news data means nothing to enrich: warn and pass the frame through.
        if self.news_data is None or self.news_data.empty:
            console.print("[yellow]⚠️  No news data provided. Skipping sentiment analysis.[/yellow]")
            return df

        try:
            generator = SentimentFeatureGenerator(
                self.sentiment_provider, self.sentiment_config, self.news_data, fillna_strategy
            )
            return generator.generate(df)
        except ValueError:
            # Validation errors propagate unchanged with their original traceback.
            raise
        except Exception as e:
            # Best-effort fallback: log the failure and return the input untouched.
            console.print(f"[red]❌ Failed to append sentiment data: {e}[/red]")
            return df

    def drop_unwanted_columns(
        self, df: pd.DataFrame, columns_to_drop: Optional[List[str]] = None, keep_date: bool = False
    ) -> pd.DataFrame:
        """
        Drop unwanted columns from the DataFrame.

        Args:
            df (pd.DataFrame): Input DataFrame.
            columns_to_drop (Optional[List[str]], optional): List of column names to drop.
                If None, will drop default columns ('Date', 'Timestamp', 'Symbol'). Defaults to None.
            keep_date (bool): If True, date-related columns will not be dropped.

        Raises:
            ValueError: If 'columns_to_drop' is provided but is not a list.

        Returns:
            pd.DataFrame: DataFrame with specified columns dropped.
        """
        if columns_to_drop is None:
            columns_to_drop = [config.DEFAULT_DATE_COLUMN, "Timestamp", "Symbol"]
        elif not isinstance(columns_to_drop, list):
            raise ValueError(
                f"Invalid type for 'columns_to_drop': expected list, got {type(columns_to_drop).__name__}."
            )

        if keep_date:
            # Filter date-like columns out of the drop list so they survive.
            columns_to_drop = [col for col in columns_to_drop if col not in config.DATE_COLUMNS]

        # errors="ignore": columns absent from df are silently skipped.
        return df.drop(columns=columns_to_drop, errors="ignore")

    def convert_columns_to_numeric(self, df: pd.DataFrame, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Convert specified columns to numeric, handling date columns
        carefully.

        Note: conversion is applied in place on the passed DataFrame.

        Args:
            df (pd.DataFrame): Input DataFrame
            columns (Optional[List[str]]): Specific columns to convert. If None, converts all object columns.

        Raises:
            ValueError: If 'columns' is provided but is not a list.

        Returns:
            pd.DataFrame: DataFrame with numeric conversions applied
        """
        if columns is None:
            # Only convert object columns that are not date-like
            columns = []
            for col in df.columns:
                if df[col].dtype == "object":
                    # Skip columns that look like dates (by name, case-insensitive)
                    if col in config.DATE_COLUMNS or col.lower() in [c.lower() for c in config.DATE_COLUMNS]:
                        continue
                    # Heuristic: probe the first non-null value to detect date-like data
                    non_null = df[col].dropna()
                    sample_val = non_null.iloc[0] if not non_null.empty else None
                    if sample_val is not None:
                        try:
                            pd.to_datetime(sample_val)
                            # If conversion succeeds, it's probably a date column - skip it
                            continue
                        except (ValueError, TypeError):
                            # Not a date, safe to convert to numeric
                            columns.append(col)
        elif not isinstance(columns, list):
            raise ValueError(f"Invalid type for 'columns': expected list, got {type(columns).__name__}.")

        for col in columns:
            if col in df.columns and df[col].dtype == "object":
                # errors="coerce": unparseable values become NaN rather than raising.
                df[col] = pd.to_numeric(df[col], errors="coerce")

        return df

    def data_processing_pipeline(
        self,
        indicators: Optional[List[Union[str, Dict]]] = None,
        alpha_selection_config: Optional[Dict[str, Any]] = None,
        fillna_strategy: str = "neutral",
        split_config: Optional[Dict] = None,
        **kwargs: Any,
    ) -> Tuple[Union[pd.DataFrame, Dict[str, pd.DataFrame]], Dict]:
        """
        Main data processing pipeline.

        Applies technical indicators, sentiment analysis, and data transformations

        This method uses the DataPipeline infrastructure internally.

        Args:
            indicators (Optional[List[Union[str, Dict]]], optional):
                List of indicators to apply. Supports:
                - String format: ["SMA", "RSI"]
                - Dict format: [{"SMA": {"window": 20}}, {"RSI": {"window": 14}}]
                Defaults to None (no indicators).
            alpha_selection_config (Optional[Dict], optional):
                Configuration for dynamic alpha selection. If provided, the pipeline
                will automatically select and apply the best indicators.
                Keys: "metric" (default "ic"), "threshold", "top_k", "candidates".
            fillna_strategy (str, optional): Strategy for handling missing sentiment scores.
                Options: "neutral" (fill with 0.0) or "fill_forward" (forward fill).
                Defaults to "neutral".
            split_config (Optional[Dict], optional): Configuration for data splitting.
                If None, returns a single DataFrame. Otherwise, returns dict of DataFrames.
                Ratio-based: {'train': 0.7, 'test': 0.3}
                Date-based: {'train': ('2020-01-01', '2021-12-31'), 'test': ('2022-01-01', '2022-12-31')}
            **kwargs: Additional arguments:
                - columns_to_drop: List of columns to drop (overrides default)
                - columns_to_convert: List of columns to convert to numeric

        Returns:
            Tuple[Union[pd.DataFrame, Dict[str, pd.DataFrame]], Dict]: A tuple containing:
                - Processed DataFrame if split_config is None
                - Dictionary of DataFrames if split_config is provided (keys: split names)
                - Metadata dictionary with processing information
        """
        from quantrl_lab.data.processing.pipeline import DataPipeline
        from quantrl_lab.data.processing.steps import (
            AnalystEstimatesStep,
            ColumnCleanupStep,
            MarketContextStep,
            NumericConversionStep,
            SentimentEnrichmentStep,
            TechnicalIndicatorStep,
        )

        # Build pipeline
        pipeline = DataPipeline()

        # NOTE(review): alpha_selection_config is accepted and documented but is
        # not yet wired into the pipeline (no alpha-selection step is added
        # below) -- TODO confirm intended behavior and add the step when available.

        # 1. Technical Indicators (Manual)
        pipeline.add_step(TechnicalIndicatorStep(indicators=indicators))

        # 2. Analyst Estimates
        if self.analyst_grades is not None or self.analyst_ratings is not None:
            pipeline.add_step(AnalystEstimatesStep(grades_df=self.analyst_grades, ratings_df=self.analyst_ratings))

        # 3. Market Context
        if self.sector_performance is not None or self.industry_performance is not None:
            pipeline.add_step(
                MarketContextStep(sector_perf_df=self.sector_performance, industry_perf_df=self.industry_performance)
            )

        # 4. Sentiment Enrichment (only if news data available)
        if self.news_data is not None:
            pipeline.add_step(
                SentimentEnrichmentStep(
                    news_data=self.news_data,
                    provider=self.sentiment_provider,
                    config=self.sentiment_config,
                    fillna_strategy=fillna_strategy,
                )
            )

        # 5. Numeric Conversion
        # Convert specified columns to numeric
        columns_to_convert = kwargs.get("columns_to_convert", None)
        pipeline.add_step(NumericConversionStep(columns=columns_to_convert))

        # 6. Column Cleanup
        # If columns_to_drop is passed, use it; otherwise rely on defaults in step
        # Note: We keep date columns if splitting is required later
        columns_to_drop = kwargs.get("columns_to_drop", None)
        # If splitting, we MUST keep date columns for the split operation
        # If not splitting, the pipeline step handles default date dropping unless overridden
        keep_date = split_config is not None

        # Configure Cleanup Step
        cleanup_step = ColumnCleanupStep(columns_to_drop=columns_to_drop, keep_date=keep_date)
        pipeline.add_step(cleanup_step)

        # Execute Pipeline
        # We pass symbol for metadata tracking if available
        symbol = None
        if "Symbol" in self.ohlcv_data.columns:
            unique_symbols = self.ohlcv_data["Symbol"].unique()
            symbol = unique_symbols[0] if len(unique_symbols) == 1 else None

        processed_data, metadata_obj = pipeline.execute(self.ohlcv_data, symbol=symbol)

        # Update metadata flags
        if self.analyst_grades is not None or self.analyst_ratings is not None:
            metadata_obj.analyst_data_applied = True
        if self.sector_performance is not None or self.industry_performance is not None:
            metadata_obj.market_context_applied = True

        # Handle Data Splitting (Post-Processing)
        # Debug: Check for columns with all NaN values before dropna
        verbose = kwargs.get("verbose", False)
        if verbose:
            null_counts = processed_data.isnull().sum()
            all_null_cols = null_counts[null_counts == len(processed_data)]
            if not all_null_cols.empty:
                console.print(f"[yellow]⚠️  Warning: Columns with all NaN values: {list(all_null_cols.index)}[/yellow]")

            console.print(f"[cyan]Before dropna: {len(processed_data)} rows[/cyan]")
            console.print(f"[cyan]Columns in DataFrame: {list(processed_data.columns)}[/cyan]")

        # Drop rows with any NaN values
        # This handles:
        # 1. Indicator warm-up periods (e.g., SMA(200) creates 200 leading NaNs)
        # 2. Missing price data
        # 3. Any other features that couldn't be computed/filled
        initial_len = len(processed_data)
        processed_data = processed_data.dropna().reset_index(drop=True)
        dropped_count = initial_len - len(processed_data)

        if verbose:
            if dropped_count > 0:
                console.print(f"[yellow]Dropped {dropped_count} rows containing NaNs (indicator warm-up, etc)[/yellow]")
            else:
                console.print("[green]No rows dropped (data is clean)[/green]")

        if verbose:
            console.print(f"[cyan]After dropna: {len(processed_data)} rows[/cyan]")

        if split_config:
            split_data, split_metadata = self._split_data(processed_data, split_config)

            # Merge split metadata into pipeline metadata
            metadata_obj.date_ranges = split_metadata["date_ranges"]
            metadata_obj.final_shapes = split_metadata["final_shapes"]

            # Drop date column after splitting if it wasn't supposed to be kept
            for key in split_data:
                # Re-run cleanup to drop date columns now that splitting is done
                # unless user explicitly asked to keep them via columns_to_drop logic?
                # For safety, we replicate old behavior: drop defaults
                split_data[key] = self.drop_unwanted_columns(
                    split_data[key], [config.DEFAULT_DATE_COLUMN, "Timestamp", "Symbol"]
                )

            return split_data, metadata_obj.to_dict()
        else:
            # Handle metadata for non-split data (legacy logic port)
            date_column = next((col for col in config.DATE_COLUMNS if col in processed_data.columns), None)
            if date_column:
                dates = pd.to_datetime(processed_data[date_column])
                metadata_obj.date_ranges["full_data"] = {
                    "start": dates.min().strftime("%Y-%m-%d"),
                    "end": dates.max().strftime("%Y-%m-%d"),
                }
            metadata_obj.final_shapes["full_data"] = processed_data.shape

            # (No extra date-column drop is needed here: when split_config is
            # None, keep_date is False and the cleanup step already handled it.)
            return processed_data, metadata_obj.to_dict()

    def _split_data(self, df: pd.DataFrame, split_config: Dict) -> Tuple[Dict[str, pd.DataFrame], Dict]:
        """
        Split the data into respective sets according to the config.

        This method now delegates to the new splitter classes (RatioSplitter or DateRangeSplitter)
        while maintaining backward compatibility with the existing API.

        Args:
            df (pd.DataFrame): input dataframe
            split_config (Dict): split config in dictionary format
                Example by ratio: {'train': 0.7, 'test': 0.3}
                Example by dates: {'train': ('2020-01-01', '2021-12-31'), 'test': ('2022-01-01', '2022-12-31')}

        Raises:
            ValueError: If date column not found for splitting or invalid config

        Returns:
            Tuple[Dict[str, pd.DataFrame], Dict]: datasets in dict and metadata
        """
        # Determine split type based on config values: any tuple/list value
        # means explicit date ranges; plain numbers mean ratios.
        is_date_based = any(isinstance(v, (tuple, list)) for v in split_config.values())

        if is_date_based:
            # Use DateRangeSplitter
            splitter = DateRangeSplitter(split_config)
        else:
            # Use RatioSplitter
            splitter = RatioSplitter(split_config)

        # Perform split
        split_data = splitter.split(df)

        # Get metadata from splitter
        metadata = splitter.get_metadata()

        return split_data, metadata

load_indicators(file_path) staticmethod

Load indicator configuration from a YAML or JSON file.

Parameters:

Name Type Description Default
file_path str

Path to the configuration file (.yaml, .yml, or .json)

required

Returns:

Type Description
List[Union[str, Dict]]

List[Union[str, Dict]]: List of indicator configurations

Raises:

Type Description
FileNotFoundError

If the file does not exist

ValueError

If the file format is unsupported or invalid

Source code in src/quantrl_lab/data/processing/processor.py
@staticmethod
def load_indicators(file_path: str) -> List[Union[str, Dict]]:
    """
    Load indicator configuration from a YAML or JSON file.

    Args:
        file_path: Path to the configuration file (.yaml, .yml, or .json)

    Returns:
        List[Union[str, Dict]]: List of indicator configurations

    Raises:
        FileNotFoundError: If the file does not exist
        ValueError: If the file format is unsupported or invalid
    """
    # Fail fast before attempting to open/parse anything.
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Configuration file not found: {file_path}")

    ext = os.path.splitext(file_path)[1].lower()

    try:
        with open(file_path, "r") as f:
            if ext in [".yaml", ".yml"]:
                config_data = yaml.safe_load(f)
            elif ext == ".json":
                config_data = json.load(f)
            else:
                raise ValueError(f"Unsupported configuration format: {ext}. Use .yaml or .json")

        # Validate structure - expect a list or a dict with an 'indicators' key
        if isinstance(config_data, list):
            return config_data
        elif isinstance(config_data, dict) and "indicators" in config_data:
            return config_data["indicators"]
        else:
            raise ValueError("Invalid config structure. Expected a list or a dict with 'indicators' key.")

    except Exception as e:
        # NOTE: this also re-wraps the ValueErrors raised above, so callers
        # always see a ValueError whose message embeds the original cause.
        raise ValueError(f"Failed to load indicator config from {file_path}: {e}")

append_technical_indicators(df, indicators=None, **kwargs)

Add technical indicators to existing OHLCV DataFrame.

Parameters:

Name Type Description Default
df DataFrame

raw OHLCV data

required
indicators Optional[List[Union[str, Dict]]]

Defaults to None.

None

Raises:

Type Description
ValueError

if input DataFrame is empty

ValueError

if required columns are missing

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with added technical indicators

Source code in src/quantrl_lab/data/processing/processor.py
def append_technical_indicators(
    self,
    df: pd.DataFrame,
    indicators: Optional[List[Union[str, Dict]]] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Add technical indicators to existing OHLCV DataFrame.

    Args:
        df (pd.DataFrame): raw OHLCV data
        indicators (Optional[List[Union[str, Dict]]], optional): Defaults to None.

    Raises:
        ValueError: if input DataFrame is empty
        ValueError: if required columns are missing

    Returns:
        pd.DataFrame: DataFrame with added technical indicators
    """
    # Return original if no indicators specified
    if not indicators:
        return df.copy()

    try:
        generator = TechnicalFeatureGenerator(indicators)
        return generator.generate(df, **kwargs)
    except ValueError as e:
        # Re-raise with same message or log
        raise e
    except Exception as e:
        # Best-effort fallback: log the failure and hand back a copy of the input.
        console.print(f"[red]❌ Failed to append technical indicators: {e}[/red]")
        return df.copy()

append_news_sentiment_data(df, fillna_strategy='neutral')

Append news sentiment data to the OHLCV DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input OHLCV DataFrame.

required
fillna_strategy str

Strategy for handling missing sentiment scores. Defaults to "neutral".

'neutral'

Raises:

Type Description
ValueError

If the input DataFrame is empty or if the strategy is unsupported.

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with appended news sentiment data.

Source code in src/quantrl_lab/data/processing/processor.py
def append_news_sentiment_data(self, df: pd.DataFrame, fillna_strategy="neutral") -> pd.DataFrame:
    """
    Append news sentiment data to the OHLCV DataFrame.

    Args:
        df (pd.DataFrame): Input OHLCV DataFrame.
        fillna_strategy (str, optional): Strategy for handling missing sentiment scores. Defaults to "neutral".

    Raises:
        ValueError: If the input DataFrame is empty or if the strategy is unsupported.

    Returns:
        pd.DataFrame: DataFrame with appended news sentiment data.
    """
    # No news data means nothing to enrich: warn and pass the frame through unchanged.
    if self.news_data is None or self.news_data.empty:
        console.print("[yellow]⚠️  No news data provided. Skipping sentiment analysis.[/yellow]")
        return df

    try:
        generator = SentimentFeatureGenerator(
            self.sentiment_provider, self.sentiment_config, self.news_data, fillna_strategy
        )
        return generator.generate(df)
    except ValueError as e:
        raise e
    except Exception as e:
        # Best-effort fallback: log the failure and return the input untouched.
        console.print(f"[red]❌ Failed to append sentiment data: {e}[/red]")
        return df

drop_unwanted_columns(df, columns_to_drop=None, keep_date=False)

Drop unwanted columns from the DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
columns_to_drop Optional[List[str]]

List of column names to drop. If None, will drop default columns ('Date', 'Timestamp', 'Symbol'). Defaults to None.

None
keep_date bool

If True, date-related columns will not be dropped.

False

Returns: pd.DataFrame: DataFrame with specified columns dropped.

Source code in src/quantrl_lab/data/processing/processor.py
def drop_unwanted_columns(
    self, df: pd.DataFrame, columns_to_drop: Optional[List[str]] = None, keep_date: bool = False
) -> pd.DataFrame:
    """
    Drop unwanted columns from the DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame.
        columns_to_drop (Optional[List[str]], optional): List of column names to drop.
            If None, will drop default columns ('Date', 'Timestamp', 'Symbol'). Defaults to None.
        keep_date (bool): If True, date-related columns will not be dropped.
    Returns:
        pd.DataFrame: DataFrame with specified columns dropped.
    """
    if columns_to_drop is None:
        columns_to_drop = [config.DEFAULT_DATE_COLUMN, "Timestamp", "Symbol"]
    elif not isinstance(columns_to_drop, list):
        raise ValueError(
            f"Invalid type for 'columns_to_drop': expected list, got {type(columns_to_drop).__name__}."
        )

    if keep_date:
        # Filter date-like columns out of the drop list so they survive.
        columns_to_drop = [col for col in columns_to_drop if col not in config.DATE_COLUMNS]

    # errors="ignore": columns absent from df are silently skipped.
    return df.drop(columns=columns_to_drop, errors="ignore")

convert_columns_to_numeric(df, columns=None)

Convert specified columns to numeric, handling date columns carefully.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame

required
columns Optional[List[str]]

Specific columns to convert. If None, converts all object columns.

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with numeric conversions applied

Source code in src/quantrl_lab/data/processing/processor.py
def convert_columns_to_numeric(self, df: pd.DataFrame, columns: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Convert specified columns to numeric, handling date columns
    carefully.

    Note: conversion is applied in place on the passed DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame
        columns (Optional[List[str]]): Specific columns to convert. If None, converts all object columns.

    Returns:
        pd.DataFrame: DataFrame with numeric conversions applied
    """
    if columns is None:
        # Only convert object columns that are not date-like
        columns = []
        for col in df.columns:
            if df[col].dtype == "object":
                # Skip columns that look like dates (by name, case-insensitive)
                if col in config.DATE_COLUMNS or col.lower() in [c.lower() for c in config.DATE_COLUMNS]:
                    continue
                # Check if it's actually a date column by looking at the data
                # (heuristic: probe only the first non-null value)
                sample_val = df[col].dropna().iloc[0] if not df[col].dropna().empty else None
                if sample_val is not None:
                    try:
                        pd.to_datetime(sample_val)
                        # If conversion succeeds, it's probably a date column - skip it
                        continue
                    except (ValueError, TypeError):
                        # Not a date, safe to convert to numeric
                        columns.append(col)
    elif not isinstance(columns, list):
        raise ValueError(f"Invalid type for 'columns': expected list, got {type(columns).__name__}.")

    for col in columns:
        if col in df.columns and df[col].dtype == "object":
            # errors="coerce": unparseable values become NaN rather than raising.
            df[col] = pd.to_numeric(df[col], errors="coerce")

    return df

data_processing_pipeline(indicators=None, alpha_selection_config=None, fillna_strategy='neutral', split_config=None, **kwargs)

Main data processing pipeline.

Applies technical indicators, sentiment analysis, and data transformations

This method uses the DataPipeline infrastructure internally.

Parameters:

Name Type Description Default
indicators Optional[List[Union[str, Dict]]]

List of indicators to apply. Supports: - String format: ["SMA", "RSI"] - Dict format: [{"SMA": {"window": 20}}, {"RSI": {"window": 14}}] Defaults to None (no indicators).

None
alpha_selection_config Optional[Dict]

Configuration for dynamic alpha selection. If provided, the pipeline will automatically select and apply the best indicators. Keys: "metric" (default "ic"), "threshold", "top_k", "candidates".

None
fillna_strategy str

Strategy for handling missing sentiment scores. Options: "neutral" (fill with 0.0) or "fill_forward" (forward fill). Defaults to "neutral".

'neutral'
split_config Optional[Dict]

Configuration for data splitting. If None, returns a single DataFrame. Otherwise, returns dict of DataFrames. Ratio-based: {'train': 0.7, 'test': 0.3} Date-based: {'train': ('2020-01-01', '2021-12-31'), 'test': ('2022-01-01', '2022-12-31')}

None
**kwargs Any

Additional arguments: - columns_to_drop: List of columns to drop (overrides default) - columns_to_convert: List of columns to convert to numeric

{}

Returns:

Type Description
Tuple[Union[DataFrame, Dict[str, DataFrame]], Dict]

Tuple[Union[pd.DataFrame, Dict[str, pd.DataFrame]], Dict]: A tuple containing: - Processed DataFrame if split_config is None - Dictionary of DataFrames if split_config is provided (keys: split names) - Metadata dictionary with processing information

Source code in src/quantrl_lab/data/processing/processor.py
def data_processing_pipeline(
    self,
    indicators: Optional[List[Union[str, Dict]]] = None,
    alpha_selection_config: Optional[Dict[str, Any]] = None,
    fillna_strategy: str = "neutral",
    split_config: Optional[Dict] = None,
    **kwargs: Any,
) -> Tuple[Union[pd.DataFrame, Dict[str, pd.DataFrame]], Dict]:
    """
    Main data processing pipeline.

    Applies technical indicators, sentiment analysis, and data transformations.
    This method uses the DataPipeline infrastructure internally.

    Args:
        indicators (Optional[List[Union[str, Dict]]], optional):
            List of indicators to apply. Supports:
            - String format: ["SMA", "RSI"]
            - Dict format: [{"SMA": {"window": 20}}, {"RSI": {"window": 14}}]
            Defaults to None (no indicators).
        alpha_selection_config (Optional[Dict], optional):
            Configuration for dynamic alpha selection.
            NOTE(review): this argument is accepted but never consumed in
            this method body — confirm whether it should drive indicator
            selection or be removed.
        fillna_strategy (str, optional): Strategy for handling missing sentiment scores.
            Options: "neutral" (fill with 0.0) or "fill_forward" (forward fill).
            Defaults to "neutral".
        split_config (Optional[Dict], optional): Configuration for data splitting.
            If None, returns a single DataFrame. Otherwise, returns dict of DataFrames.
            Ratio-based: {'train': 0.7, 'test': 0.3}
            Date-based: {'train': ('2020-01-01', '2021-12-31'), 'test': ('2022-01-01', '2022-12-31')}
        **kwargs: Additional arguments:
            - columns_to_drop: List of columns to drop (overrides default)
            - columns_to_convert: List of columns to convert to numeric
            - verbose: If True, print diagnostic info (NaN columns, row counts)

    Returns:
        Tuple[Union[pd.DataFrame, Dict[str, pd.DataFrame]], Dict]: A tuple containing:
            - Processed DataFrame if split_config is None
            - Dictionary of DataFrames if split_config is provided (keys: split names)
            - Metadata dictionary with processing information
    """
    # Local imports avoid a circular dependency at module load time.
    from quantrl_lab.data.processing.pipeline import DataPipeline
    from quantrl_lab.data.processing.steps import (
        AnalystEstimatesStep,
        ColumnCleanupStep,
        MarketContextStep,
        NumericConversionStep,
        SentimentEnrichmentStep,
        TechnicalIndicatorStep,
    )

    # Compute shared conditions once; reused for both step wiring and
    # post-execution metadata flags.
    has_analyst_data = self.analyst_grades is not None or self.analyst_ratings is not None
    has_market_context = self.sector_performance is not None or self.industry_performance is not None

    # Build pipeline
    pipeline = DataPipeline()

    # 1. Technical Indicators (manual selection)
    pipeline.add_step(TechnicalIndicatorStep(indicators=indicators))

    # 2. Analyst Estimates (only if any analyst data is available)
    if has_analyst_data:
        pipeline.add_step(AnalystEstimatesStep(grades_df=self.analyst_grades, ratings_df=self.analyst_ratings))

    # 3. Market Context (sector/industry performance)
    if has_market_context:
        pipeline.add_step(
            MarketContextStep(sector_perf_df=self.sector_performance, industry_perf_df=self.industry_performance)
        )

    # 4. Sentiment Enrichment (only if news data available)
    if self.news_data is not None:
        pipeline.add_step(
            SentimentEnrichmentStep(
                news_data=self.news_data,
                provider=self.sentiment_provider,
                config=self.sentiment_config,
                fillna_strategy=fillna_strategy,
            )
        )

    # 5. Numeric Conversion of user-specified columns
    columns_to_convert = kwargs.get("columns_to_convert", None)
    pipeline.add_step(NumericConversionStep(columns=columns_to_convert))

    # 6. Column Cleanup.
    # If splitting, date columns MUST survive this step so the split can
    # use them; they are dropped again after the split below. Otherwise the
    # step applies its default date-column dropping unless overridden.
    columns_to_drop = kwargs.get("columns_to_drop", None)
    keep_date = split_config is not None
    pipeline.add_step(ColumnCleanupStep(columns_to_drop=columns_to_drop, keep_date=keep_date))

    # Execute pipeline; pass a symbol for metadata tracking only when the
    # data contains exactly one symbol.
    symbol = None
    if "Symbol" in self.ohlcv_data.columns:
        unique_symbols = self.ohlcv_data["Symbol"].unique()
        symbol = unique_symbols[0] if len(unique_symbols) == 1 else None

    processed_data, metadata_obj = pipeline.execute(self.ohlcv_data, symbol=symbol)

    # Update metadata flags for the conditionally-added steps
    if has_analyst_data:
        metadata_obj.analyst_data_applied = True
    if has_market_context:
        metadata_obj.market_context_applied = True

    verbose = kwargs.get("verbose", False)
    if verbose:
        # Surface all-NaN columns: they would silently wipe out every row
        # in the dropna below.
        null_counts = processed_data.isnull().sum()
        all_null_cols = null_counts[null_counts == len(processed_data)]
        if not all_null_cols.empty:
            console.print(f"[yellow]⚠️  Warning: Columns with all NaN values: {list(all_null_cols.index)}[/yellow]")

        console.print(f"[cyan]Before dropna: {len(processed_data)} rows[/cyan]")
        console.print(f"[cyan]Columns in DataFrame: {list(processed_data.columns)}[/cyan]")

    # Drop rows with any NaN values. This handles:
    # 1. Indicator warm-up periods (e.g., SMA(200) creates 200 leading NaNs)
    # 2. Missing price data
    # 3. Any other features that couldn't be computed/filled
    initial_len = len(processed_data)
    processed_data = processed_data.dropna().reset_index(drop=True)
    dropped_count = initial_len - len(processed_data)

    if verbose:
        if dropped_count > 0:
            console.print(f"[yellow]Dropped {dropped_count} rows containing NaNs (indicator warm-up, etc)[/yellow]")
        else:
            console.print("[green]No rows dropped (data is clean)[/green]")
        console.print(f"[cyan]After dropna: {len(processed_data)} rows[/cyan]")

    if split_config:
        split_data, split_metadata = self._split_data(processed_data, split_config)

        # Merge split metadata into pipeline metadata
        metadata_obj.date_ranges = split_metadata["date_ranges"]
        metadata_obj.final_shapes = split_metadata["final_shapes"]

        # Date columns were kept only so the split could use them;
        # drop the defaults from each split now.
        for key in split_data:
            split_data[key] = self.drop_unwanted_columns(
                split_data[key], [config.DEFAULT_DATE_COLUMN, "Timestamp", "Symbol"]
            )

        return split_data, metadata_obj.to_dict()

    # No split: record the full-data date range and shape in the metadata.
    date_column = next((col for col in config.DATE_COLUMNS if col in processed_data.columns), None)
    if date_column:
        dates = pd.to_datetime(processed_data[date_column])
        metadata_obj.date_ranges["full_data"] = {
            "start": dates.min().strftime("%Y-%m-%d"),
            "end": dates.max().strftime("%Y-%m-%d"),
        }
    metadata_obj.final_shapes["full_data"] = processed_data.shape

    return processed_data, metadata_obj.to_dict()

Data Pipeline

The DataPipeline allows for flexible, composable data transformations.

DataPipeline

Composable data processing pipeline.

DataPipeline allows chaining multiple processing steps together using a builder pattern. Each step transforms the DataFrame and can update the processing metadata.

Example

pipeline = (DataPipeline() ... .add_step(TechnicalIndicatorStep(indicators=["SMA", "RSI"])) ... .add_step(SentimentEnrichmentStep(news_data=news_df)) ... .add_step(ColumnCleanupStep(columns_to_drop=["Date"])))

result_df, metadata = pipeline.execute(raw_df) print(metadata.technical_indicators) # ["SMA", "RSI"]

Source code in src/quantrl_lab/data/processing/pipeline.py
class DataPipeline:
    """
    Chainable pipeline of DataFrame processing steps.

    Steps are registered with the builder-style ``add_step`` and run in
    insertion order by ``execute``, which threads one shared
    ``ProcessingMetadata`` object through every step so each step can
    record what it did.

    Example:
        >>> pipeline = (DataPipeline()
        ...     .add_step(TechnicalIndicatorStep(indicators=["SMA", "RSI"]))
        ...     .add_step(SentimentEnrichmentStep(news_data=news_df))
        ...     .add_step(ColumnCleanupStep(columns_to_drop=["Date"])))
        >>>
        >>> result_df, metadata = pipeline.execute(raw_df)
        >>> print(metadata.technical_indicators)  # ["SMA", "RSI"]
    """

    def __init__(self):
        """Create a pipeline with no steps registered."""
        self._steps: List[ProcessingStep] = []

    def add_step(self, step: ProcessingStep) -> "DataPipeline":
        """
        Append a processing step and return the pipeline itself.

        Args:
            step: ProcessingStep instance to append.

        Returns:
            This pipeline, enabling builder-style chaining.

        Example:
            >>> pipeline = DataPipeline()
            >>> pipeline.add_step(TechnicalIndicatorStep(["SMA"]))
            >>> pipeline.add_step(ColumnCleanupStep())
        """
        self._steps.append(step)
        return self

    def execute(self, df: pd.DataFrame, symbol: str = None) -> Tuple[pd.DataFrame, ProcessingMetadata]:
        """
        Run every registered step, in order, on a copy of ``df``.

        Args:
            df: Input DataFrame to process.
            symbol: Optional symbol name recorded in the metadata.

        Returns:
            Tuple of (processed DataFrame, processing metadata).

        Raises:
            ValueError: If any step raises a validation error (a failing
                step's exception is logged and re-raised unchanged).

        Example:
            >>> pipeline = DataPipeline().add_step(TechnicalIndicatorStep(["SMA"]))
            >>> result_df, metadata = pipeline.execute(raw_df)
            >>> assert "SMA_20" in result_df.columns
        """
        metadata = ProcessingMetadata(
            symbol=symbol,
            original_shape=df.shape,
        )

        # Work on a copy so the caller's DataFrame is never mutated.
        frame = df.copy()
        total = len(self._steps)
        for position, current in enumerate(self._steps, start=1):
            label = current.get_step_name()
            logger.debug(f"Executing step {position}/{total}: {label}")

            try:
                frame = current.process(frame, metadata)
            except Exception as e:
                logger.error(f"Step '{label}' failed: {e}")
                raise

        # Record the shape produced by the final step.
        metadata.final_shapes = {"processed": frame.shape}

        return frame, metadata

    def get_steps(self) -> List[ProcessingStep]:
        """
        Return a shallow copy of the registered steps.

        Returns:
            List of ProcessingStep instances.
        """
        return list(self._steps)

    def __len__(self) -> int:
        """Return how many steps are registered."""
        return len(self._steps)

    def __repr__(self) -> str:
        """Return a concise description listing the step names."""
        names = [s.get_step_name() for s in self._steps]
        return f"DataPipeline({len(self._steps)} steps: {names})"

__init__()

Initialize empty pipeline.

Source code in src/quantrl_lab/data/processing/pipeline.py
def __init__(self):
    """Initialize an empty pipeline with no processing steps registered."""
    self._steps: List[ProcessingStep] = []

add_step(step)

Add a processing step to the pipeline.

Parameters:

Name Type Description Default
step ProcessingStep

ProcessingStep instance to add

required

Returns:

Type Description
DataPipeline

Self for method chaining (builder pattern)

Example

pipeline = DataPipeline() pipeline.add_step(TechnicalIndicatorStep(["SMA"])) pipeline.add_step(ColumnCleanupStep())

Source code in src/quantrl_lab/data/processing/pipeline.py
def add_step(self, step: ProcessingStep) -> "DataPipeline":
    """
    Add a processing step to the pipeline.

    Steps execute in the order they are added.

    Args:
        step: ProcessingStep instance to add

    Returns:
        Self for method chaining (builder pattern)

    Example:
        >>> pipeline = DataPipeline()
        >>> pipeline.add_step(TechnicalIndicatorStep(["SMA"]))
        >>> pipeline.add_step(ColumnCleanupStep())
    """
    self._steps.append(step)
    return self

execute(df, symbol=None)

Execute all steps in the pipeline.

Processes the DataFrame through each step sequentially, maintaining metadata throughout the pipeline.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame to process

required
symbol str

Optional symbol name for metadata tracking

None

Returns:

Type Description
Tuple[DataFrame, ProcessingMetadata]

Tuple of (processed DataFrame, processing metadata)

Raises:

Type Description
ValueError

If any step raises a validation error

Example

pipeline = DataPipeline().add_step(TechnicalIndicatorStep(["SMA"])) result_df, metadata = pipeline.execute(raw_df) assert "SMA_20" in result_df.columns

Source code in src/quantrl_lab/data/processing/pipeline.py
def execute(self, df: pd.DataFrame, symbol: str = None) -> Tuple[pd.DataFrame, ProcessingMetadata]:
    """
    Run every registered step, in order, on a copy of ``df``.

    A single ProcessingMetadata object is threaded through all steps so
    each one can record the transformation it applied.

    Args:
        df: Input DataFrame to process.
        symbol: Optional symbol name recorded in the metadata.

    Returns:
        Tuple of (processed DataFrame, processing metadata).

    Raises:
        ValueError: If any step raises a validation error (a failing
            step's exception is logged and re-raised unchanged).

    Example:
        >>> pipeline = DataPipeline().add_step(TechnicalIndicatorStep(["SMA"]))
        >>> result_df, metadata = pipeline.execute(raw_df)
        >>> assert "SMA_20" in result_df.columns
    """
    metadata = ProcessingMetadata(
        symbol=symbol,
        original_shape=df.shape,
    )

    # Work on a copy so the caller's DataFrame is never mutated.
    frame = df.copy()
    total = len(self._steps)
    for position, current in enumerate(self._steps, start=1):
        label = current.get_step_name()
        logger.debug(f"Executing step {position}/{total}: {label}")

        try:
            frame = current.process(frame, metadata)
        except Exception as e:
            logger.error(f"Step '{label}' failed: {e}")
            raise

    # Record the shape produced by the final step.
    metadata.final_shapes = {"processed": frame.shape}

    return frame, metadata

get_steps()

Get list of all steps in the pipeline.

Returns:

Type Description
List[ProcessingStep]

List of ProcessingStep instances

Source code in src/quantrl_lab/data/processing/pipeline.py
def get_steps(self) -> List[ProcessingStep]:
    """
    Return a shallow copy of the registered steps.

    Returns:
        List of ProcessingStep instances (mutating it does not affect
        the pipeline).
    """
    return list(self._steps)

__len__()

Return number of steps in pipeline.

Source code in src/quantrl_lab/data/processing/pipeline.py
def __len__(self) -> int:
    """Return the number of steps currently registered in the pipeline."""
    return len(self._steps)

__repr__()

Return string representation of pipeline.

Source code in src/quantrl_lab/data/processing/pipeline.py
def __repr__(self) -> str:
    """Return a concise description listing the step names."""
    names = [s.get_step_name() for s in self._steps]
    return f"DataPipeline({len(self._steps)} steps: {names})"

Processing Steps

Processing steps encapsulate individual transformations. They can be chained together in a pipeline.

Base Interface

ProcessingStep

Bases: Protocol

Protocol for pipeline processing steps.

Steps are composable transformations that take a DataFrame and metadata, apply a transformation, and return the modified DataFrame. Steps should update the metadata to track what transformations were applied.

Example

step = TechnicalIndicatorStep(indicators=["SMA", "RSI"]) result_df = step.process(input_df, metadata) assert "SMA_20" in result_df.columns

Source code in src/quantrl_lab/data/processing/steps/base.py
@runtime_checkable
class ProcessingStep(Protocol):
    """
    Protocol for pipeline processing steps.

    Steps are composable transformations that take a DataFrame and metadata,
    apply a transformation, and return the modified DataFrame. Steps should
    update the metadata to track what transformations were applied.

    Note:
        ``@runtime_checkable`` makes ``isinstance(obj, ProcessingStep)``
        perform a structural (method-presence) check only; it does not
        validate method signatures.

    Example:
        >>> step = TechnicalIndicatorStep(indicators=["SMA", "RSI"])
        >>> result_df = step.process(input_df, metadata)
        >>> assert "SMA_20" in result_df.columns
    """

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Apply transformation to DataFrame.

        Args:
            data: Input DataFrame to process
            metadata: Processing metadata to track transformations

        Returns:
            Transformed DataFrame (may be modified in-place or copied)

        Raises:
            ValueError: If data is invalid or missing required columns
        """
        ...

    def get_step_name(self) -> str:
        """
        Return human-readable name of this processing step.

        Returns:
            Step name (e.g., "Technical Indicators", "Sentiment Enrichment")
        """
        ...

process(data, metadata)

Apply transformation to DataFrame.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame to process

required
metadata ProcessingMetadata

Processing metadata to track transformations

required

Returns:

Type Description
DataFrame

Transformed DataFrame (may be modified in-place or copied)

Raises:

Type Description
ValueError

If data is invalid or missing required columns

Source code in src/quantrl_lab/data/processing/steps/base.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Apply transformation to DataFrame.

    Implementations are expected to update ``metadata`` to record the
    transformation they applied.

    Args:
        data: Input DataFrame to process
        metadata: Processing metadata to track transformations

    Returns:
        Transformed DataFrame (may be modified in-place or copied)

    Raises:
        ValueError: If data is invalid or missing required columns
    """
    ...

get_step_name()

Return human-readable name of this processing step.

Returns:

Type Description
str

Step name (e.g., "Technical Indicators", "Sentiment Enrichment")

Source code in src/quantrl_lab/data/processing/steps/base.py
def get_step_name(self) -> str:
    """
    Return the human-readable name of this processing step.

    Returns:
        Step name (e.g., "Technical Indicators", "Sentiment Enrichment")
    """
    ...

Available Steps

TechnicalIndicatorStep

Apply technical indicators to DataFrame.

This step wraps TechnicalFeatureGenerator to add technical indicators as new columns. Indicators can be specified as strings (use defaults) or dicts (with custom parameters).

Example

step = TechnicalIndicatorStep(indicators=["SMA", {"RSI": {"window": 14}}]) result = step.process(df, metadata)

Source code in src/quantrl_lab/data/processing/steps/features/technical.py
class TechnicalIndicatorStep:
    """
    Apply technical indicators to DataFrame.

    This step wraps TechnicalFeatureGenerator to add technical indicators
    as new columns. Indicators can be specified as strings (use defaults)
    or dicts (with custom parameters).

    Example:
        >>> step = TechnicalIndicatorStep(indicators=["SMA", {"RSI": {"window": 14}}])
        >>> result = step.process(df, metadata)
    """

    def __init__(self, indicators: Optional[List[Union[str, Dict]]] = None):
        """
        Initialize technical indicator step.

        Args:
            indicators: List of indicators to apply. Can be strings ("SMA")
                or dicts ({"SMA": {"window": 20}}).
        """
        self.indicators = indicators or []

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Apply technical indicators to DataFrame.

        Args:
            data: Input DataFrame with OHLCV data
            metadata: Processing metadata (updated with applied indicators)

        Returns:
            DataFrame with technical indicator columns added. On an
            unexpected (non-ValueError) failure the error is printed and
            an untouched copy of the input is returned (best-effort).

        Raises:
            ValueError: If required columns are missing
        """
        # No indicators requested: return an unmodified copy.
        if not self.indicators:
            return data.copy()

        try:
            generator = TechnicalFeatureGenerator(self.indicators)
            result = generator.generate(data)

            # Update metadata
            metadata.technical_indicators = self.indicators

            return result
        except ValueError:
            # Propagate validation errors; bare `raise` preserves the
            # original traceback without adding an extra frame.
            raise
        except Exception as e:
            console.print(f"[red]❌ Failed to apply technical indicators: {e}[/red]")
            return data.copy()

    def get_step_name(self) -> str:
        """Return step name."""
        return "Technical Indicators"

__init__(indicators=None)

Initialize technical indicator step.

Parameters:

Name Type Description Default
indicators Optional[List[Union[str, Dict]]]

List of indicators to apply. Can be strings ("SMA") or dicts ({"SMA": {"window": 20}}).

None
Source code in src/quantrl_lab/data/processing/steps/features/technical.py
def __init__(self, indicators: Optional[List[Union[str, Dict]]] = None):
    """
    Initialize technical indicator step.

    Args:
        indicators: List of indicators to apply. Can be strings ("SMA")
            or dicts ({"SMA": {"window": 20}}). A falsy value (None or
            empty list) is normalized to an empty list.
    """
    self.indicators = indicators if indicators else []

process(data, metadata)

Apply technical indicators to DataFrame.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame with OHLCV data

required
metadata ProcessingMetadata

Processing metadata (updated with applied indicators)

required

Returns:

Type Description
DataFrame

DataFrame with technical indicator columns added

Raises:

Type Description
ValueError

If required columns are missing

Source code in src/quantrl_lab/data/processing/steps/features/technical.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Apply technical indicators to DataFrame.

    Args:
        data: Input DataFrame with OHLCV data
        metadata: Processing metadata (updated with applied indicators)

    Returns:
        DataFrame with technical indicator columns added. On an
        unexpected (non-ValueError) failure the error is printed and an
        untouched copy of the input is returned (best-effort).

    Raises:
        ValueError: If required columns are missing
    """
    # No indicators requested: return an unmodified copy.
    if not self.indicators:
        return data.copy()

    try:
        generator = TechnicalFeatureGenerator(self.indicators)
        result = generator.generate(data)

        # Update metadata
        metadata.technical_indicators = self.indicators

        return result
    except ValueError:
        # Propagate validation errors; bare `raise` preserves the
        # original traceback without adding an extra frame.
        raise
    except Exception as e:
        console.print(f"[red]❌ Failed to apply technical indicators: {e}[/red]")
        return data.copy()

get_step_name()

Return step name.

Source code in src/quantrl_lab/data/processing/steps/features/technical.py
def get_step_name(self) -> str:
    """Return the human-readable name of this step."""
    return "Technical Indicators"

SentimentEnrichmentStep

Add news sentiment scores to DataFrame.

This step enriches OHLCV data with sentiment scores computed from news data. Requires news_data to be provided.

Example

step = SentimentEnrichmentStep( ... news_data=news_df, ... provider=HuggingFaceProvider(), ... fillna_strategy="neutral" ... ) result = step.process(df, metadata)

Source code in src/quantrl_lab/data/processing/steps/alternative/sentiment.py
class SentimentEnrichmentStep:
    """
    Add news sentiment scores to DataFrame.

    This step enriches OHLCV data with sentiment scores computed from
    news data. Requires news_data to be provided.

    Example:
        >>> step = SentimentEnrichmentStep(
        ...     news_data=news_df,
        ...     provider=HuggingFaceProvider(),
        ...     fillna_strategy="neutral"
        ... )
        >>> result = step.process(df, metadata)
    """

    def __init__(
        self,
        news_data: pd.DataFrame,
        provider: SentimentProvider = None,
        config: SentimentConfig = None,
        fillna_strategy: str = "neutral",
    ):
        """
        Initialize sentiment enrichment step.

        Args:
            news_data: DataFrame with news articles
            provider: Sentiment analysis provider (default: HuggingFaceProvider)
            config: Sentiment configuration
            fillna_strategy: Strategy for filling missing scores ("neutral" or "fill_forward")
        """
        self.news_data = news_data
        self.provider = provider
        # Any falsy config (not just None) is replaced by the default.
        self.config = config or SentimentConfig()
        self.fillna_strategy = fillna_strategy

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Add sentiment scores to DataFrame.

        Args:
            data: Input OHLCV DataFrame
            metadata: Processing metadata (updated with sentiment flag)

        Returns:
            DataFrame with sentiment scores added. On an unexpected
            (non-ValueError) failure the error is printed and the input
            is returned unchanged (best-effort).

        Raises:
            ValueError: If news_data is empty or invalid
        """
        if self.news_data is None or self.news_data.empty:
            console.print("[yellow]⚠️  No news data provided. Skipping sentiment analysis.[/yellow]")
            return data

        try:
            # SentimentFeatureGenerator expects 'Date' as a column.
            # If Date is the index, temporarily move it to a column.
            restored_index = False
            df = data
            if "Date" not in df.columns and df.index.name == "Date":
                df = df.reset_index()
                restored_index = True

            generator = SentimentFeatureGenerator(
                self.provider,
                self.config,
                self.news_data,
                self.fillna_strategy,
            )
            result = generator.generate(df)

            # Restore the original Date index if we flattened it above.
            if restored_index and "Date" in result.columns:
                result = result.set_index("Date")

            # Update metadata
            metadata.news_sentiment_applied = True
            metadata.fillna_strategy = self.fillna_strategy

            return result
        except ValueError:
            # Propagate validation errors; bare `raise` preserves the
            # original traceback without adding an extra frame.
            raise
        except Exception as e:
            console.print(f"[red]❌ Failed to add sentiment data: {e}[/red]")
            return data

    def get_step_name(self) -> str:
        """Return step name."""
        return "Sentiment Enrichment"

__init__(news_data, provider=None, config=None, fillna_strategy='neutral')

Initialize sentiment enrichment step.

Parameters:

Name Type Description Default
news_data DataFrame

DataFrame with news articles

required
provider SentimentProvider

Sentiment analysis provider (default: HuggingFaceProvider)

None
config SentimentConfig

Sentiment configuration

None
fillna_strategy str

Strategy for filling missing scores ("neutral" or "fill_forward")

'neutral'
Source code in src/quantrl_lab/data/processing/steps/alternative/sentiment.py
def __init__(
    self,
    news_data: pd.DataFrame,
    provider: SentimentProvider = None,
    config: SentimentConfig = None,
    fillna_strategy: str = "neutral",
):
    """
    Initialize sentiment enrichment step.

    Args:
        news_data: DataFrame with news articles
        provider: Sentiment analysis provider (default: HuggingFaceProvider)
        config: Sentiment configuration; any falsy value (not just None)
            is replaced by a default SentimentConfig()
        fillna_strategy: Strategy for filling missing scores ("neutral" or "fill_forward")
    """
    self.news_data = news_data
    self.provider = provider
    self.config = config or SentimentConfig()
    self.fillna_strategy = fillna_strategy

process(data, metadata)

Add sentiment scores to DataFrame.

Parameters:

Name Type Description Default
data DataFrame

Input OHLCV DataFrame

required
metadata ProcessingMetadata

Processing metadata (updated with sentiment flag)

required

Returns:

Type Description
DataFrame

DataFrame with sentiment scores added

Raises:

Type Description
ValueError

If news_data is empty or invalid

Source code in src/quantrl_lab/data/processing/steps/alternative/sentiment.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Add sentiment scores to DataFrame.

    Args:
        data: Input OHLCV DataFrame
        metadata: Processing metadata (updated with sentiment flag)

    Returns:
        DataFrame with sentiment scores added. On an unexpected
        (non-ValueError) failure the error is printed and the input is
        returned unchanged (best-effort).

    Raises:
        ValueError: If news_data is empty or invalid
    """
    if self.news_data is None or self.news_data.empty:
        console.print("[yellow]⚠️  No news data provided. Skipping sentiment analysis.[/yellow]")
        return data

    try:
        # SentimentFeatureGenerator expects 'Date' as a column.
        # If Date is the index, temporarily move it to a column.
        restored_index = False
        df = data
        if "Date" not in df.columns and df.index.name == "Date":
            df = df.reset_index()
            restored_index = True

        generator = SentimentFeatureGenerator(
            self.provider,
            self.config,
            self.news_data,
            self.fillna_strategy,
        )
        result = generator.generate(df)

        # Restore the original Date index if we flattened it above.
        if restored_index and "Date" in result.columns:
            result = result.set_index("Date")

        # Update metadata
        metadata.news_sentiment_applied = True
        metadata.fillna_strategy = self.fillna_strategy

        return result
    except ValueError:
        # Propagate validation errors; bare `raise` preserves the
        # original traceback without adding an extra frame.
        raise
    except Exception as e:
        console.print(f"[red]❌ Failed to add sentiment data: {e}[/red]")
        return data

get_step_name()

Return step name.

Source code in src/quantrl_lab/data/processing/steps/alternative/sentiment.py
def get_step_name(self) -> str:
    """Return the human-readable name of this step."""
    return "Sentiment Enrichment"

AnalystEstimatesStep

Merge analyst grades and ratings into the DataFrame.

This step merges historical analyst data (grades, ratings) onto the main OHLCV DataFrame based on timestamps. Since analyst updates are sparse, values are forward-filled to represent the "current" analyst consensus at each time step.

Attributes:

Name Type Description
grades_df DataFrame

Historical grades data.

ratings_df DataFrame

Historical ratings data.

Source code in src/quantrl_lab/data/processing/steps/alternative/analyst.py
class AnalystEstimatesStep:
    """
    Merge analyst grades and ratings into the DataFrame.

    This step merges historical analyst data (grades, ratings) onto the main
    OHLCV DataFrame by calendar month. Analyst updates are sparse, so both
    sides are keyed on a month period and the latest record within a month is
    broadcast onto every row of that month; months with no record are left as
    NaN (no cross-month forward fill is performed by this step).

    Note:
        When the input has a tz-aware datetime index or a 'Date'/'Timestamp'
        column, the output dates are normalized to tz-naive UTC midnight
        (time-of-day information is dropped).

    Attributes:
        grades_df (pd.DataFrame): Historical grades data.
        ratings_df (pd.DataFrame): Historical ratings data.
    """

    def __init__(self, grades_df: pd.DataFrame = None, ratings_df: pd.DataFrame = None):
        # Either frame may be None or empty; process() then returns its input unchanged.
        self.grades_df = grades_df
        self.ratings_df = ratings_df

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Merge monthly analyst grades/ratings onto each row.

        Args:
            data: Input OHLCV DataFrame (must have datetime index or 'Date'/'Timestamp' column)
            metadata: Processing metadata (accepted for interface parity; not modified here)

        Returns:
            DataFrame with added analyst features
        """
        if (self.grades_df is None or self.grades_df.empty) and (self.ratings_df is None or self.ratings_df.empty):
            return data

        # Ensure working copy and set index to datetime for merging
        df = data.copy()

        # Identify date column for merging
        date_col = None
        if isinstance(df.index, pd.DatetimeIndex):
            pass  # Index is already good
        elif "Timestamp" in df.columns:
            df["Timestamp"] = pd.to_datetime(df["Timestamp"])
            df.set_index("Timestamp", inplace=True)
            date_col = "Timestamp"
        elif "Date" in df.columns:
            df["Date"] = pd.to_datetime(df["Date"])
            df.set_index("Date", inplace=True)
            date_col = "Date"

        # Normalize index to tz-naive UTC and midnight to prevent join errors
        if isinstance(df.index, pd.DatetimeIndex):
            if df.index.tz is not None:
                df.index = df.index.tz_convert("UTC").tz_localize(None)
            df.index = df.index.normalize()

        # Helper to prepare month key for joining
        # We use to_period('M') to match "2023-01-03" (Data) with "2023-01-01" (Grade)
        df["_join_month"] = df.index.to_period("M")

        # --- Process Grades ---
        if self.grades_df is not None and not self.grades_df.empty:
            grades = self.grades_df.copy()
            if "date" in grades.columns:
                grades["date"] = pd.to_datetime(grades["date"])

                # Create join key
                grades["_join_month"] = grades["date"].dt.to_period("M")

                # Deduplicate: Keep the last rating for the month if multiple exist
                # This prevents row explosion (Cartesian product) if FMP has >1 record/month
                grades = grades.sort_values("date").drop_duplicates(subset=["_join_month"], keep="last")

                # Drop redundant columns
                grades = grades.drop(columns=["symbol", "date"], errors="ignore")

                # Merge on month key
                # We use reset_index() on df to preserve the DatetimeIndex during merge
                # then set it back.
                df_reset = df.reset_index()

                # Merge
                merged = pd.merge(df_reset, grades, on="_join_month", how="left", suffixes=("", "_grade"))

                # Restore index
                if date_col:
                    merged.set_index(date_col, inplace=True)
                else:
                    # Fallback if date_col wasn't explicitly tracked (shouldn't happen given logic above)
                    merged.set_index(df.index.name or "index", inplace=True)

                df = merged

        # --- Process Ratings ---
        if self.ratings_df is not None and not self.ratings_df.empty:
            ratings = self.ratings_df.copy()
            if "date" in ratings.columns:
                ratings["date"] = pd.to_datetime(ratings["date"])

                # Create join key
                ratings["_join_month"] = ratings["date"].dt.to_period("M")

                # Deduplicate
                ratings = ratings.sort_values("date").drop_duplicates(subset=["_join_month"], keep="last")

                # Drop redundant columns (the textual 'rating' label is excluded too)
                ratings = ratings.drop(columns=["symbol", "rating", "date"], errors="ignore")

                # Merge
                df_reset = df.reset_index()

                merged = pd.merge(df_reset, ratings, on="_join_month", how="left", suffixes=("", "_rating"))

                # Restore index
                if date_col:
                    merged.set_index(date_col, inplace=True)
                else:
                    merged.set_index(df.index.name or "index", inplace=True)

                df = merged

        # Cleanup join key
        if "_join_month" in df.columns:
            df = df.drop(columns=["_join_month"])

        # Reset index if we changed it
        if date_col:
            df.reset_index(inplace=True)

        return df

    def get_step_name(self) -> str:
        """Return step name."""
        return "Analyst Estimates Enrichment"

process(data, metadata)

Merge monthly analyst data onto each row of the price DataFrame.

Parameters:

Name Type Description Default
data DataFrame

Input OHLCV DataFrame (must have datetime index or 'Date'/'Timestamp' column)

required
metadata ProcessingMetadata

Processing metadata

required

Returns:

Type Description
DataFrame

DataFrame with added analyst features

Source code in src/quantrl_lab/data/processing/steps/alternative/analyst.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Merge monthly analyst grades/ratings onto each row.

    The latest record of each calendar month is broadcast onto every row of
    that month via a month-key merge; months with no record are left as NaN
    (no cross-month forward fill is performed here).

    Args:
        data: Input OHLCV DataFrame (must have datetime index or 'Date'/'Timestamp' column)
        metadata: Processing metadata (accepted for interface parity; not modified here)

    Returns:
        DataFrame with added analyst features
    """
    if (self.grades_df is None or self.grades_df.empty) and (self.ratings_df is None or self.ratings_df.empty):
        return data

    # Ensure working copy and set index to datetime for merging
    df = data.copy()

    # Identify date column for merging
    date_col = None
    if isinstance(df.index, pd.DatetimeIndex):
        pass  # Index is already good
    elif "Timestamp" in df.columns:
        df["Timestamp"] = pd.to_datetime(df["Timestamp"])
        df.set_index("Timestamp", inplace=True)
        date_col = "Timestamp"
    elif "Date" in df.columns:
        df["Date"] = pd.to_datetime(df["Date"])
        df.set_index("Date", inplace=True)
        date_col = "Date"

    # Normalize index to tz-naive UTC and midnight to prevent join errors
    # (note: this drops time-of-day information from the output dates)
    if isinstance(df.index, pd.DatetimeIndex):
        if df.index.tz is not None:
            df.index = df.index.tz_convert("UTC").tz_localize(None)
        df.index = df.index.normalize()

    # Helper to prepare month key for joining
    # We use to_period('M') to match "2023-01-03" (Data) with "2023-01-01" (Grade)
    df["_join_month"] = df.index.to_period("M")

    # --- Process Grades ---
    if self.grades_df is not None and not self.grades_df.empty:
        grades = self.grades_df.copy()
        if "date" in grades.columns:
            grades["date"] = pd.to_datetime(grades["date"])

            # Create join key
            grades["_join_month"] = grades["date"].dt.to_period("M")

            # Deduplicate: Keep the last rating for the month if multiple exist
            # This prevents row explosion (Cartesian product) if FMP has >1 record/month
            grades = grades.sort_values("date").drop_duplicates(subset=["_join_month"], keep="last")

            # Drop redundant columns
            grades = grades.drop(columns=["symbol", "date"], errors="ignore")

            # Merge on month key
            # We use reset_index() on df to preserve the DatetimeIndex during merge
            # then set it back.
            df_reset = df.reset_index()

            # Merge
            merged = pd.merge(df_reset, grades, on="_join_month", how="left", suffixes=("", "_grade"))

            # Restore index
            if date_col:
                merged.set_index(date_col, inplace=True)
            else:
                # Fallback if date_col wasn't explicitly tracked (shouldn't happen given logic above)
                merged.set_index(df.index.name or "index", inplace=True)

            df = merged

    # --- Process Ratings ---
    if self.ratings_df is not None and not self.ratings_df.empty:
        ratings = self.ratings_df.copy()
        if "date" in ratings.columns:
            ratings["date"] = pd.to_datetime(ratings["date"])

            # Create join key
            ratings["_join_month"] = ratings["date"].dt.to_period("M")

            # Deduplicate
            ratings = ratings.sort_values("date").drop_duplicates(subset=["_join_month"], keep="last")

            # Drop redundant columns (the textual 'rating' label is excluded too)
            ratings = ratings.drop(columns=["symbol", "rating", "date"], errors="ignore")

            # Merge
            df_reset = df.reset_index()

            merged = pd.merge(df_reset, ratings, on="_join_month", how="left", suffixes=("", "_rating"))

            # Restore index
            if date_col:
                merged.set_index(date_col, inplace=True)
            else:
                merged.set_index(df.index.name or "index", inplace=True)

            df = merged

    # Cleanup join key
    if "_join_month" in df.columns:
        df = df.drop(columns=["_join_month"])

    # Reset index if we changed it
    if date_col:
        df.reset_index(inplace=True)

    return df

MarketContextStep

Merge broad market context (Sector/Industry performance) into the DataFrame.

This step allows the agent to see how the specific stock's sector or industry is performing relative to the stock itself.

Attributes:

Name Type Description
sector_perf_df DataFrame

Historical sector performance.

industry_perf_df DataFrame

Historical industry performance.

Source code in src/quantrl_lab/data/processing/steps/features/context.py
class MarketContextStep:
    """
    Merge broad market context (Sector/Industry performance) into the
    DataFrame.

    This step allows the agent to see how the specific stock's sector or
    industry is performing relative to the stock itself. Numeric columns of
    the sector/industry frames are left-joined onto the price rows by date
    and prefixed with ``sector_`` / ``industry_``.

    Attributes:
        sector_perf_df (pd.DataFrame): Historical sector performance.
        industry_perf_df (pd.DataFrame): Historical industry performance.
    """

    def __init__(self, sector_perf_df: pd.DataFrame = None, industry_perf_df: pd.DataFrame = None):
        # Either frame may be None or empty; process() then returns its input unchanged.
        self.sector_perf_df = sector_perf_df
        self.industry_perf_df = industry_perf_df

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Merge sector and industry data.

        Args:
            data: Input OHLCV DataFrame
            metadata: Processing metadata (accepted for interface parity; not modified here)

        Returns:
            DataFrame with added context features (prefixed with sector_ or industry_)
        """
        if (self.sector_perf_df is None or self.sector_perf_df.empty) and (
            self.industry_perf_df is None or self.industry_perf_df.empty
        ):
            return data

        df = data.copy()

        # Setup index for merging: promote a date-like column to the index if needed.
        date_col = None
        temp_index = False
        if not isinstance(df.index, pd.DatetimeIndex):
            for col in ["Timestamp", "Date", "date"]:
                if col in df.columns:
                    df[col] = pd.to_datetime(df[col])
                    df.set_index(col, inplace=True)
                    date_col = col
                    temp_index = True
                    break

        # Normalize index to tz-naive UTC midnight to prevent join errors
        if isinstance(df.index, pd.DatetimeIndex):
            if df.index.tz is not None:
                df.index = df.index.tz_convert("UTC").tz_localize(None)
            df.index = df.index.normalize()

        # --- Merge Sector Performance ---
        if self.sector_perf_df is not None and not self.sector_perf_df.empty:
            df = self._join_numeric_by_date(df, self.sector_perf_df, "sector_")

        # --- Merge Industry Performance ---
        if self.industry_perf_df is not None and not self.industry_perf_df.empty:
            df = self._join_numeric_by_date(df, self.industry_perf_df, "industry_")

        # Restore index if we changed it temporarily
        if temp_index and date_col:
            df.reset_index(inplace=True)

        return df

    @staticmethod
    def _join_numeric_by_date(df: pd.DataFrame, perf_df: pd.DataFrame, prefix: str) -> pd.DataFrame:
        """
        Left-join the numeric columns of ``perf_df`` onto ``df`` by date.

        ``perf_df`` must carry a 'date' column; otherwise ``df`` is returned
        unchanged. Dates are normalized to tz-naive midnight and de-duplicated
        (keeping the chronologically last record) so the join cannot multiply
        rows of ``df``.
        """
        if "date" not in perf_df.columns:
            return df

        perf = perf_df.copy()
        perf["date"] = pd.to_datetime(perf["date"])
        perf.set_index("date", inplace=True)

        if isinstance(perf.index, pd.DatetimeIndex):
            if perf.index.tz is not None:
                perf.index = perf.index.tz_convert("UTC").tz_localize(None)
            perf.index = perf.index.normalize()

        # Fix: de-duplicate the date index before joining. df.join() on an
        # index with repeated labels repeats each matching price row once per
        # duplicate (row explosion) — same hazard AnalystEstimatesStep guards
        # against by dropping duplicate month keys.
        perf = perf.sort_index()
        perf = perf[~perf.index.duplicated(keep="last")]

        # Keep numeric columns only for performance metrics
        perf = perf[perf.select_dtypes(include=["number"]).columns]

        return df.join(perf.add_prefix(prefix), how="left")

process(data, metadata)

Merge sector and industry data.

Parameters:

Name Type Description Default
data DataFrame

Input OHLCV DataFrame

required
metadata ProcessingMetadata

Processing metadata

required

Returns:

Type Description
DataFrame

DataFrame with added context features (prefixed with sector_ or industry_)

Source code in src/quantrl_lab/data/processing/steps/features/context.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Merge sector and industry data.

    Args:
        data: Input OHLCV DataFrame
        metadata: Processing metadata (accepted for interface parity; not modified here)

    Returns:
        DataFrame with added context features (prefixed with sector_ or industry_)
    """
    if (self.sector_perf_df is None or self.sector_perf_df.empty) and (
        self.industry_perf_df is None or self.industry_perf_df.empty
    ):
        return data

    df = data.copy()

    # Setup index for merging
    date_col = None
    temp_index = False
    if not isinstance(df.index, pd.DatetimeIndex):
        # Try to find date column
        for col in ["Timestamp", "Date", "date"]:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col])
                df.set_index(col, inplace=True)
                date_col = col
                temp_index = True
                break

    # Normalize index to tz-naive UTC midnight to prevent join errors
    if isinstance(df.index, pd.DatetimeIndex):
        if df.index.tz is not None:
            df.index = df.index.tz_convert("UTC").tz_localize(None)
        df.index = df.index.normalize()

    # --- Merge Sector Performance ---
    if self.sector_perf_df is not None and not self.sector_perf_df.empty:
        sector_df = self.sector_perf_df.copy()
        if "date" in sector_df.columns:
            sector_df["date"] = pd.to_datetime(sector_df["date"])
            sector_df.set_index("date", inplace=True)

            if isinstance(sector_df.index, pd.DatetimeIndex):
                if sector_df.index.tz is not None:
                    sector_df.index = sector_df.index.tz_convert("UTC").tz_localize(None)
                sector_df.index = sector_df.index.normalize()

            # Fix: de-duplicate dates (keep latest record) before joining,
            # otherwise df.join() multiplies price rows (row explosion).
            sector_df = sector_df.sort_index()
            sector_df = sector_df[~sector_df.index.duplicated(keep="last")]

            # Keep numeric columns only for performance metrics
            numeric_cols = sector_df.select_dtypes(include=["number"]).columns
            sector_df = sector_df[numeric_cols]

            # Add prefix
            sector_df = sector_df.add_prefix("sector_")

            # Join
            df = df.join(sector_df, how="left")

    # --- Merge Industry Performance ---
    if self.industry_perf_df is not None and not self.industry_perf_df.empty:
        ind_df = self.industry_perf_df.copy()
        if "date" in ind_df.columns:
            ind_df["date"] = pd.to_datetime(ind_df["date"])
            ind_df.set_index("date", inplace=True)

            if isinstance(ind_df.index, pd.DatetimeIndex):
                if ind_df.index.tz is not None:
                    ind_df.index = ind_df.index.tz_convert("UTC").tz_localize(None)
                ind_df.index = ind_df.index.normalize()

            # Fix: same de-duplication as the sector branch.
            ind_df = ind_df.sort_index()
            ind_df = ind_df[~ind_df.index.duplicated(keep="last")]

            numeric_cols = ind_df.select_dtypes(include=["number"]).columns
            ind_df = ind_df[numeric_cols]

            ind_df = ind_df.add_prefix("industry_")

            df = df.join(ind_df, how="left")

    # Restore index if we changed it temporarily
    if temp_index and date_col:
        df.reset_index(inplace=True)

    return df

NumericConversionStep

Convert DataFrame columns to numeric types.

This step converts object columns to numeric, while preserving date columns. Useful for ensuring proper data types before feeding to ML models.

Example

step = NumericConversionStep(columns=["volume", "price"]) result = step.process(df, metadata)

Source code in src/quantrl_lab/data/processing/steps/cleaning/conversion.py
class NumericConversionStep:
    """
    Convert DataFrame columns to numeric types.

    Object-dtype columns are coerced with ``pd.to_numeric`` while date-like
    columns are left untouched, producing a frame with proper dtypes for
    downstream ML models.

    Example:
        >>> step = NumericConversionStep(columns=["volume", "price"])
        >>> result = step.process(df, metadata)
    """

    def __init__(self, columns: Optional[List[str]] = None):
        """
        Initialize numeric conversion step.

        Args:
            columns (List[str], optional): Specific columns to convert. If None,
                auto-detects object columns, excluding date-like ones. Defaults to None.
        """
        self.columns = columns

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Convert the configured (or auto-detected) columns to numeric.

        Args:
            data (pd.DataFrame): Input DataFrame.
            metadata (ProcessingMetadata): Processing metadata (not modified).

        Returns:
            pd.DataFrame: DataFrame with numeric columns.

        Raises:
            ValueError: If ``columns`` was given but is not a list.
        """
        frame = data.copy()
        targets = self.columns

        if targets is None:
            targets = self._detect_convertible_columns(frame)
        elif not isinstance(targets, list):
            raise ValueError(f"Invalid type for 'columns': expected list, got {type(targets).__name__}.")

        # Coerce each requested object column; unparseable values become NaN.
        for name in targets:
            if name in frame.columns and frame[name].dtype == "object":
                frame[name] = pd.to_numeric(frame[name], errors="coerce")

        return frame

    @staticmethod
    def _detect_convertible_columns(frame: pd.DataFrame) -> List[str]:
        """Return object-dtype columns that do not look like dates."""
        known_dates = [c.lower() for c in config.DATE_COLUMNS]
        detected: List[str] = []
        for name in frame.columns:
            if frame[name].dtype != "object":
                continue
            if name in config.DATE_COLUMNS or name.lower() in known_dates:
                continue
            non_null = frame[name].dropna()
            if non_null.empty:
                continue
            try:
                # Sample a value to detect date columns not listed in DATE_COLUMNS.
                # NOTE(review): date-parseable samples (e.g. "2021") are skipped —
                # confirm that is the desired treatment of year-like strings.
                pd.to_datetime(non_null.iloc[0])
            except (ValueError, TypeError):
                detected.append(name)
        return detected

    def get_step_name(self) -> str:
        """Return step name."""
        return "Numeric Conversion"

__init__(columns=None)

Initialize numeric conversion step.

Parameters:

Name Type Description Default
columns List[str]

Specific columns to convert. If None, converts all object columns excluding date columns. Defaults to None.

None
Source code in src/quantrl_lab/data/processing/steps/cleaning/conversion.py
def __init__(self, columns: Optional[List[str]] = None):
    """
    Create the numeric conversion step.

    Args:
        columns (List[str], optional): Explicit columns to convert. ``None``
            means auto-detect object columns, excluding date-like ones.
    """
    self.columns = columns

process(data, metadata)

Convert specified columns to numeric.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
metadata ProcessingMetadata

Processing metadata (not modified).

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with numeric columns.

Raises:

Type Description
ValueError

If columns is not a list.

Source code in src/quantrl_lab/data/processing/steps/cleaning/conversion.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Convert the configured (or auto-detected) columns to numeric.

    Args:
        data (pd.DataFrame): Input DataFrame.
        metadata (ProcessingMetadata): Processing metadata (not modified).

    Returns:
        pd.DataFrame: DataFrame with numeric columns.

    Raises:
        ValueError: If ``columns`` was given but is not a list.
    """
    frame = data.copy()
    targets = self.columns

    if targets is None:
        # Auto-detect: object-dtype columns that are not date-like.
        targets = []
        known_dates = [c.lower() for c in config.DATE_COLUMNS]
        for name in frame.columns:
            if frame[name].dtype != "object":
                continue
            if name in config.DATE_COLUMNS or name.lower() in known_dates:
                continue
            non_null = frame[name].dropna()
            if non_null.empty:
                continue
            try:
                # Sample a value to detect date columns not listed in DATE_COLUMNS.
                pd.to_datetime(non_null.iloc[0])
            except (ValueError, TypeError):
                targets.append(name)
    elif not isinstance(targets, list):
        raise ValueError(f"Invalid type for 'columns': expected list, got {type(targets).__name__}.")

    # Coerce each requested object column; unparseable values become NaN.
    for name in targets:
        if name in frame.columns and frame[name].dtype == "object":
            frame[name] = pd.to_numeric(frame[name], errors="coerce")

    return frame

get_step_name()

Return step name.

Source code in src/quantrl_lab/data/processing/steps/cleaning/conversion.py
def get_step_name(self) -> str:
    """Human-readable name of this conversion step."""
    label = "Numeric Conversion"
    return label

ColumnCleanupStep

Drop unwanted columns from DataFrame.

This step removes specified columns or default columns (Date, Timestamp, Symbol) from the DataFrame.

Example

step = ColumnCleanupStep(columns_to_drop=["Date", "Symbol"]) result = step.process(df, metadata)

Source code in src/quantrl_lab/data/processing/steps/cleaning/cleanup.py
class ColumnCleanupStep:
    """
    Drop unwanted columns from DataFrame.

    Removes either a caller-supplied list of columns or the default
    identifier set (Date, Timestamp, Symbol). Columns missing from the
    input are ignored silently.

    Example:
        >>> step = ColumnCleanupStep(columns_to_drop=["Date", "Symbol"])
        >>> result = step.process(df, metadata)
    """

    def __init__(self, columns_to_drop: Optional[List[str]] = None, keep_date: bool = False):
        """
        Initialize column cleanup step.

        Args:
            columns_to_drop: List of columns to drop. If None, drops default columns.
            keep_date: If True, preserve date columns even if in columns_to_drop.

        Raises:
            ValueError: If ``columns_to_drop`` is neither None nor a list.
        """
        if columns_to_drop is not None and not isinstance(columns_to_drop, list):
            raise ValueError(
                f"Invalid type for 'columns_to_drop': expected list, got {type(columns_to_drop).__name__}."
            )
        # Fall back to the conventional identifier columns when none are given.
        self.columns_to_drop = (
            [config.DEFAULT_DATE_COLUMN, "Timestamp", "Symbol"] if columns_to_drop is None else columns_to_drop
        )
        self.keep_date = keep_date

    def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
        """
        Drop the configured columns from the DataFrame.

        Args:
            data: Input DataFrame
            metadata: Processing metadata (updated with dropped columns)

        Returns:
            DataFrame with columns removed
        """
        drop_list = list(self.columns_to_drop)

        if self.keep_date:
            # keep_date takes precedence over the drop list for known date columns.
            drop_list = [name for name in drop_list if name not in config.DATE_COLUMNS]

        # Record only the columns that actually exist and will therefore be dropped.
        metadata.columns_dropped.extend([name for name in drop_list if name in data.columns])

        return data.drop(columns=drop_list, errors="ignore")

    def get_step_name(self) -> str:
        """Return step name."""
        return "Column Cleanup"

__init__(columns_to_drop=None, keep_date=False)

Initialize column cleanup step.

Parameters:

Name Type Description Default
columns_to_drop Optional[List[str]]

List of columns to drop. If None, drops default columns.

None
keep_date bool

If True, preserve date columns even if in columns_to_drop.

False
Source code in src/quantrl_lab/data/processing/steps/cleaning/cleanup.py
def __init__(self, columns_to_drop: Optional[List[str]] = None, keep_date: bool = False):
    """
    Initialize column cleanup step.

    Args:
        columns_to_drop: List of columns to drop. If None, drops the default
            identifier columns (DEFAULT_DATE_COLUMN, 'Timestamp', 'Symbol').
        keep_date: If True, preserve date columns even if in columns_to_drop.

    Raises:
        ValueError: If ``columns_to_drop`` is neither None nor a list.
    """
    if columns_to_drop is not None and not isinstance(columns_to_drop, list):
        raise ValueError(
            f"Invalid type for 'columns_to_drop': expected list, got {type(columns_to_drop).__name__}."
        )
    self.columns_to_drop = (
        [config.DEFAULT_DATE_COLUMN, "Timestamp", "Symbol"] if columns_to_drop is None else columns_to_drop
    )
    self.keep_date = keep_date

process(data, metadata)

Drop specified columns from DataFrame.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame

required
metadata ProcessingMetadata

Processing metadata (updated with dropped columns)

required

Returns:

Type Description
DataFrame

DataFrame with columns removed

Source code in src/quantrl_lab/data/processing/steps/cleaning/cleanup.py
def process(self, data: pd.DataFrame, metadata: ProcessingMetadata) -> pd.DataFrame:
    """
    Drop the configured columns from the DataFrame.

    Args:
        data: Input DataFrame
        metadata: Processing metadata (updated with dropped columns)

    Returns:
        DataFrame with columns removed
    """
    drop_list = list(self.columns_to_drop)

    if self.keep_date:
        # keep_date takes precedence over the drop list for known date columns.
        drop_list = [name for name in drop_list if name not in config.DATE_COLUMNS]

    # Record only the columns that actually exist and will therefore be dropped.
    metadata.columns_dropped.extend([name for name in drop_list if name in data.columns])

    return data.drop(columns=drop_list, errors="ignore")

get_step_name()

Return step name.

Source code in src/quantrl_lab/data/processing/steps/cleaning/cleanup.py
def get_step_name(self) -> str:
    """Human-readable name of this cleanup step."""
    label = "Column Cleanup"
    return label