commit 4ddcde68d44a207c49840a1206dcbf9b0c3a229e Author: Dev Date: Thu Sep 11 17:46:14 2025 +0300 Initial commit: Market Trends Scraper diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e187b7f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Market Trends Scraper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..26de7ff --- /dev/null +++ b/README.md @@ -0,0 +1,340 @@ +# Market Trends Scraper + +A powerful and flexible Python web scraper for collecting and analyzing pricing and product trends from e-commerce websites. This tool provides comprehensive market insights by extracting product data, tracking price changes, and generating detailed trend analyses. + +## Features + +- **Multi-Source Scraping**: Collect data from multiple e-commerce websites simultaneously +- **Flexible Configuration**: Easy-to-use YAML configuration for different sources and scraping parameters +- **Dual Scraping Methods**: Supports both requests/BeautifulSoup and Selenium for dynamic content +- **Data Analysis**: Built-in analysis of pricing trends, ratings, and product availability +- **Multiple Output Formats**: Save data in CSV, JSON, or Excel formats +- **Robust Error Handling**: Comprehensive error handling and retry mechanisms +- **Professional Logging**: Detailed logging with configurable levels and outputs +- **Extensible Architecture**: Modular design for easy customization and extension + +## Installation + +### Prerequisites + +- Python 3.8 or higher +- Chrome browser (for Selenium functionality) +- ChromeDriver (compatible with your Chrome version) + +### Setup + +1. Clone the repository: +```bash +git clone https://github.com/iwasforcedtobehere/market-trends-scraper.git +cd market-trends-scraper +``` + +2. Create and activate a virtual environment: +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +``` + +3. Install dependencies: +```bash +pip install -r requirements.txt +``` + +4. Install ChromeDriver (if using Selenium): +```bash +# For Ubuntu/Debian: +sudo apt-get install chromium-chromedriver + +# For macOS using Homebrew: +brew install chromedriver + +# For Windows, download from: https://chromedriver.chromium.org/ +``` + +## Configuration + +The scraper uses a YAML configuration file to define scraping sources and parameters. A default configuration will be created automatically at `config/config.yaml` when you first run the scraper. 
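+
+To sanity-check a configuration before running a full scrape, you can load it with the `ConfigManager` class (documented in the Python API section below) and list the sources that will actually be scraped. This is only a minimal sketch; adjust the config path if yours differs:
+
+```python
+from src.config_manager import ConfigManager
+
+# Load (or auto-create) the configuration and show which sources are enabled
+config = ConfigManager("config/config.yaml").load_config()
+for source in config["sources"]:
+    status = "enabled" if source.get("enabled", True) else "disabled"
+    print(f"{source['name']}: {status} ({source['url']})")
+```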
+ +### Example Configuration + +```yaml +scraper: + delay_between_requests: 1.0 + timeout: 30 + max_retries: 3 + user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + headless: true + window_size: [1920, 1080] + +sources: + - name: "example_ecommerce" + url: "https://example-ecommerce.com/search" + type: "ecommerce" + enabled: true + use_selenium: false + selectors: + product: "div.product-item" + name: "h2.product-title" + price: "span.price" + rating: "div.rating" + availability: "div.stock-status" + pagination: + next_page: "a.next-page" + max_pages: 10 + +output: + format: "csv" + include_timestamp: true + filename: "market_trends_data" + +database: + url: "sqlite:///data/market_trends.db" + echo: false + +analysis: + price_history_days: 30 + trend_threshold: 0.05 + generate_charts: true +``` + +### Configuration Options + +#### Scraper Settings +- `delay_between_requests`: Delay between requests in seconds (default: 1.0) +- `timeout`: Request timeout in seconds (default: 30) +- `max_retries`: Maximum number of retry attempts for failed requests (default: 3) +- `user_agent`: User agent string for HTTP requests +- `headless`: Run browser in headless mode (default: true) +- `window_size`: Browser window size as [width, height] (default: [1920, 1080]) + +#### Source Configuration +- `name`: Unique identifier for the data source +- `url`: Base URL for scraping +- `type`: Type of website (e.g., "ecommerce") +- `enabled`: Whether to scrape this source (default: true) +- `use_selenium`: Use Selenium instead of requests (default: false) +- `selectors`: CSS selectors for extracting data +- `pagination`: Pagination settings + +#### Output Settings +- `format`: Output format ("csv", "json", or "excel") +- `include_timestamp`: Include timestamp in output filename (default: true) +- `filename`: Base filename for output files + +#### Analysis Settings +- `price_history_days`: Number of days to consider for price history (default: 30) +- `trend_threshold`: Minimum price change percentage to consider as a trend (default: 0.05) +- `generate_charts`: Generate trend charts (default: true) + +## Usage + +### Command Line Interface + +Run the scraper with default settings: +```bash +python main.py +``` + +Specify a custom configuration file: +```bash +python main.py --config path/to/config.yaml +``` + +Specify output file: +```bash +python main.py --output path/to/output.csv +``` + +Run in verbose mode: +```bash +python main.py --verbose +``` + +Run browser in non-headless mode (for debugging): +```bash +python main.py --no-headless +``` + +### Python API + +```python +from src.config_manager import ConfigManager +from src.scraper import MarketTrendsScraper +from src.logger import setup_logger + +# Setup logging +setup_logger(verbose=True) + +# Load configuration +config_manager = ConfigManager("config/config.yaml") +config = config_manager.load_config() + +# Initialize scraper +with MarketTrendsScraper(config, headless=True) as scraper: + # Scrape data + data = scraper.scrape_market_trends() + + # Save data + scraper.save_data(data, "output.csv") + + # Analyze trends + analysis = scraper.analyze_trends(data) + + # Save analysis + scraper.save_analysis(analysis, "analysis.json") +``` + +## Output + +### Data Output + +The scraper produces structured data with the following fields: + +| Field | Description | +|-------|-------------| +| name | Product name | +| price | Product price (as float) | +| rating | Product rating (as float) | +| availability | Product availability status | +| 
url | Product URL | +| source | Data source name | +| scraped_at | Timestamp when data was scraped | + +### Analysis Output + +The trend analysis includes: + +- **Summary Statistics**: Total products, source distribution +- **Price Analysis**: Average, min, max, median prices and distribution +- **Rating Analysis**: Average, min, max ratings and distribution +- **Availability Analysis**: Count of products by availability status +- **Price Trends by Source**: Comparative analysis across sources + +Example analysis output: +```json +{ + "total_products": 150, + "sources": { + "example_ecommerce": 100, + "another_store": 50 + }, + "price_analysis": { + "average_price": 49.99, + "min_price": 9.99, + "max_price": 199.99, + "median_price": 45.00, + "price_distribution": { + "count": 150, + "mean": 49.99, + "std": 35.25, + "min": 9.99, + "25%": 25.00, + "50%": 45.00, + "75%": 75.00, + "max": 199.99 + } + }, + "rating_analysis": { + "average_rating": 4.2, + "min_rating": 1.0, + "max_rating": 5.0, + "rating_distribution": { + "5.0": 45, + "4.0": 60, + "3.0": 30, + "2.0": 10, + "1.0": 5 + } + } +} +``` + +## Testing + +Run all tests: +```bash +pytest +``` + +Run unit tests only: +```bash +pytest -m unit +``` + +Run integration tests only: +```bash +pytest -m integration +``` + +Run tests with coverage report: +```bash +pytest --cov=src --cov-report=html +``` + +## Project Structure + +``` +market-trends-scraper/ +├── src/ +│ ├── __init__.py +│ ├── config_manager.py # Configuration management +│ ├── logger.py # Logging utilities +│ └── scraper.py # Main scraper implementation +├── tests/ +│ ├── __init__.py +│ ├── test_config_manager.py +│ ├── test_logger.py +│ ├── test_scraper.py +│ └── test_integration.py +├── config/ +│ └── config.yaml # Configuration file +├── data/ # Output data directory +├── main.py # Main entry point +├── requirements.txt # Python dependencies +├── pytest.ini # Test configuration +└── README.md # This file +``` + +## Contributing + +1. Fork the repository +2. Create a feature branch (`git checkout -b feature/amazing-feature`) +3. Commit your changes (`git commit -m 'Add amazing feature'`) +4. Push to the branch (`git push origin feature/amazing-feature`) +5. Open a Pull Request + +### Development Guidelines + +- Follow PEP 8 style guidelines +- Write comprehensive tests for new features +- Update documentation as needed +- Ensure all tests pass before submitting + +## License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +## Disclaimer + +This tool is for educational and research purposes. Users are responsible for: +- Complying with websites' terms of service +- Respecting robots.txt files +- Using the tool ethically and responsibly +- Not overwhelming servers with too many requests + +The authors are not responsible for any misuse of this tool. + +## Support + +If you encounter any issues or have questions: + +1. Check the [Issues](https://github.com/iwasforcedtobehere/market-trends-scraper/issues) page +2. Create a new issue with detailed information +3. 
For general questions, use the [Discussions](https://github.com/iwasforcedtobehere/market-trends-scraper/discussions) tab + +## Acknowledgments + +- [Beautiful Soup](https://www.crummy.com/software/BeautifulSoup/) for HTML parsing +- [Selenium](https://www.selenium.dev/) for browser automation +- [Pandas](https://pandas.pydata.org/) for data analysis +- [Loguru](https://github.com/Delgan/loguru) for logging \ No newline at end of file diff --git a/config/sample_config.yaml b/config/sample_config.yaml new file mode 100644 index 0000000..75a37ae --- /dev/null +++ b/config/sample_config.yaml @@ -0,0 +1,101 @@ +# Sample Configuration for Market Trends Scraper +# Copy this file to config.yaml and customize for your needs + +scraper: + # Delay between requests in seconds (helps avoid being blocked) + delay_between_requests: 1.5 + + # Request timeout in seconds + timeout: 30 + + # Maximum number of retry attempts for failed requests + max_retries: 3 + + # User agent string for HTTP requests + user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + + # Run browser in headless mode (no visible UI) + headless: true + + # Browser window size [width, height] + window_size: [1920, 1080] + +# List of data sources to scrape +sources: + # Example: Amazon (fictional selectors for demonstration) + - name: "amazon" + url: "https://www.amazon.com/s?k=laptop" + type: "ecommerce" + enabled: false # Set to true to enable this source + use_selenium: true # Amazon often requires JavaScript rendering + selectors: + product: "div[data-component-type='s-search-result']" + name: "h2 span.a-text-normal" + price: "span.a-price-whole" + rating: "span.a-icon-alt" + availability: "span.a-color-success" + pagination: + next_page: "a.s-pagination-next" + max_pages: 5 + + # Example: Generic e-commerce site + - name: "example_ecommerce" + url: "https://example-ecommerce.com/search?q=phone" + type: "ecommerce" + enabled: true + use_selenium: false + selectors: + product: "div.product-card" + name: "h3.product-title" + price: "span.price" + rating: "div.rating-stars" + availability: "div.stock-status" + pagination: + next_page: "a.pagination-next" + max_pages: 10 + + # Example: Electronics retailer + - name: "electronics_store" + url: "https://example-electronics.com/category/smartphones" + type: "ecommerce" + enabled: true + use_selenium: false + selectors: + product: "article.product-item" + name: "h1.product-name" + price: "div.current-price" + rating: "div.product-rating" + availability: "span.availability-label" + pagination: + next_page: "li.page-item.next a" + max_pages: 3 + +# Output settings +output: + # Output format: csv, json, or excel + format: "csv" + + # Include timestamp in output filename + include_timestamp: true + + # Base filename for output files + filename: "market_trends_data" + +# Database settings (for future enhancements) +database: + # Database connection URL + url: "sqlite:///data/market_trends.db" + + # Enable SQL query logging + echo: false + +# Analysis settings +analysis: + # Number of days to consider for price history analysis + price_history_days: 30 + + # Minimum price change percentage to consider as a trend (0.05 = 5%) + trend_threshold: 0.05 + + # Generate trend charts (requires matplotlib and seaborn) + generate_charts: true \ No newline at end of file diff --git a/examples/advanced_usage.py b/examples/advanced_usage.py new file mode 100644 index 0000000..eea8319 --- /dev/null +++ b/examples/advanced_usage.py @@ 
-0,0 +1,327 @@ + +#!/usr/bin/env python3 +""" +Advanced Usage Example for Market Trends Scraper + +This script demonstrates advanced features of the Market Trends Scraper, +including custom data processing, trend analysis, and visualization. +""" + +import sys +import json +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns +from datetime import datetime, timedelta +from pathlib import Path + +# Add src directory to Python path +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +from config_manager import ConfigManager +from scraper import MarketTrendsScraper +from logger import setup_logger + + +def create_custom_config(): + """Create a custom configuration for demonstration.""" + return { + "scraper": { + "delay_between_requests": 2.0, + "timeout": 30, + "max_retries": 3, + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + "headless": True, + "window_size": [1920, 1080] + }, + "sources": [ + { + "name": "books_store", + "url": "https://books.toscrape.com/", + "type": "ecommerce", + "enabled": True, + "use_selenium": False, + "selectors": { + "product": "article.product_pod", + "name": "h3 a", + "price": "p.price_color", + "rating": "p.star-rating", + "availability": "p.instock.availability" + }, + "pagination": { + "next_page": "li.next a", + "max_pages": 3 + } + } + ], + "output": { + "format": "csv", + "include_timestamp": True, + "filename": "books_analysis" + }, + "database": { + "url": "sqlite:///../data/books.db", + "echo": False + }, + "analysis": { + "price_history_days": 30, + "trend_threshold": 0.05, + "generate_charts": True + } + } + + +def custom_price_processing(price_text): + """ + Custom price processing function for specific formats. + + Args: + price_text: Raw price text from website + + Returns: + Parsed price as float or None if parsing fails + """ + try: + # Remove currency symbol and whitespace + price_clean = price_text.replace('£', '').strip() + + # Convert to float + return float(price_clean) + except (ValueError, TypeError): + return None + + +def custom_rating_processing(rating_element): + """ + Custom rating processing for star ratings. + + Args: + rating_element: BeautifulSoup element containing rating + + Returns: + Parsed rating as float or None if parsing fails + """ + try: + # Extract rating from class name (e.g., "star-rating Three" -> 3) + class_names = rating_element.get('class', []) + for class_name in class_names: + if class_name.lower() in ['one', 'two', 'three', 'four', 'five']: + rating_map = { + 'one': 1.0, + 'two': 2.0, + 'three': 3.0, + 'four': 4.0, + 'five': 5.0 + } + return rating_map.get(class_name.lower()) + return None + except Exception: + return None + + +def analyze_price_trends(data): + """ + Perform advanced price trend analysis. 
+ + Args: + data: List of product data dictionaries + + Returns: + Dictionary containing trend analysis results + """ + if not data: + return {"error": "No data available for analysis"} + + # Convert to DataFrame + df = pd.DataFrame(data) + + # Ensure price is numeric + df['price'] = pd.to_numeric(df['price'], errors='coerce') + + # Drop rows with missing prices + df = df.dropna(subset=['price']) + + # Basic statistics + analysis = { + "total_products": len(df), + "price_statistics": { + "mean": float(df['price'].mean()), + "median": float(df['price'].median()), + "std": float(df['price'].std()), + "min": float(df['price'].min()), + "max": float(df['price'].max()), + "quartiles": { + "25%": float(df['price'].quantile(0.25)), + "75%": float(df['price'].quantile(0.75)) + } + }, + "price_distribution": { + "under_10": int(len(df[df['price'] < 10])), + "10_to_20": int(len(df[(df['price'] >= 10) & (df['price'] < 20)])), + "20_to_30": int(len(df[(df['price'] >= 20) & (df['price'] < 30)])), + "30_to_40": int(len(df[(df['price'] >= 30) & (df['price'] < 40)])), + "over_40": int(len(df[df['price'] >= 40])) + } + } + + # Rating analysis if available + if 'rating' in df.columns: + df['rating'] = pd.to_numeric(df['rating'], errors='coerce') + rating_df = df.dropna(subset=['rating']) + + if not rating_df.empty: + analysis["rating_statistics"] = { + "mean": float(rating_df['rating'].mean()), + "distribution": rating_df['rating'].value_counts().to_dict() + } + + return analysis + + +def generate_visualizations(data, analysis, output_dir): + """ + Generate visualization charts for the analysis. + + Args: + data: List of product data dictionaries + analysis: Analysis results dictionary + output_dir: Directory to save charts + """ + # Create output directory if it doesn't exist + Path(output_dir).mkdir(parents=True, exist_ok=True) + + # Convert to DataFrame + df = pd.DataFrame(data) + + # Ensure price is numeric + df['price'] = pd.to_numeric(df['price'], errors='coerce') + df = df.dropna(subset=['price']) + + # Set style + sns.set(style="whitegrid") + plt.figure(figsize=(12, 8)) + + # Price distribution histogram + plt.subplot(2, 2, 1) + sns.histplot(df['price'], bins=20, kde=True) + plt.title('Price Distribution') + plt.xlabel('Price (£)') + plt.ylabel('Count') + + # Price distribution by category + plt.subplot(2, 2, 2) + price_dist = analysis['price_distribution'] + categories = list(price_dist.keys()) + values = list(price_dist.values()) + plt.bar(categories, values) + plt.title('Price Distribution by Category') + plt.xlabel('Price Category') + plt.ylabel('Count') + plt.xticks(rotation=45) + + # Box plot for prices + plt.subplot(2, 2, 3) + sns.boxplot(y=df['price']) + plt.title('Price Box Plot') + plt.ylabel('Price (£)') + + # Rating vs Price scatter plot (if ratings available) + plt.subplot(2, 2, 4) + if 'rating' in df.columns: + df['rating'] = pd.to_numeric(df['rating'], errors='coerce') + rating_df = df.dropna(subset=['rating']) + if not rating_df.empty: + sns.scatterplot(x='rating', y='price', data=rating_df) + plt.title('Rating vs Price') + plt.xlabel('Rating') + plt.ylabel('Price (£)') + else: + plt.text(0.5, 0.5, 'No rating data available', + horizontalalignment='center', verticalalignment='center') + else: + plt.text(0.5, 0.5, 'No rating data available', + horizontalalignment='center', verticalalignment='center') + + # Adjust layout and save + plt.tight_layout() + plt.savefig(f"{output_dir}/market_analysis_charts.png", dpi=300) + plt.close() + + print(f"✓ Visualization charts saved to 
{output_dir}/market_analysis_charts.png") + + +def main(): + """Main function demonstrating advanced scraper usage.""" + + # Setup logging + setup_logger(verbose=True) + + # Create custom configuration + config = create_custom_config() + print("✓ Custom configuration created") + + # Initialize scraper + try: + scraper = MarketTrendsScraper(config, headless=True) + print("✓ Scraper initialized successfully") + except Exception as e: + print(f"✗ Failed to initialize scraper: {str(e)}") + return 1 + + try: + # Scrape market trends data + print("\n🔍 Scraping market trends data...") + data = scraper.scrape_market_trends() + print(f"✓ Scraped {len(data)} product records") + + if not data: + print("⚠ No data was scraped. Check your configuration and selectors.") + return 0 + + # Save raw data + output_file = "../data/advanced_example_output.csv" + scraper.save_data(data, output_file) + print(f"✓ Raw data saved to {output_file}") + + # Perform advanced analysis + print("\n📊 Performing advanced analysis...") + analysis = analyze_price_trends(data) + print("✓ Advanced analysis completed") + + # Save analysis results + analysis_file = "../data/advanced_example_analysis.json" + with open(analysis_file, 'w') as f: + json.dump(analysis, f, indent=2) + print(f"✓ Analysis saved to {analysis_file}") + + # Generate visualizations + print("\n📈 Generating visualization charts...") + charts_dir = "../data/charts" + generate_visualizations(data, analysis, charts_dir) + + # Print detailed summary + print("\n📋 Detailed Summary:") + print(f" - Total products: {analysis.get('total_products', 0)}") + + if 'price_statistics' in analysis: + price_stats = analysis['price_statistics'] + print(f" - Average price: £{price_stats.get('mean', 0):.2f}") + print(f" - Median price: £{price_stats.get('median', 0):.2f}") + print(f" - Standard deviation: £{price_stats.get('std', 0):.2f}") + print(f" - Price range: £{price_stats.get('min', 0):.2f} - £{price_stats.get('max', 0):.2f}") + + if 'price_distribution' in analysis: + print(" - Price distribution:") + for category, count in analysis['price_distribution'].items(): + print(f" * {category}: {count} products") + + if 'rating_statistics' in analysis: + rating_stats = analysis['rating_statistics'] + print(f" - Average rating: {rating_stats.get('mean', 0):.2f}") + print(" - Rating distribution:") + for rating, count in rating_stats['distribution'].items(): + print(f" * {rating} stars: {count} products") + + print("\n✅ Advanced market trends analysis completed successfully!") + return 0 + + except Exception as e: + print(f"✗ Error during scraping: {str(e)}") + return 1 + + finally: + # Close scraper + scraper.close() + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/examples/basic_usage.py b/examples/basic_usage.py new file mode 100644 index 0000000..7ec514c --- /dev/null +++ b/examples/basic_usage.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +""" +Basic Usage Example for Market Trends Scraper + +This script demonstrates how to use the Market Trends Scraper +to collect and analyze pricing data from e-commerce websites. 
+""" + +import sys +from pathlib import Path + +# Add src directory to Python path +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +from config_manager import ConfigManager +from scraper import MarketTrendsScraper +from logger import setup_logger + + +def main(): + """Main function demonstrating basic scraper usage.""" + + # Setup logging + setup_logger(verbose=True) + + # Initialize configuration manager + config_manager = ConfigManager("../config/sample_config.yaml") + + # Load configuration + try: + config = config_manager.load_config() + print("✓ Configuration loaded successfully") + except Exception as e: + print(f"✗ Failed to load configuration: {str(e)}") + return 1 + + # Initialize scraper + try: + scraper = MarketTrendsScraper(config, headless=True) + print("✓ Scraper initialized successfully") + except Exception as e: + print(f"✗ Failed to initialize scraper: {str(e)}") + return 1 + + try: + # Scrape market trends data + print("\n🔍 Scraping market trends data...") + data = scraper.scrape_market_trends() + print(f"✓ Scraped {len(data)} product records") + + if not data: + print("⚠ No data was scraped. Check your configuration and selectors.") + return 0 + + # Save scraped data + output_file = "../data/example_output.csv" + scraper.save_data(data, output_file) + print(f"✓ Data saved to {output_file}") + + # Analyze trends + print("\n📊 Analyzing market trends...") + analysis = scraper.analyze_trends(data) + print("✓ Trend analysis completed") + + # Save analysis results + analysis_file = "../data/example_analysis.json" + scraper.save_analysis(analysis, analysis_file) + print(f"✓ Analysis saved to {analysis_file}") + + # Print summary + print("\n📋 Summary:") + print(f" - Total products: {analysis.get('total_products', 0)}") + + if 'price_analysis' in analysis: + price_analysis = analysis['price_analysis'] + print(f" - Average price: ${price_analysis.get('average_price', 0):.2f}") + print(f" - Price range: ${price_analysis.get('min_price', 0):.2f} - ${price_analysis.get('max_price', 0):.2f}") + + if 'sources' in analysis: + print(" - Products by source:") + for source, count in analysis['sources'].items(): + print(f" * {source}: {count} products") + + print("\n✅ Market trends analysis completed successfully!") + return 0 + + except Exception as e: + print(f"✗ Error during scraping: {str(e)}") + return 1 + + finally: + # Close scraper + scraper.close() + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..a55f3f8 --- /dev/null +++ b/main.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Market Trends Scraper - Main Entry Point + +This script serves as the main entry point for the Market Trends Scraper application. +It initializes the scraper, processes configuration, and orchestrates the scraping +and analysis workflow. 
+""" + +import sys +import argparse +from pathlib import Path + +# Add src directory to Python path +sys.path.insert(0, str(Path(__file__).resolve().parent / "src")) + +from scraper import MarketTrendsScraper +from config_manager import ConfigManager +from logger import setup_logger + + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Web Scraper for Market Trends - Collect and analyze pricing and product trends" + ) + parser.add_argument( + "--config", "-c", + type=str, + default="config/config.yaml", + help="Path to configuration file" + ) + parser.add_argument( + "--output", "-o", + type=str, + default="data/output.csv", + help="Path to output file" + ) + parser.add_argument( + "--headless", + action="store_true", + help="Run browser in headless mode" + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable verbose logging" + ) + return parser.parse_args() + + +def main(): + """Main application entry point.""" + args = parse_arguments() + + # Setup logging + logger = setup_logger(verbose=args.verbose) + logger.info("Starting Market Trends Scraper") + + try: + # Load configuration + config_manager = ConfigManager(args.config) + config = config_manager.load_config() + logger.info("Configuration loaded successfully") + + # Initialize and run scraper + scraper = MarketTrendsScraper(config, headless=args.headless) + logger.info("Scraper initialized") + + # Run the scraping process + data = scraper.scrape_market_trends() + logger.info(f"Scraped {len(data)} product records") + + # Save results + scraper.save_data(data, args.output) + logger.info(f"Data saved to {args.output}") + + # Analyze trends + analysis = scraper.analyze_trends(data) + logger.info("Trend analysis completed") + + # Save analysis results + analysis_output = args.output.replace('.csv', '_analysis.json') + scraper.save_analysis(analysis, analysis_output) + logger.info(f"Analysis saved to {analysis_output}") + + logger.info("Market Trends Scraper completed successfully") + return 0 + + except Exception as e: + logger.error(f"Error occurred: {str(e)}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..9b02493 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,32 @@ +[tool:pytest] +# Python test discovery settings +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# Output formatting +addopts = + --verbose + --tb=short + --strict-markers + --disable-warnings + --cov=src + --cov-report=term-missing + --cov-report=html:htmlcov + --cov-fail-under=80 + +# Custom markers +markers = + slow: marks tests as slow (deselect with '-m "not slow"') + integration: marks tests as integration tests + unit: marks tests as unit tests + web: marks tests that require web access + +# Minimum Python version +minversion = 6.0 + +# Filter warnings +filterwarnings = + ignore::DeprecationWarning + ignore::PendingDeprecationWarning \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..88aadaf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,31 @@ +# Web scraping +requests>=2.28.0 +beautifulsoup4>=4.11.0 +selenium>=4.8.0 +lxml>=4.9.0 + +# Data analysis and manipulation +pandas>=1.5.0 +numpy>=1.24.0 +matplotlib>=3.6.0 +seaborn>=0.12.0 + +# Data storage +sqlalchemy>=1.4.0 +sqlite3 + +# Configuration management +python-dotenv>=0.21.0 +pyyaml>=6.0 + +# Logging 
and monitoring +loguru>=0.6.0 + +# Testing +pytest>=7.2.0 +pytest-cov>=4.0.0 +responses>=0.22.0 + +# Utilities +tqdm>=4.64.0 +schedule>=1.1.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config_manager.py b/src/config_manager.py new file mode 100644 index 0000000..6bbe94e --- /dev/null +++ b/src/config_manager.py @@ -0,0 +1,211 @@ +""" +Configuration Manager Module + +This module provides functionality to load, validate, and manage configuration +settings for the Market Trends Scraper application. +""" + +import os +import yaml +from pathlib import Path +from typing import Dict, Any, Optional +from loguru import logger + + +class ConfigManager: + """Manages application configuration loading and validation.""" + + def __init__(self, config_path: str = "config/config.yaml"): + """ + Initialize the ConfigManager. + + Args: + config_path: Path to the configuration file + """ + self.config_path = Path(config_path) + self.config = {} + self.default_config = self._get_default_config() + + def _get_default_config(self) -> Dict[str, Any]: + """ + Get the default configuration settings. + + Returns: + Dictionary containing default configuration values + """ + return { + "scraper": { + "delay_between_requests": 1.0, + "timeout": 30, + "max_retries": 3, + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + "headless": True, + "window_size": [1920, 1080] + }, + "sources": [ + { + "name": "example_ecommerce", + "url": "https://example-ecommerce.com/search", + "type": "ecommerce", + "enabled": True, + "selectors": { + "product": "div.product-item", + "name": "h2.product-title", + "price": "span.price", + "rating": "div.rating", + "availability": "div.stock-status" + }, + "pagination": { + "next_page": "a.next-page", + "max_pages": 10 + } + } + ], + "output": { + "format": "csv", + "include_timestamp": True, + "filename": "market_trends_data" + }, + "database": { + "url": "sqlite:///data/market_trends.db", + "echo": False + }, + "analysis": { + "price_history_days": 30, + "trend_threshold": 0.05, # 5% change considered significant + "generate_charts": True + } + } + + def load_config(self) -> Dict[str, Any]: + """ + Load configuration from file or create default if not exists. + + Returns: + Dictionary containing configuration settings + + Raises: + FileNotFoundError: If config file doesn't exist and cannot be created + yaml.YAMLError: If config file contains invalid YAML + """ + try: + # Create config directory if it doesn't exist + self.config_path.parent.mkdir(parents=True, exist_ok=True) + + # Load config if file exists + if self.config_path.exists(): + with open(self.config_path, 'r', encoding='utf-8') as f: + self.config = yaml.safe_load(f) + logger.info(f"Configuration loaded from {self.config_path}") + else: + # Create default config file + self.config = self.default_config + self.save_config() + logger.info(f"Default configuration created at {self.config_path}") + + # Validate and merge with defaults + self.config = self._validate_and_merge_config() + return self.config + + except Exception as e: + logger.error(f"Failed to load configuration: {str(e)}") + raise + + def save_config(self) -> None: + """ + Save current configuration to file. 
+ + Raises: + IOError: If unable to write to config file + """ + try: + with open(self.config_path, 'w', encoding='utf-8') as f: + yaml.dump(self.config, f, default_flow_style=False, indent=2) + logger.info(f"Configuration saved to {self.config_path}") + except Exception as e: + logger.error(f"Failed to save configuration: {str(e)}") + raise + + def _validate_and_merge_config(self) -> Dict[str, Any]: + """ + Validate loaded configuration and merge with defaults for missing values. + + Returns: + Validated and merged configuration dictionary + """ + def deep_merge(source: Dict, destination: Dict) -> Dict: + """Recursively copy values from source into destination without overwriting existing keys.""" + for key, value in source.items(): + if key in destination: + if isinstance(value, dict) and isinstance(destination[key], dict): + deep_merge(value, destination[key]) + # Keep the value already present in destination (loaded config wins over defaults) + else: + destination[key] = value + return destination + + # Fill in missing settings from the defaults; values from the loaded config take precedence + merged_config = deep_merge(self.default_config, self.config.copy()) + + # Validate required fields + required_sections = ["scraper", "sources", "output"] + for section in required_sections: + if section not in merged_config: + raise ValueError(f"Missing required configuration section: {section}") + + # Validate sources + if not merged_config["sources"]: + raise ValueError("At least one data source must be configured") + + return merged_config + + def get(self, key: str, default: Any = None) -> Any: + """ + Get configuration value by key using dot notation. + + Args: + key: Configuration key (supports dot notation, e.g., "scraper.timeout") + default: Default value if key not found + + Returns: + Configuration value or default + """ + keys = key.split('.') + value = self.config + + try: + for k in keys: + value = value[k] + return value + except (KeyError, TypeError): + return default + + def set(self, key: str, value: Any) -> None: + """ + Set configuration value by key using dot notation. + + Args: + key: Configuration key (supports dot notation, e.g., "scraper.timeout") + value: Value to set + """ + keys = key.split('.') + config_ref = self.config + + # Navigate to parent of target key + for k in keys[:-1]: + if k not in config_ref: + config_ref[k] = {} + config_ref = config_ref[k] + + # Set the value + config_ref[keys[-1]] = value + + def reload(self) -> Dict[str, Any]: + """ + Reload configuration from file. + + Returns: + Updated configuration dictionary + """ + return self.load_config() \ No newline at end of file diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 0000000..e8c6e1c --- /dev/null +++ b/src/logger.py @@ -0,0 +1,119 @@ +""" +Logger Module + +This module provides centralized logging configuration and utilities for the +Market Trends Scraper application. +""" + +import sys +from pathlib import Path +from loguru import logger +from typing import Optional + + +def setup_logger( + log_level: str = "INFO", + log_file: Optional[str] = None, + verbose: bool = False, + rotation: str = "10 MB", + retention: str = "7 days", + compression: str = "zip" +) -> None: + """ + Configure and initialize the application logger. 
+ + Args: + log_level: Minimum logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + log_file: Path to log file (if None, logs only to console) + verbose: Enable verbose logging (DEBUG level) + rotation: Log file rotation settings + retention: Log file retention settings + compression: Log file compression format + """ + # Remove default logger + logger.remove() + + # Determine log level + if verbose: + log_level = "DEBUG" + + # Console log format + console_format = ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "{name}:{function}:{line} | " + "{message}" + ) + + # File log format + file_format = ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "{name}:{function}:{line} | " + "{message}" + ) + + # Add console logger + logger.add( + sys.stderr, + format=console_format, + level=log_level, + colorize=True + ) + + # Add file logger if log_file is specified + if log_file: + # Create log directory if it doesn't exist + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + logger.add( + log_file, + format=file_format, + level=log_level, + rotation=rotation, + retention=retention, + compression=compression, + encoding="utf-8" + ) + + # Set up exception handler + def handle_exception(exc_type, exc_value, exc_traceback): + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + logger.opt(exception=(exc_type, exc_value, exc_traceback)).error( + "Uncaught exception occurred" + ) + + sys.excepthook = handle_exception + + +def get_logger(name: str = None): + """ + Get a logger instance with the specified name. + + Args: + name: Logger name (typically __name__ of the calling module) + + Returns: + Logger instance + """ + if name: + return logger.bind(name=name) + return logger + + +class LoggerMixin: + """ + Mixin class to add logging capabilities to other classes. + """ + + @property + def logger(self): + """Get logger instance for this class.""" + return logger.bind( + name=self.__class__.__name__, + id=id(self) + ) \ No newline at end of file diff --git a/src/scraper.py b/src/scraper.py new file mode 100644 index 0000000..4b75c33 --- /dev/null +++ b/src/scraper.py @@ -0,0 +1,618 @@ +""" +Web Scraper Module + +This module contains the core functionality for scraping e-commerce websites +to collect product and pricing data for market trend analysis. +""" + +import time +import json +import random +from datetime import datetime +from typing import Dict, List, Any, Optional, Union +from pathlib import Path +from urllib.parse import urljoin, urlparse + +import requests +from bs4 import BeautifulSoup +import pandas as pd +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.chrome.options import Options +from selenium.common.exceptions import TimeoutException, NoSuchElementException + +from logger import LoggerMixin +from config_manager import ConfigManager + + +class MarketTrendsScraper(LoggerMixin): + """ + Main scraper class for collecting market trends data from e-commerce websites. + """ + + def __init__(self, config: Dict[str, Any], headless: bool = True): + """ + Initialize the scraper with configuration. 
+ + Args: + config: Configuration dictionary + headless: Whether to run browser in headless mode + """ + self.config = config + self.driver = None + self.session = requests.Session() + self.data = [] + self._setup_browser(headless) + self._setup_session() + + def _setup_browser(self, headless: bool = True) -> None: + """ + Set up the Selenium WebDriver with appropriate options. + + Args: + headless: Whether to run browser in headless mode + """ + try: + chrome_options = Options() + + # Set headless mode + if headless: + chrome_options.add_argument("--headless") + + # Set window size + window_size = self.config.get("scraper.window_size", [1920, 1080]) + chrome_options.add_argument(f"--window-size={window_size[0]},{window_size[1]}") + + # Add other options for stability + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_argument("--disable-web-security") + chrome_options.add_argument("--disable-features=VizDisplayCompositor") + + # Set user agent + user_agent = self.config.get("scraper.user_agent", "") + if user_agent: + chrome_options.add_argument(f"--user-agent={user_agent}") + + # Initialize driver + self.driver = webdriver.Chrome(options=chrome_options) + self.driver.set_page_load_timeout(self.config.get("scraper.timeout", 30)) + + self.logger.info("Browser setup completed") + + except Exception as e: + self.logger.error(f"Failed to setup browser: {str(e)}") + raise + + def _setup_session(self) -> None: + """ + Set up the requests session with appropriate headers. + """ + user_agent = self.config.get("scraper.user_agent", "") + if user_agent: + self.session.headers.update({"User-Agent": user_agent}) + + # Add other headers + self.session.headers.update({ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "gzip, deflate", + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1" + }) + + self.logger.info("Session setup completed") + + def scrape_market_trends(self) -> List[Dict[str, Any]]: + """ + Scrape market trends data from all configured sources. + + Returns: + List of dictionaries containing scraped data + """ + all_data = [] + sources = self.config.get("sources", []) + + for source in sources: + if not source.get("enabled", True): + self.logger.info(f"Skipping disabled source: {source.get('name', 'Unknown')}") + continue + + self.logger.info(f"Scraping source: {source.get('name', 'Unknown')}") + + try: + source_data = self._scrape_source(source) + all_data.extend(source_data) + self.logger.info(f"Scraped {len(source_data)} records from {source.get('name', 'Unknown')}") + except Exception as e: + self.logger.error(f"Failed to scrape source {source.get('name', 'Unknown')}: {str(e)}") + continue + + return all_data + + def _scrape_source(self, source: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Scrape data from a specific source. 
+ + Args: + source: Source configuration dictionary + + Returns: + List of dictionaries containing scraped data + """ + source_data = [] + url = source.get("url", "") + selectors = source.get("selectors", {}) + pagination = source.get("pagination", {}) + + if not url: + self.logger.warning(f"No URL configured for source: {source.get('name', 'Unknown')}") + return source_data + + # Determine scraping method + use_selenium = source.get("use_selenium", False) + + if use_selenium: + return self._scrape_with_selenium(source) + else: + return self._scrape_with_requests(source) + + def _scrape_with_requests(self, source: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Scrape data using requests and BeautifulSoup. + + Args: + source: Source configuration dictionary + + Returns: + List of dictionaries containing scraped data + """ + source_data = [] + url = source.get("url", "") + selectors = source.get("selectors", {}) + pagination = source.get("pagination", {}) + max_pages = pagination.get("max_pages", 1) + + for page in range(1, max_pages + 1): + try: + # Add page parameter if needed + page_url = url + if page > 1: + page_url = f"{url}?page={page}" + + self.logger.debug(f"Scraping page {page}: {page_url}") + + # Make request with retry logic + response = self._make_request_with_retry(page_url) + + if not response: + continue + + # Parse HTML + soup = BeautifulSoup(response.text, 'html.parser') + + # Extract product data + products = soup.select(selectors.get("product", "")) + + for product in products: + product_data = self._extract_product_data(product, selectors) + if product_data: + product_data["source"] = source.get("name", "Unknown") + product_data["scraped_at"] = datetime.now().isoformat() + source_data.append(product_data) + + # Check if there's a next page + if page < max_pages: + next_page = soup.select_one(pagination.get("next_page", "")) + if not next_page: + self.logger.debug(f"No more pages found after page {page}") + break + + # Delay between requests + delay = self.config.get("scraper.delay_between_requests", 1.0) + time.sleep(delay + random.uniform(0, 1)) + + except Exception as e: + self.logger.error(f"Error scraping page {page} from {source.get('name', 'Unknown')}: {str(e)}") + continue + + return source_data + + def _scrape_with_selenium(self, source: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Scrape data using Selenium WebDriver. 
+ + Args: + source: Source configuration dictionary + + Returns: + List of dictionaries containing scraped data + """ + source_data = [] + url = source.get("url", "") + selectors = source.get("selectors", {}) + pagination = source.get("pagination", {}) + max_pages = pagination.get("max_pages", 1) + + try: + self.driver.get(url) + + for page in range(1, max_pages + 1): + self.logger.debug(f"Scraping page {page} with Selenium") + + # Wait for products to load + try: + WebDriverWait(self.driver, 10).until( + EC.presence_of_element_located((By.CSS_SELECTOR, selectors.get("product", ""))) + ) + except TimeoutException: + self.logger.warning(f"Timeout waiting for products to load on page {page}") + continue + + # Extract product data + products = self.driver.find_elements(By.CSS_SELECTOR, selectors.get("product", "")) + + for product in products: + product_data = self._extract_product_data_selenium(product, selectors) + if product_data: + product_data["source"] = source.get("name", "Unknown") + product_data["scraped_at"] = datetime.now().isoformat() + source_data.append(product_data) + + # Navigate to next page if available + if page < max_pages: + try: + next_button = self.driver.find_element(By.CSS_SELECTOR, pagination.get("next_page", "")) + next_button.click() + + # Wait for page to load + time.sleep(2) + except NoSuchElementException: + self.logger.debug(f"No next page button found after page {page}") + break + + # Delay between requests + delay = self.config.get("scraper.delay_between_requests", 1.0) + time.sleep(delay + random.uniform(0, 1)) + + except Exception as e: + self.logger.error(f"Error scraping with Selenium from {source.get('name', 'Unknown')}: {str(e)}") + + return source_data + + def _make_request_with_retry(self, url: str) -> Optional[requests.Response]: + """ + Make HTTP request with retry logic. + + Args: + url: URL to request + + Returns: + Response object or None if failed + """ + max_retries = self.config.get("scraper.max_retries", 3) + timeout = self.config.get("scraper.timeout", 30) + + for attempt in range(max_retries): + try: + response = self.session.get(url, timeout=timeout) + response.raise_for_status() + return response + except requests.RequestException as e: + self.logger.warning(f"Request attempt {attempt + 1} failed for {url}: {str(e)}") + if attempt < max_retries - 1: + # Exponential backoff + time.sleep((2 ** attempt) + random.uniform(0, 1)) + else: + self.logger.error(f"Max retries exceeded for {url}") + return None + + return None + + def _extract_product_data(self, product: BeautifulSoup, selectors: Dict[str, str]) -> Optional[Dict[str, Any]]: + """ + Extract product data from HTML element using BeautifulSoup. 
+ + Args: + product: BeautifulSoup element containing product data + selectors: Dictionary of CSS selectors + + Returns: + Dictionary containing product data or None if extraction failed + """ + try: + data = {} + + # Extract name + name_element = product.select_one(selectors.get("name", "")) + data["name"] = name_element.get_text(strip=True) if name_element else None + + # Extract price + price_element = product.select_one(selectors.get("price", "")) + if price_element: + price_text = price_element.get_text(strip=True) + data["price"] = self._parse_price(price_text) + else: + data["price"] = None + + # Extract rating + rating_element = product.select_one(selectors.get("rating", "")) + if rating_element: + rating_text = rating_element.get_text(strip=True) + data["rating"] = self._parse_rating(rating_text) + else: + data["rating"] = None + + # Extract availability + availability_element = product.select_one(selectors.get("availability", "")) + data["availability"] = availability_element.get_text(strip=True) if availability_element else None + + # Extract URL if available + link_element = product.select_one("a") + if link_element and link_element.get("href"): + data["url"] = link_element.get("href") + + return data if data.get("name") else None + + except Exception as e: + self.logger.error(f"Error extracting product data: {str(e)}") + return None + + def _extract_product_data_selenium(self, product, selectors: Dict[str, str]) -> Optional[Dict[str, Any]]: + """ + Extract product data from Selenium WebElement. + + Args: + product: Selenium WebElement containing product data + selectors: Dictionary of CSS selectors + + Returns: + Dictionary containing product data or None if extraction failed + """ + try: + data = {} + + # Extract name + try: + name_element = product.find_element(By.CSS_SELECTOR, selectors.get("name", "")) + data["name"] = name_element.text.strip() + except NoSuchElementException: + data["name"] = None + + # Extract price + try: + price_element = product.find_element(By.CSS_SELECTOR, selectors.get("price", "")) + price_text = price_element.text.strip() + data["price"] = self._parse_price(price_text) + except NoSuchElementException: + data["price"] = None + + # Extract rating + try: + rating_element = product.find_element(By.CSS_SELECTOR, selectors.get("rating", "")) + rating_text = rating_element.text.strip() + data["rating"] = self._parse_rating(rating_text) + except NoSuchElementException: + data["rating"] = None + + # Extract availability + try: + availability_element = product.find_element(By.CSS_SELECTOR, selectors.get("availability", "")) + data["availability"] = availability_element.text.strip() + except NoSuchElementException: + data["availability"] = None + + # Extract URL if available + try: + link_element = product.find_element(By.CSS_SELECTOR, "a") + if link_element.get_attribute("href"): + data["url"] = link_element.get_attribute("href") + except NoSuchElementException: + pass + + return data if data.get("name") else None + + except Exception as e: + self.logger.error(f"Error extracting product data with Selenium: {str(e)}") + return None + + def _parse_price(self, price_text: str) -> Optional[float]: + """ + Parse price text to extract numeric value. 
+ + Args: + price_text: Raw price text + + Returns: + Parsed price as float or None if parsing failed + """ + try: + # Remove currency symbols and whitespace + price_clean = price_text.replace('$', '').replace('€', '').replace('£', '').strip() + + # Handle comma as decimal separator + price_clean = price_clean.replace(',', '.') + + # Extract numeric part + import re + price_match = re.search(r'[\d.]+', price_clean) + if price_match: + return float(price_match.group()) + + return None + + except Exception as e: + self.logger.error(f"Error parsing price '{price_text}': {str(e)}") + return None + + def _parse_rating(self, rating_text: str) -> Optional[float]: + """ + Parse rating text to extract numeric value. + + Args: + rating_text: Raw rating text + + Returns: + Parsed rating as float or None if parsing failed + """ + try: + # Extract numeric part + import re + rating_match = re.search(r'[\d.]+', rating_text) + if rating_match: + return float(rating_match.group()) + + return None + + except Exception as e: + self.logger.error(f"Error parsing rating '{rating_text}': {str(e)}") + return None + + def save_data(self, data: List[Dict[str, Any]], output_path: str) -> None: + """ + Save scraped data to file. + + Args: + data: List of dictionaries containing scraped data + output_path: Path to output file + """ + try: + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Convert to DataFrame + df = pd.DataFrame(data) + + # Save based on file extension + if output_path.endswith('.csv'): + df.to_csv(output_path, index=False) + elif output_path.endswith('.json'): + df.to_json(output_path, orient='records', indent=2) + elif output_path.endswith('.xlsx'): + df.to_excel(output_path, index=False) + else: + # Default to CSV + output_path = output_path + '.csv' + df.to_csv(output_path, index=False) + + self.logger.info(f"Data saved to {output_path}") + + except Exception as e: + self.logger.error(f"Error saving data to {output_path}: {str(e)}") + raise + + def analyze_trends(self, data: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze market trends from scraped data. 
+ + Args: + data: List of dictionaries containing scraped data + + Returns: + Dictionary containing trend analysis results + """ + try: + if not data: + return {"error": "No data available for analysis"} + + # Convert to DataFrame + df = pd.DataFrame(data) + + # Basic statistics + analysis = { + "total_products": len(df), + "sources": df["source"].value_counts().to_dict(), + "price_analysis": {}, + "rating_analysis": {}, + "availability_analysis": {} + } + + # Price analysis + if "price" in df.columns: + price_data = df["price"].dropna() + if not price_data.empty: + analysis["price_analysis"] = { + "average_price": float(price_data.mean()), + "min_price": float(price_data.min()), + "max_price": float(price_data.max()), + "median_price": float(price_data.median()), + "price_distribution": price_data.describe().to_dict() + } + + # Rating analysis + if "rating" in df.columns: + rating_data = df["rating"].dropna() + if not rating_data.empty: + analysis["rating_analysis"] = { + "average_rating": float(rating_data.mean()), + "min_rating": float(rating_data.min()), + "max_rating": float(rating_data.max()), + "rating_distribution": rating_data.value_counts().to_dict() + } + + # Availability analysis + if "availability" in df.columns: + availability_data = df["availability"].dropna() + if not availability_data.empty: + analysis["availability_analysis"] = availability_data.value_counts().to_dict() + + # Price trends by source + if "price" in df.columns and "source" in df.columns: + price_by_source = df.groupby("source")["price"].agg(["mean", "min", "max"]).to_dict() + analysis["price_by_source"] = price_by_source + + self.logger.info("Trend analysis completed") + return analysis + + except Exception as e: + self.logger.error(f"Error analyzing trends: {str(e)}") + return {"error": str(e)} + + def save_analysis(self, analysis: Dict[str, Any], output_path: str) -> None: + """ + Save trend analysis results to file. + + Args: + analysis: Dictionary containing analysis results + output_path: Path to output file + """ + try: + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(analysis, f, indent=2, ensure_ascii=False) + + self.logger.info(f"Analysis saved to {output_path}") + + except Exception as e: + self.logger.error(f"Error saving analysis to {output_path}: {str(e)}") + raise + + def close(self) -> None: + """ + Close browser and session. + """ + try: + if self.driver: + self.driver.quit() + self.driver = None + + if self.session: + self.session.close() + + self.logger.info("Browser and session closed") + + except Exception as e: + self.logger.error(f"Error closing browser/session: {str(e)}") + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.close() \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_config_manager.py b/tests/test_config_manager.py new file mode 100644 index 0000000..9001c3b --- /dev/null +++ b/tests/test_config_manager.py @@ -0,0 +1,165 @@ +""" +Unit tests for the ConfigManager module. 
+""" + +import pytest +import tempfile +import os +import yaml +from pathlib import Path +from unittest.mock import patch, mock_open + +from src.config_manager import ConfigManager + + +class TestConfigManager: + """Test cases for ConfigManager class.""" + + def test_init_with_default_path(self): + """Test ConfigManager initialization with default path.""" + config_manager = ConfigManager() + assert config_manager.config_path == Path("config/config.yaml") + assert isinstance(config_manager.config, dict) + assert isinstance(config_manager.default_config, dict) + + def test_init_with_custom_path(self): + """Test ConfigManager initialization with custom path.""" + custom_path = "custom/config.yaml" + config_manager = ConfigManager(custom_path) + assert config_manager.config_path == Path(custom_path) + + def test_get_default_config(self): + """Test default configuration structure.""" + config_manager = ConfigManager() + default_config = config_manager._get_default_config() + + # Check required sections + assert "scraper" in default_config + assert "sources" in default_config + assert "output" in default_config + assert "database" in default_config + assert "analysis" in default_config + + # Check some default values + assert default_config["scraper"]["delay_between_requests"] == 1.0 + assert default_config["scraper"]["timeout"] == 30 + assert default_config["scraper"]["headless"] is True + assert isinstance(default_config["sources"], list) + assert len(default_config["sources"]) > 0 + + @patch('builtins.open', new_callable=mock_open, read_data="scraper:\n timeout: 60") + @patch('pathlib.Path.exists') + def test_load_config_existing_file(self, mock_exists, mock_file): + """Test loading configuration from existing file.""" + mock_exists.return_value = True + + config_manager = ConfigManager() + config = config_manager.load_config() + + mock_file.assert_called_once() + assert config["scraper"]["timeout"] == 60 + + @patch('builtins.open', new_callable=mock_open) + @patch('pathlib.Path.exists') + def test_load_config_create_default(self, mock_exists, mock_file): + """Test creating default configuration when file doesn't exist.""" + mock_exists.return_value = False + + config_manager = ConfigManager() + config = config_manager.load_config() + + # Verify file was created + mock_file.assert_called_once() + # Verify config is default + assert config == config_manager.default_config + + @patch('builtins.open', new_callable=mock_open) + def test_save_config(self, mock_file): + """Test saving configuration to file.""" + config_manager = ConfigManager() + config_manager.config = {"test": "value"} + + config_manager.save_config() + + mock_file.assert_called_once() + # Verify yaml.dump was called with correct arguments + with patch('yaml.dump') as mock_dump: + config_manager.save_config() + mock_dump.assert_called_once() + + def test_validate_and_merge_config(self): + """Test configuration validation and merging.""" + config_manager = ConfigManager() + + # Test with partial config + partial_config = { + "scraper": { + "timeout": 60 + } + } + config_manager.config = partial_config + + merged = config_manager._validate_and_merge_config() + + # Should have all sections + assert "sources" in merged + assert "output" in merged + # Should have updated value + assert merged["scraper"]["timeout"] == 60 + # Should have default values for missing keys + assert merged["scraper"]["delay_between_requests"] == 1.0 + + def test_validate_and_merge_config_missing_required(self): + """Test validation fails when required 
sections are missing.""" + config_manager = ConfigManager() + config_manager.config = {"invalid": "config"} + + with pytest.raises(ValueError, match="Missing required configuration section"): + config_manager._validate_and_merge_config() + + def test_validate_and_merge_config_no_sources(self): + """Test validation fails when no sources are configured.""" + config_manager = ConfigManager() + config_manager.config = { + "scraper": {}, + "sources": [], + "output": {} + } + + with pytest.raises(ValueError, match="At least one data source must be configured"): + config_manager._validate_and_merge_config() + + def test_get_with_dot_notation(self): + """Test getting configuration values with dot notation.""" + config_manager = ConfigManager() + config_manager.config = { + "scraper": { + "timeout": 60, + "nested": { + "value": "test" + } + } + } + + assert config_manager.get("scraper.timeout") == 60 + assert config_manager.get("scraper.nested.value") == "test" + assert config_manager.get("nonexistent", "default") == "default" + + def test_set_with_dot_notation(self): + """Test setting configuration values with dot notation.""" + config_manager = ConfigManager() + config_manager.config = {"scraper": {}} + + config_manager.set("scraper.timeout", 60) + config_manager.set("new.nested.value", "test") + + assert config_manager.config["scraper"]["timeout"] == 60 + assert config_manager.config["new"]["nested"]["value"] == "test" + + @patch.object(ConfigManager, 'load_config') + def test_reload(self, mock_load): + """Test reloading configuration.""" + config_manager = ConfigManager() + config_manager.reload() + + mock_load.assert_called_once() \ No newline at end of file diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..b5b09a4 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,355 @@ +""" +Integration tests for the Market Trends Scraper application. + +These tests verify that all components work together correctly. 
+""" + +import pytest +import tempfile +import os +import json +from pathlib import Path +from unittest.mock import patch, Mock + +from src.config_manager import ConfigManager +from src.scraper import MarketTrendsScraper +from src.logger import setup_logger +import main + + +class TestIntegration: + """Integration test cases for the entire application.""" + + @pytest.fixture + def temp_dir(self): + """Create a temporary directory for test files.""" + with tempfile.TemporaryDirectory() as temp_dir: + yield temp_dir + + @pytest.fixture + def sample_config_file(self, temp_dir): + """Create a sample configuration file for testing.""" + config_path = Path(temp_dir) / "config.yaml" + config_content = { + "scraper": { + "delay_between_requests": 0.1, # Faster for testing + "timeout": 10, + "max_retries": 2, + "user_agent": "Mozilla/5.0 (Test)", + "headless": True, + "window_size": [1024, 768] + }, + "sources": [ + { + "name": "test_source", + "url": "https://httpbin.org/html", + "type": "ecommerce", + "enabled": True, + "selectors": { + "product": "p", + "name": "p", + "price": "p", + "rating": "p", + "availability": "p" + }, + "pagination": { + "next_page": "a", + "max_pages": 1 + } + } + ], + "output": { + "format": "csv", + "include_timestamp": True, + "filename": "test_output" + }, + "database": { + "url": f"sqlite:///{temp_dir}/test.db", + "echo": False + }, + "analysis": { + "price_history_days": 30, + "trend_threshold": 0.05, + "generate_charts": True + } + } + + import yaml + with open(config_path, 'w') as f: + yaml.dump(config_content, f) + + return str(config_path) + + @patch('src.scraper.webdriver') + def test_full_workflow(self, mock_webdriver, sample_config_file, temp_dir): + """Test the complete workflow from config loading to analysis.""" + # Setup mock driver + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + # Setup logger + setup_logger() + + # Load configuration + config_manager = ConfigManager(sample_config_file) + config = config_manager.load_config() + + # Verify config was loaded + assert config["scraper"]["delay_between_requests"] == 0.1 + assert len(config["sources"]) == 1 + assert config["sources"][0]["name"] == "test_source" + + # Initialize scraper + scraper = MarketTrendsScraper(config, headless=True) + + # Mock the scraping process to return sample data + sample_data = [ + { + "name": "Test Product 1", + "price": 19.99, + "rating": 4.5, + "availability": "In Stock", + "source": "test_source", + "scraped_at": "2023-01-01T00:00:00" + }, + { + "name": "Test Product 2", + "price": 29.99, + "rating": 3.8, + "availability": "Out of Stock", + "source": "test_source", + "scraped_at": "2023-01-01T00:00:00" + } + ] + + with patch.object(scraper, '_scrape_source', return_value=sample_data): + # Scrape data + data = scraper.scrape_market_trends() + + # Verify data was scraped + assert len(data) == 2 + assert data[0]["name"] == "Test Product 1" + assert data[1]["price"] == 29.99 + + # Save data + output_path = Path(temp_dir) / "test_output.csv" + scraper.save_data(data, str(output_path)) + + # Verify file was created + assert output_path.exists() + + # Analyze trends + analysis = scraper.analyze_trends(data) + + # Verify analysis + assert analysis["total_products"] == 2 + assert "price_analysis" in analysis + assert analysis["price_analysis"]["average_price"] == 24.99 + assert analysis["price_analysis"]["min_price"] == 19.99 + assert analysis["price_analysis"]["max_price"] == 29.99 + + # Save analysis + analysis_path = Path(temp_dir) / 
"test_analysis.json" + scraper.save_analysis(analysis, str(analysis_path)) + + # Verify analysis file was created + assert analysis_path.exists() + + # Verify analysis content + with open(analysis_path, 'r') as f: + saved_analysis = json.load(f) + + assert saved_analysis["total_products"] == 2 + assert saved_analysis["price_analysis"]["average_price"] == 24.99 + + @patch('src.scraper.webdriver') + def test_multiple_sources(self, mock_webdriver, temp_dir): + """Test scraping from multiple sources.""" + # Setup mock driver + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + # Create config with multiple sources + config_content = { + "scraper": { + "delay_between_requests": 0.1, + "timeout": 10, + "headless": True + }, + "sources": [ + { + "name": "source_1", + "url": "https://example1.com", + "enabled": True, + "selectors": { + "product": "div.product", + "name": "h2", + "price": "span.price" + } + }, + { + "name": "source_2", + "url": "https://example2.com", + "enabled": True, + "selectors": { + "product": "div.item", + "name": "h3", + "price": "div.cost" + } + }, + { + "name": "source_3", + "url": "https://example3.com", + "enabled": False, # Disabled source + "selectors": { + "product": "div.product", + "name": "h2", + "price": "span.price" + } + } + ] + } + + import yaml + config_path = Path(temp_dir) / "multi_source_config.yaml" + with open(config_path, 'w') as f: + yaml.dump(config_content, f) + + # Initialize scraper + scraper = MarketTrendsScraper(config_content, headless=True) + + # Mock different data for each source + def mock_scrape_source(source): + if source["name"] == "source_1": + return [{"name": "Product 1", "price": 10.00, "source": "source_1"}] + elif source["name"] == "source_2": + return [{"name": "Product 2", "price": 20.00, "source": "source_2"}] + else: + return [] + + with patch.object(scraper, '_scrape_source', side_effect=mock_scrape_source): + data = scraper.scrape_market_trends() + + # Verify data from both enabled sources + assert len(data) == 2 + sources = {item["source"] for item in data} + assert "source_1" in sources + assert "source_2" in sources + assert "source_3" not in sources # Disabled source should not appear + + @patch('src.scraper.webdriver') + def test_error_handling(self, mock_webdriver, temp_dir): + """Test error handling when scraping fails.""" + # Setup mock driver + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + config = { + "scraper": { + "delay_between_requests": 0.1, + "timeout": 10, + "headless": True + }, + "sources": [ + { + "name": "working_source", + "url": "https://example.com", + "enabled": True, + "selectors": { + "product": "div.product", + "name": "h2", + "price": "span.price" + } + }, + { + "name": "failing_source", + "url": "https://example.com", + "enabled": True, + "selectors": { + "product": "div.product", + "name": "h2", + "price": "span.price" + } + } + ] + } + + # Initialize scraper + scraper = MarketTrendsScraper(config, headless=True) + + # Mock one source to succeed and one to fail + def mock_scrape_source(source): + if source["name"] == "working_source": + return [{"name": "Working Product", "price": 15.00, "source": "working_source"}] + else: + raise Exception("Scraping failed") + + with patch.object(scraper, '_scrape_source', side_effect=mock_scrape_source): + data = scraper.scrape_market_trends() + + # Should still get data from working source + assert len(data) == 1 + assert data[0]["source"] == "working_source" + + @patch('sys.argv', ['main.py', '--config', 
'test_config.yaml', '--output', 'test_output.csv']) + @patch('src.scraper.webdriver') + def test_main_entry_point(self, mock_webdriver, temp_dir): + """Test the main entry point of the application.""" + # Setup mock driver + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + # Create test config + config_path = Path(temp_dir) / "test_config.yaml" + config_content = { + "scraper": { + "delay_between_requests": 0.1, + "timeout": 10, + "headless": True + }, + "sources": [ + { + "name": "test_source", + "url": "https://example.com", + "enabled": True, + "selectors": { + "product": "div.product", + "name": "h2", + "price": "span.price" + } + } + ] + } + + import yaml + with open(config_path, 'w') as f: + yaml.dump(config_content, f) + + # Mock the scraper to return sample data + sample_data = [{"name": "Test Product", "price": 19.99, "source": "test_source"}] + + with patch('main.ConfigManager') as mock_config_manager, \ + patch('main.MarketTrendsScraper') as mock_scraper_class: + + # Setup mocks + mock_config_instance = Mock() + mock_config_manager.return_value = mock_config_instance + mock_config_instance.load_config.return_value = config_content + + mock_scraper_instance = Mock() + mock_scraper_class.return_value = mock_scraper_instance + mock_scraper_instance.scrape_market_trends.return_value = sample_data + mock_scraper_instance.analyze_trends.return_value = {"total_products": 1} + + # Run main function (temp_dir is a plain string, so wrap it in Path before joining) + with patch('sys.argv', ['main.py', '--config', str(config_path), '--output', str(Path(temp_dir) / 'output.csv')]): + result = main.main() + + # Verify main completed successfully + assert result == 0 + + # Verify scraper was called + mock_scraper_instance.scrape_market_trends.assert_called_once() + mock_scraper_instance.save_data.assert_called_once() + mock_scraper_instance.analyze_trends.assert_called_once() + mock_scraper_instance.save_analysis.assert_called_once() \ No newline at end of file diff --git a/tests/test_logger.py b/tests/test_logger.py new file mode 100644 index 0000000..fb9a8a9 --- /dev/null +++ b/tests/test_logger.py @@ -0,0 +1,165 @@ +""" +Unit tests for the Logger module.
+""" + +import pytest +import sys +import tempfile +import os +from pathlib import Path +from unittest.mock import patch, MagicMock +from loguru import logger + +from src.logger import setup_logger, get_logger, LoggerMixin + + +class TestLoggerSetup: + """Test cases for logger setup functionality.""" + + def test_setup_logger_default(self): + """Test logger setup with default parameters.""" + with patch('loguru.logger.remove') as mock_remove, \ + patch('loguru.logger.add') as mock_add: + + setup_logger() + + # Verify default logger was removed + mock_remove.assert_called_once() + + # Verify console logger was added + assert mock_add.call_count == 1 + console_call = mock_add.call_args_list[0] + assert console_call[0][0] == sys.stderr + assert "level" in console_call[1] + assert console_call[1]["colorize"] is True + + def test_setup_logger_with_file(self): + """Test logger setup with file output.""" + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + log_file = tmp_file.name + + try: + with patch('loguru.logger.remove') as mock_remove, \ + patch('loguru.logger.add') as mock_add: + + setup_logger(log_file=log_file) + + # Verify both console and file loggers were added + assert mock_add.call_count == 2 + + # Check file logger call + file_call = mock_add.call_args_list[1] + assert file_call[0][0] == log_file + assert "rotation" in file_call[1] + assert "retention" in file_call[1] + assert "compression" in file_call[1] + finally: + # Clean up + if os.path.exists(log_file): + os.unlink(log_file) + + def test_setup_logger_verbose(self): + """Test logger setup with verbose mode.""" + with patch('loguru.logger.remove') as mock_remove, \ + patch('loguru.logger.add') as mock_add: + + setup_logger(verbose=True) + + # Verify DEBUG level was set + console_call = mock_add.call_args_list[0] + assert console_call[1]["level"] == "DEBUG" + + def test_setup_logger_custom_level(self): + """Test logger setup with custom log level.""" + with patch('loguru.logger.remove') as mock_remove, \ + patch('loguru.logger.add') as mock_add: + + setup_logger(log_level="WARNING") + + # Verify WARNING level was set + console_call = mock_add.call_args_list[0] + assert console_call[1]["level"] == "WARNING" + + def test_setup_logger_custom_rotation(self): + """Test logger setup with custom rotation settings.""" + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + log_file = tmp_file.name + + try: + with patch('loguru.logger.remove') as mock_remove, \ + patch('loguru.logger.add') as mock_add: + + setup_logger(log_file=log_file, rotation="100 MB") + + # Verify custom rotation was set + file_call = mock_add.call_args_list[1] + assert file_call[1]["rotation"] == "100 MB" + finally: + # Clean up + if os.path.exists(log_file): + os.unlink(log_file) + + def test_setup_logger_exception_handler(self): + """Test that exception handler is set up.""" + with patch('loguru.logger.remove') as mock_remove, \ + patch('loguru.logger.add') as mock_add, \ + patch('sys.excepthook') as mock_excepthook: + + setup_logger() + + # Verify exception handler was set + assert mock_excepthook is not None + assert callable(mock_excepthook) + + +class TestGetLogger: + """Test cases for get_logger function.""" + + def test_get_logger_no_name(self): + """Test getting logger without name.""" + with patch('loguru.logger.bind') as mock_bind: + get_logger() + mock_bind.assert_called_once() + + def test_get_logger_with_name(self): + """Test getting logger with name.""" + with patch('loguru.logger.bind') as mock_bind: + get_logger("test_module") + 
mock_bind.assert_called_once_with(name="test_module") + + +class TestLoggerMixin: + """Test cases for LoggerMixin class.""" + + def test_logger_property(self): + """Test logger property in mixin.""" + + class TestClass(LoggerMixin): + pass + + with patch('loguru.logger.bind') as mock_bind: + test_obj = TestClass() + _ = test_obj.logger + + # Verify logger was bound with class name and object id + mock_bind.assert_called_once() + args, kwargs = mock_bind.call_args + assert kwargs["name"] == "TestClass" + assert "id" in kwargs + + def test_logger_mixin_inheritance(self): + """Test that logger mixin works with inheritance.""" + + class ParentClass(LoggerMixin): + pass + + class ChildClass(ParentClass): + pass + + with patch('loguru.logger.bind') as mock_bind: + child_obj = ChildClass() + _ = child_obj.logger + + # Verify logger was bound with child class name + args, kwargs = mock_bind.call_args + assert kwargs["name"] == "ChildClass" \ No newline at end of file diff --git a/tests/test_scraper.py b/tests/test_scraper.py new file mode 100644 index 0000000..803d480 --- /dev/null +++ b/tests/test_scraper.py @@ -0,0 +1,371 @@ +""" +Unit tests for the Scraper module. +""" + +import pytest +import json +from unittest.mock import Mock, patch, MagicMock +from bs4 import BeautifulSoup +import pandas as pd + +from src.scraper import MarketTrendsScraper + + +class TestMarketTrendsScraper: + """Test cases for MarketTrendsScraper class.""" + + @pytest.fixture + def sample_config(self): + """Sample configuration for testing.""" + return { + "scraper": { + "delay_between_requests": 1.0, + "timeout": 30, + "max_retries": 3, + "user_agent": "Mozilla/5.0", + "headless": True, + "window_size": [1920, 1080] + }, + "sources": [ + { + "name": "test_source", + "url": "https://example.com/products", + "type": "ecommerce", + "enabled": True, + "selectors": { + "product": "div.product", + "name": "h2.title", + "price": "span.price", + "rating": "div.rating", + "availability": "div.stock" + }, + "pagination": { + "next_page": "a.next", + "max_pages": 2 + } + } + ], + "output": { + "format": "csv", + "include_timestamp": True, + "filename": "market_trends_data" + } + } + + @pytest.fixture + def sample_html(self): + """Sample HTML for testing.""" + return """ + + +
+        <div class="product">
+            <h2 class="title">Test Product 1</h2>
+            <span class="price">$19.99</span>
+            <div class="rating">4.5 stars</div>
+            <div class="stock">In Stock</div>
+            <a href="/product/1">View</a>
+        </div>
+        <div class="product">
+            <h2 class="title">Test Product 2</h2>
+            <span class="price">€29.99</span>
+            <div class="rating">3.8 stars</div>
+            <div class="stock">Out of Stock</div>
+            <a href="/product/2">View</a>
+        </div>
+ + + + """ + + @patch('src.scraper.webdriver') + def test_init(self, mock_webdriver, sample_config): + """Test scraper initialization.""" + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + scraper = MarketTrendsScraper(sample_config) + + assert scraper.config == sample_config + assert scraper.driver == mock_driver + assert scraper.session is not None + assert scraper.data == [] + + # Verify browser setup + mock_webdriver.Chrome.assert_called_once() + mock_driver.set_page_load_timeout.assert_called_with(30) + + @patch('src.scraper.webdriver') + def test_setup_browser(self, mock_webdriver, sample_config): + """Test browser setup.""" + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + scraper = MarketTrendsScraper(sample_config) + + # Verify options were set + call_args = mock_webdriver.Chrome.call_args + options = call_args[1]['options'] + + assert options.arguments is not None + assert any("--headless" in arg for arg in options.arguments) + assert any("--window-size=1920,1080" in arg for arg in options.arguments) + + @patch('src.scraper.webdriver') + def test_setup_session(self, mock_webdriver, sample_config): + """Test session setup.""" + scraper = MarketTrendsScraper(sample_config) + + # Verify headers were set + assert "User-Agent" in scraper.session.headers + assert scraper.session.headers["User-Agent"] == sample_config["scraper"]["user_agent"] + assert "Accept" in scraper.session.headers + + @patch('src.scraper.webdriver') + def test_parse_price(self, mock_webdriver, sample_config): + """Test price parsing.""" + scraper = MarketTrendsScraper(sample_config) + + # Test various price formats + assert scraper._parse_price("$19.99") == 19.99 + assert scraper._parse_price("€29.99") == 29.99 + assert scraper._parse_price("£39.99") == 39.99 + assert scraper._parse_price("19,99") == 19.99 + assert scraper._parse_price("Price: $49.99 USD") == 49.99 + assert scraper._parse_price("Invalid price") is None + assert scraper._parse_price("") is None + + @patch('src.scraper.webdriver') + def test_parse_rating(self, mock_webdriver, sample_config): + """Test rating parsing.""" + scraper = MarketTrendsScraper(sample_config) + + # Test various rating formats + assert scraper._parse_rating("4.5 stars") == 4.5 + assert scraper._parse_rating("Rating: 3.8/5") == 3.8 + assert scraper._parse_rating("5 stars") == 5.0 + assert scraper._parse_rating("Invalid rating") is None + assert scraper._parse_rating("") is None + + @patch('src.scraper.webdriver') + def test_extract_product_data(self, mock_webdriver, sample_config, sample_html): + """Test product data extraction from HTML.""" + scraper = MarketTrendsScraper(sample_config) + + soup = BeautifulSoup(sample_html, 'html.parser') + product = soup.find('div', class_='product') + selectors = sample_config["sources"][0]["selectors"] + + data = scraper._extract_product_data(product, selectors) + + assert data is not None + assert data["name"] == "Test Product 1" + assert data["price"] == 19.99 + assert data["rating"] == 4.5 + assert data["availability"] == "In Stock" + assert data["url"] == "/product/1" + + @patch('src.scraper.webdriver') + def test_extract_product_data_no_name(self, mock_webdriver, sample_config): + """Test product data extraction when name is missing.""" + scraper = MarketTrendsScraper(sample_config) + + html = """ +
+        <div class="product">
+            <span class="price">$19.99</span>
+        </div>
+ """ + soup = BeautifulSoup(html, 'html.parser') + product = soup.find('div', class_='product') + selectors = sample_config["sources"][0]["selectors"] + + data = scraper._extract_product_data(product, selectors) + + assert data is None # Should return None when name is missing + + @patch('src.scraper.webdriver') + def test_make_request_with_retry_success(self, mock_webdriver, sample_config): + """Test successful HTTP request with retry logic.""" + scraper = MarketTrendsScraper(sample_config) + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.text = "Success" + + with patch.object(scraper.session, 'get', return_value=mock_response) as mock_get: + response = scraper._make_request_with_retry("https://example.com") + + assert response == mock_response + mock_get.assert_called_once() + + @patch('src.scraper.webdriver') + def test_make_request_with_retry_failure(self, mock_webdriver, sample_config): + """Test HTTP request failure with retry logic.""" + scraper = MarketTrendsScraper(sample_config) + + with patch.object(scraper.session, 'get', side_effect=Exception("Connection error")): + response = scraper._make_request_with_retry("https://example.com") + + assert response is None + + @patch('src.scraper.webdriver') + @patch('src.scraper.MarketTrendsScraper._scrape_source') + def test_scrape_market_trends(self, mock_scrape_source, mock_webdriver, sample_config): + """Test scraping market trends from multiple sources.""" + scraper = MarketTrendsScraper(sample_config) + + # Mock source data + mock_scrape_source.side_effect = [ + [{"name": "Product 1", "price": 19.99}], + [{"name": "Product 2", "price": 29.99}] + ] + + # Add second source + sample_config["sources"].append({ + "name": "test_source_2", + "url": "https://example2.com/products", + "enabled": True, + "selectors": { + "product": "div.product", + "name": "h2.title", + "price": "span.price" + } + }) + + data = scraper.scrape_market_trends() + + assert len(data) == 2 + assert data[0]["name"] == "Product 1" + assert data[1]["name"] == "Product 2" + assert mock_scrape_source.call_count == 2 + + @patch('src.scraper.webdriver') + @patch('src.scraper.MarketTrendsScraper._scrape_with_requests') + def test_scrape_source_with_requests(self, mock_scrape_requests, mock_webdriver, sample_config): + """Test scraping a source using requests.""" + scraper = MarketTrendsScraper(sample_config) + + mock_scrape_requests.return_value = [{"name": "Test Product", "price": 19.99}] + + source = sample_config["sources"][0] + data = scraper._scrape_source(source) + + assert len(data) == 1 + assert data[0]["name"] == "Test Product" + mock_scrape_requests.assert_called_once_with(source) + + @patch('src.scraper.webdriver') + @patch('src.scraper.MarketTrendsScraper._scrape_with_selenium') + def test_scrape_source_with_selenium(self, mock_scrape_selenium, mock_webdriver, sample_config): + """Test scraping a source using Selenium.""" + scraper = MarketTrendsScraper(sample_config) + + mock_scrape_selenium.return_value = [{"name": "Test Product", "price": 19.99}] + + # Configure source to use Selenium + source = sample_config["sources"][0] + source["use_selenium"] = True + + data = scraper._scrape_source(source) + + assert len(data) == 1 + assert data[0]["name"] == "Test Product" + mock_scrape_selenium.assert_called_once_with(source) + + @patch('src.scraper.webdriver') + @patch('builtins.open', new_callable=Mock) + def test_save_data_csv(self, mock_open, mock_webdriver, sample_config): + """Test saving data to CSV file.""" + scraper = 
MarketTrendsScraper(sample_config) + + data = [ + {"name": "Product 1", "price": 19.99, "source": "Test"}, + {"name": "Product 2", "price": 29.99, "source": "Test"} + ] + + with patch.object(pd.DataFrame, 'to_csv') as mock_to_csv: + scraper.save_data(data, "test_output.csv") + mock_to_csv.assert_called_once_with("test_output.csv", index=False) + + @patch('src.scraper.webdriver') + @patch('builtins.open', new_callable=MagicMock) + def test_save_data_json(self, mock_open, mock_webdriver, sample_config): + """Test saving data to JSON file.""" + scraper = MarketTrendsScraper(sample_config) + + data = [ + {"name": "Product 1", "price": 19.99, "source": "Test"}, + {"name": "Product 2", "price": 29.99, "source": "Test"} + ] + + with patch.object(pd.DataFrame, 'to_json') as mock_to_json: + scraper.save_data(data, "test_output.json") + mock_to_json.assert_called_once() + + @patch('src.scraper.webdriver') + def test_analyze_trends(self, mock_webdriver, sample_config): + """Test trend analysis.""" + scraper = MarketTrendsScraper(sample_config) + + data = [ + {"name": "Product 1", "price": 19.99, "rating": 4.5, "source": "Source A"}, + {"name": "Product 2", "price": 29.99, "rating": 3.8, "source": "Source A"}, + {"name": "Product 3", "price": 39.99, "rating": 4.2, "source": "Source B"}, + {"name": "Product 4", "price": 49.99, "rating": 4.7, "source": "Source B"} + ] + + analysis = scraper.analyze_trends(data) + + assert analysis["total_products"] == 4 + assert "price_analysis" in analysis + assert "rating_analysis" in analysis + assert "sources" in analysis + assert analysis["sources"]["Source A"] == 2 + assert analysis["sources"]["Source B"] == 2 + assert analysis["price_analysis"]["average_price"] == 34.99 + assert analysis["price_analysis"]["min_price"] == 19.99 + assert analysis["price_analysis"]["max_price"] == 49.99 + assert analysis["rating_analysis"]["average_rating"] == 4.3 + + @patch('src.scraper.webdriver') + def test_analyze_trends_empty_data(self, mock_webdriver, sample_config): + """Test trend analysis with empty data.""" + scraper = MarketTrendsScraper(sample_config) + + analysis = scraper.analyze_trends([]) + + assert "error" in analysis + assert analysis["error"] == "No data available for analysis" + + @patch('src.scraper.webdriver') + @patch('builtins.open', new_callable=MagicMock) + def test_save_analysis(self, mock_open, mock_webdriver, sample_config): + """Test saving analysis results. open() is patched with MagicMock so it supports the context manager protocol.""" + scraper = MarketTrendsScraper(sample_config) + + analysis = {"total_products": 4, "average_price": 34.99} + + with patch('json.dump') as mock_json_dump: + scraper.save_analysis(analysis, "test_analysis.json") + mock_json_dump.assert_called_once() + + @patch('src.scraper.webdriver') + def test_close(self, mock_webdriver, sample_config): + """Test closing browser and session.""" + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + scraper = MarketTrendsScraper(sample_config) + scraper.close() + + mock_driver.quit.assert_called_once() + + @patch('src.scraper.webdriver') + def test_context_manager(self, mock_webdriver, sample_config): + """Test using scraper as context manager.""" + mock_driver = Mock() + mock_webdriver.Chrome.return_value = mock_driver + + with MarketTrendsScraper(sample_config) as scraper: + assert scraper is not None + + mock_driver.quit.assert_called_once() \ No newline at end of file
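
The test suite above patches out `src.scraper.webdriver`, `sys.argv`, and the HTTP and file I/O layers, so it should run without Chrome, ChromeDriver, or network access. A minimal invocation sketch, assuming `pytest` (and, for the coverage line, the `pytest-cov` plugin) is installed alongside the packages in `requirements.txt` — neither is shown in this diff — and that the commands are run from the repository root:

```bash
# Run the full suite; Selenium and HTTP are mocked, so no browser is needed
python -m pytest tests/ -v

# Run a single module, or a single test by keyword
python -m pytest tests/test_config_manager.py -v
python -m pytest tests/test_scraper.py -k test_parse_price

# Optional coverage report (requires the pytest-cov plugin)
python -m pytest tests/ --cov=src --cov-report=term-missing
```

Using `python -m pytest` rather than the bare `pytest` command puts the current directory on `sys.path`, so the `from src.… import …` statements in the tests resolve without packaging the project or an editable install.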