Initial commit: Market Trends Scraper

This commit is contained in:
Dev
2025-09-11 17:46:14 +03:00
commit 4ddcde68d4
17 changed files with 3049 additions and 0 deletions

21
LICENSE Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 Market Trends Scraper
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

340
README.md Normal file

@@ -0,0 +1,340 @@
# Market Trends Scraper
A powerful and flexible Python web scraper for collecting and analyzing pricing and product trends from e-commerce websites. This tool provides comprehensive market insights by extracting product data, tracking price changes, and generating detailed trend analyses.
## Features
- **Multi-Source Scraping**: Collect data from multiple e-commerce websites simultaneously
- **Flexible Configuration**: Easy-to-use YAML configuration for different sources and scraping parameters
- **Dual Scraping Methods**: Supports both requests/BeautifulSoup and Selenium for dynamic content
- **Data Analysis**: Built-in analysis of pricing trends, ratings, and product availability
- **Multiple Output Formats**: Save data in CSV, JSON, or Excel formats
- **Robust Error Handling**: Comprehensive error handling and retry mechanisms
- **Professional Logging**: Detailed logging with configurable levels and outputs
- **Extensible Architecture**: Modular design for easy customization and extension
## Installation
### Prerequisites
- Python 3.8 or higher
- Chrome browser (for Selenium functionality)
- ChromeDriver (compatible with your Chrome version)
### Setup
1. Clone the repository:
```bash
git clone https://github.com/iwasforcedtobehere/market-trends-scraper.git
cd market-trends-scraper
```
2. Create and activate a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Install ChromeDriver (if using Selenium):
```bash
# For Ubuntu/Debian:
sudo apt-get install chromium-chromedriver
# For macOS using Homebrew:
brew install chromedriver
# For Windows, download from: https://chromedriver.chromium.org/
```
## Configuration
The scraper uses a YAML configuration file to define scraping sources and parameters. A default configuration will be created automatically at `config/config.yaml` when you first run the scraper.
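For example, loading the configuration through the bundled `ConfigManager` triggers that same auto-creation (a minimal sketch):

```python
from src.config_manager import ConfigManager

# load_config() reads config/config.yaml, or writes the built-in defaults
# there first if the file does not exist yet, and returns the settings.
config = ConfigManager("config/config.yaml").load_config()
print(config["scraper"]["delay_between_requests"])  # 1.0 by default
```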
### Example Configuration
```yaml
scraper:
delay_between_requests: 1.0
timeout: 30
max_retries: 3
user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
headless: true
window_size: [1920, 1080]
sources:
- name: "example_ecommerce"
url: "https://example-ecommerce.com/search"
type: "ecommerce"
enabled: true
use_selenium: false
selectors:
product: "div.product-item"
name: "h2.product-title"
price: "span.price"
rating: "div.rating"
availability: "div.stock-status"
pagination:
next_page: "a.next-page"
max_pages: 10
output:
format: "csv"
include_timestamp: true
filename: "market_trends_data"
database:
url: "sqlite:///data/market_trends.db"
echo: false
analysis:
price_history_days: 30
trend_threshold: 0.05
generate_charts: true
```
### Configuration Options
#### Scraper Settings
- `delay_between_requests`: Delay between requests in seconds (default: 1.0)
- `timeout`: Request timeout in seconds (default: 30)
- `max_retries`: Maximum number of retry attempts for failed requests (default: 3)
- `user_agent`: User agent string for HTTP requests
- `headless`: Run browser in headless mode (default: true)
- `window_size`: Browser window size as [width, height] (default: [1920, 1080])
#### Source Configuration
- `name`: Unique identifier for the data source
- `url`: Base URL for scraping
- `type`: Type of website (e.g., "ecommerce")
- `enabled`: Whether to scrape this source (default: true)
- `use_selenium`: Use Selenium instead of requests (default: false)
- `selectors`: CSS selectors for extracting data (see the sketch below)
- `pagination`: Pagination settings
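For instance, the requests-based scraper applies these selectors with BeautifulSoup roughly as follows (a simplified sketch of what `src/scraper.py` does; the HTML snippet is made up):

```python
from bs4 import BeautifulSoup

html = """
<div class="product-item">
  <h2 class="product-title">Example Widget</h2>
  <span class="price">$9.99</span>
</div>
"""
soup = BeautifulSoup(html, "html.parser")
for product in soup.select("div.product-item"):      # selectors: product
    name = product.select_one("h2.product-title")    # selectors: name
    price = product.select_one("span.price")         # selectors: price
    print(name.get_text(strip=True), price.get_text(strip=True))
```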
#### Output Settings
- `format`: Output format ("csv", "json", or "excel")
- `include_timestamp`: Include timestamp in output filename (default: true)
- `filename`: Base filename for output files
#### Analysis Settings
- `price_history_days`: Number of days to consider for price history (default: 30)
- `trend_threshold`: Minimum price change percentage to consider as a trend (default: 0.05; illustrated below)
- `generate_charts`: Generate trend charts (default: true)
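As a concrete illustration of the threshold (this mirrors the documented meaning of `trend_threshold`, not a specific function in the codebase):

```python
# Hypothetical example with trend_threshold = 0.05 (5%)
previous_price = 40.00
current_price = 43.00
relative_change = (current_price - previous_price) / previous_price  # 0.075 -> 7.5%
is_trend = abs(relative_change) >= 0.05  # True: large enough to count as a trend
```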
## Usage
### Command Line Interface
Run the scraper with default settings:
```bash
python main.py
```
Specify a custom configuration file:
```bash
python main.py --config path/to/config.yaml
```
Specify a custom output file:
```bash
python main.py --output path/to/output.csv
```
Run in verbose mode:
```bash
python main.py --verbose
```
Run browser in non-headless mode (for debugging):
```bash
python main.py --no-headless
```
### Python API
```python
from src.config_manager import ConfigManager
from src.scraper import MarketTrendsScraper
from src.logger import setup_logger
# Setup logging
setup_logger(verbose=True)
# Load configuration
config_manager = ConfigManager("config/config.yaml")
config = config_manager.load_config()
# Initialize scraper
with MarketTrendsScraper(config, headless=True) as scraper:
# Scrape data
data = scraper.scrape_market_trends()
# Save data
scraper.save_data(data, "output.csv")
# Analyze trends
analysis = scraper.analyze_trends(data)
# Save analysis
scraper.save_analysis(analysis, "analysis.json")
```
## Output
### Data Output
The scraper produces structured data with the following fields:
| Field | Description |
|-------|-------------|
| name | Product name |
| price | Product price (as float) |
| rating | Product rating (as float) |
| availability | Product availability status |
| url | Product URL |
| source | Data source name |
| scraped_at | Timestamp when data was scraped |
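For illustration, a single scraped record looks like the following before `save_data()` writes it out (the values are made up):

```python
record = {
    "name": "Example Wireless Mouse",
    "price": 24.99,
    "rating": 4.3,
    "availability": "In Stock",
    "url": "https://example-ecommerce.com/products/wireless-mouse",
    "source": "example_ecommerce",
    "scraped_at": "2025-09-11T17:46:14",
}
```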
### Analysis Output
The trend analysis includes:
- **Summary Statistics**: Total products, source distribution
- **Price Analysis**: Average, min, max, median prices and distribution
- **Rating Analysis**: Average, min, max ratings and distribution
- **Availability Analysis**: Count of products by availability status
- **Price Trends by Source**: Comparative analysis across sources
Example analysis output:
```json
{
"total_products": 150,
"sources": {
"example_ecommerce": 100,
"another_store": 50
},
"price_analysis": {
"average_price": 49.99,
"min_price": 9.99,
"max_price": 199.99,
"median_price": 45.00,
"price_distribution": {
"count": 150,
"mean": 49.99,
"std": 35.25,
"min": 9.99,
"25%": 25.00,
"50%": 45.00,
"75%": 75.00,
"max": 199.99
}
},
"rating_analysis": {
"average_rating": 4.2,
"min_rating": 1.0,
"max_rating": 5.0,
"rating_distribution": {
"5.0": 45,
"4.0": 60,
"3.0": 30,
"2.0": 10,
"1.0": 5
}
}
}
```
## Testing
Run all tests:
```bash
pytest
```
Run unit tests only:
```bash
pytest -m unit
```
Run integration tests only:
```bash
pytest -m integration
```
Run tests with coverage report:
```bash
pytest --cov=src --cov-report=html
```
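New tests can opt into these markers, which are registered in `pytest.ini` (hypothetical example):

```python
import pytest

@pytest.mark.unit
def test_example():
    assert 1 + 1 == 2
```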
## Project Structure
```
market-trends-scraper/
├── src/
│ ├── __init__.py
│ ├── config_manager.py # Configuration management
│ ├── logger.py # Logging utilities
│ └── scraper.py # Main scraper implementation
├── tests/
│ ├── __init__.py
│ ├── test_config_manager.py
│ ├── test_logger.py
│ ├── test_scraper.py
│ └── test_integration.py
├── config/
│ └── config.yaml # Configuration file
├── data/ # Output data directory
├── main.py # Main entry point
├── requirements.txt # Python dependencies
├── pytest.ini # Test configuration
└── README.md # This file
```
## Contributing
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
### Development Guidelines
- Follow PEP 8 style guidelines
- Write comprehensive tests for new features
- Update documentation as needed
- Ensure all tests pass before submitting
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Disclaimer
This tool is for educational and research purposes. Users are responsible for:
- Complying with websites' terms of service
- Respecting robots.txt files (see the example below)
- Using the tool ethically and responsibly
- Not overwhelming servers with too many requests
The authors are not responsible for any misuse of this tool.
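A minimal robots.txt check (not part of this project's code) that you could run before enabling a source might look like:

```python
from urllib.robotparser import RobotFileParser

# Hypothetical pre-flight check for a source URL; adjust the URLs and user agent to your config
robots = RobotFileParser()
robots.set_url("https://example-ecommerce.com/robots.txt")
robots.read()

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
if robots.can_fetch(user_agent, "https://example-ecommerce.com/search"):
    print("Allowed by robots.txt")
else:
    print("Disallowed by robots.txt - skip this source")
```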
## Support
If you encounter any issues or have questions:
1. Check the [Issues](https://github.com/iwasforcedtobehere/market-trends-scraper/issues) page
2. Create a new issue with detailed information
3. For general questions, use the [Discussions](https://github.com/iwasforcedtobehere/market-trends-scraper/discussions) tab
## Acknowledgments
- [Beautiful Soup](https://www.crummy.com/software/BeautifulSoup/) for HTML parsing
- [Selenium](https://www.selenium.dev/) for browser automation
- [Pandas](https://pandas.pydata.org/) for data analysis
- [Loguru](https://github.com/Delgan/loguru) for logging

101
config/sample_config.yaml Normal file

@@ -0,0 +1,101 @@
# Sample Configuration for Market Trends Scraper
# Copy this file to config.yaml and customize for your needs
scraper:
# Delay between requests in seconds (helps avoid being blocked)
delay_between_requests: 1.5
# Request timeout in seconds
timeout: 30
# Maximum number of retry attempts for failed requests
max_retries: 3
# User agent string for HTTP requests
user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
# Run browser in headless mode (no visible UI)
headless: true
# Browser window size [width, height]
window_size: [1920, 1080]
# List of data sources to scrape
sources:
# Example: Amazon (fictional selectors for demonstration)
- name: "amazon"
url: "https://www.amazon.com/s?k=laptop"
type: "ecommerce"
enabled: false # Set to true to enable this source
use_selenium: true # Amazon often requires JavaScript rendering
selectors:
product: "div[data-component-type='s-search-result']"
name: "h2 span.a-text-normal"
price: "span.a-price-whole"
rating: "span.a-icon-alt"
availability: "span.a-color-success"
pagination:
next_page: "a.s-pagination-next"
max_pages: 5
# Example: Generic e-commerce site
- name: "example_ecommerce"
url: "https://example-ecommerce.com/search?q=phone"
type: "ecommerce"
enabled: true
use_selenium: false
selectors:
product: "div.product-card"
name: "h3.product-title"
price: "span.price"
rating: "div.rating-stars"
availability: "div.stock-status"
pagination:
next_page: "a.pagination-next"
max_pages: 10
# Example: Electronics retailer
- name: "electronics_store"
url: "https://example-electronics.com/category/smartphones"
type: "ecommerce"
enabled: true
use_selenium: false
selectors:
product: "article.product-item"
name: "h1.product-name"
price: "div.current-price"
rating: "div.product-rating"
availability: "span.availability-label"
pagination:
next_page: "li.page-item.next a"
max_pages: 3
# Output settings
output:
# Output format: csv, json, or excel
format: "csv"
# Include timestamp in output filename
include_timestamp: true
# Base filename for output files
filename: "market_trends_data"
# Database settings (for future enhancements)
database:
# Database connection URL
url: "sqlite:///data/market_trends.db"
# Enable SQL query logging
echo: false
# Analysis settings
analysis:
# Number of days to consider for price history analysis
price_history_days: 30
# Minimum price change percentage to consider as a trend (0.05 = 5%)
trend_threshold: 0.05
# Generate trend charts (requires matplotlib and seaborn)
generate_charts: true

327
examples/advanced_usage.py Normal file

@@ -0,0 +1,327 @@
#!/usr/bin/env python3
"""
Advanced Usage Example for Market Trends Scraper
This script demonstrates advanced features of the Market Trends Scraper,
including custom data processing, trend analysis, and visualization.
"""
import sys
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from pathlib import Path
# Add src directory to Python path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from config_manager import ConfigManager
from scraper import MarketTrendsScraper
from logger import setup_logger
def create_custom_config():
"""Create a custom configuration for demonstration."""
return {
"scraper": {
"delay_between_requests": 2.0,
"timeout": 30,
"max_retries": 3,
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"headless": True,
"window_size": [1920, 1080]
},
"sources": [
{
"name": "books_store",
"url": "https://books.toscrape.com/",
"type": "ecommerce",
"enabled": True,
"use_selenium": False,
"selectors": {
"product": "article.product_pod",
"name": "h3 a",
"price": "p.price_color",
"rating": "p.star-rating",
"availability": "p.instock.availability"
},
"pagination": {
"next_page": "li.next a",
"max_pages": 3
}
}
],
"output": {
"format": "csv",
"include_timestamp": True,
"filename": "books_analysis"
},
"database": {
"url": "sqlite:///../data/books.db",
"echo": False
},
"analysis": {
"price_history_days": 30,
"trend_threshold": 0.05,
"generate_charts": True
}
}
def custom_price_processing(price_text):
"""
Custom price processing function for specific formats.
Args:
price_text: Raw price text from website
Returns:
Parsed price as float or None if parsing fails
"""
try:
# Remove currency symbol and whitespace
price_clean = price_text.replace('£', '').strip()
# Convert to float
return float(price_clean)
except (ValueError, TypeError):
return None
def custom_rating_processing(rating_element):
"""
Custom rating processing for star ratings.
Args:
rating_element: BeautifulSoup element containing rating
Returns:
Parsed rating as float or None if parsing fails
"""
try:
# Extract rating from class name (e.g., "star-rating Three" -> 3)
class_names = rating_element.get('class', [])
for class_name in class_names:
if class_name.lower() in ['one', 'two', 'three', 'four', 'five']:
rating_map = {
'one': 1.0,
'two': 2.0,
'three': 3.0,
'four': 4.0,
'five': 5.0
}
return rating_map.get(class_name.lower())
return None
except Exception:
return None
def analyze_price_trends(data):
"""
Perform advanced price trend analysis.
Args:
data: List of product data dictionaries
Returns:
Dictionary containing trend analysis results
"""
if not data:
return {"error": "No data available for analysis"}
# Convert to DataFrame
df = pd.DataFrame(data)
# Ensure price is numeric
df['price'] = pd.to_numeric(df['price'], errors='coerce')
# Drop rows with missing prices
df = df.dropna(subset=['price'])
# Basic statistics
analysis = {
"total_products": len(df),
"price_statistics": {
"mean": float(df['price'].mean()),
"median": float(df['price'].median()),
"std": float(df['price'].std()),
"min": float(df['price'].min()),
"max": float(df['price'].max()),
"quartiles": {
"25%": float(df['price'].quantile(0.25)),
"75%": float(df['price'].quantile(0.75))
}
},
"price_distribution": {
"under_10": int(len(df[df['price'] < 10])),
"10_to_20": int(len(df[(df['price'] >= 10) & (df['price'] < 20)])),
"20_to_30": int(len(df[(df['price'] >= 20) & (df['price'] < 30)])),
"30_to_40": int(len(df[(df['price'] >= 30) & (df['price'] < 40)])),
"over_40": int(len(df[df['price'] >= 40]))
}
}
# Rating analysis if available
if 'rating' in df.columns:
df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
rating_df = df.dropna(subset=['rating'])
if not rating_df.empty:
analysis["rating_statistics"] = {
"mean": float(rating_df['rating'].mean()),
"distribution": rating_df['rating'].value_counts().to_dict()
}
return analysis
def generate_visualizations(data, analysis, output_dir):
"""
Generate visualization charts for the analysis.
Args:
data: List of product data dictionaries
analysis: Analysis results dictionary
output_dir: Directory to save charts
"""
# Create output directory if it doesn't exist
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Convert to DataFrame
df = pd.DataFrame(data)
# Ensure price is numeric
df['price'] = pd.to_numeric(df['price'], errors='coerce')
df = df.dropna(subset=['price'])
# Set style
sns.set(style="whitegrid")
plt.figure(figsize=(12, 8))
# Price distribution histogram
plt.subplot(2, 2, 1)
sns.histplot(df['price'], bins=20, kde=True)
plt.title('Price Distribution')
plt.xlabel('Price (£)')
plt.ylabel('Count')
# Price distribution by category
plt.subplot(2, 2, 2)
price_dist = analysis['price_distribution']
categories = list(price_dist.keys())
values = list(price_dist.values())
plt.bar(categories, values)
plt.title('Price Distribution by Category')
plt.xlabel('Price Category')
plt.ylabel('Count')
plt.xticks(rotation=45)
# Box plot for prices
plt.subplot(2, 2, 3)
sns.boxplot(y=df['price'])
plt.title('Price Box Plot')
plt.ylabel('Price (£)')
# Rating vs Price scatter plot (if ratings available)
plt.subplot(2, 2, 4)
if 'rating' in df.columns:
df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
rating_df = df.dropna(subset=['rating'])
if not rating_df.empty:
sns.scatterplot(x='rating', y='price', data=rating_df)
plt.title('Rating vs Price')
plt.xlabel('Rating')
plt.ylabel('Price (£)')
else:
plt.text(0.5, 0.5, 'No rating data available',
horizontalalignment='center', verticalalignment='center')
else:
plt.text(0.5, 0.5, 'No rating data available',
horizontalalignment='center', verticalalignment='center')
# Adjust layout and save
plt.tight_layout()
plt.savefig(f"{output_dir}/market_analysis_charts.png", dpi=300)
plt.close()
print(f"✓ Visualization charts saved to {output_dir}/market_analysis_charts.png")
def main():
"""Main function demonstrating advanced scraper usage."""
# Setup logging
setup_logger(verbose=True)
# Create custom configuration
config = create_custom_config()
print("✓ Custom configuration created")
# Initialize scraper
try:
scraper = MarketTrendsScraper(config, headless=True)
print("✓ Scraper initialized successfully")
except Exception as e:
print(f"✗ Failed to initialize scraper: {str(e)}")
return 1
try:
# Scrape market trends data
print("\n🔍 Scraping market trends data...")
data = scraper.scrape_market_trends()
print(f"✓ Scraped {len(data)} product records")
if not data:
print("⚠ No data was scraped. Check your configuration and selectors.")
return 0
# Save raw data
output_file = "../data/advanced_example_output.csv"
scraper.save_data(data, output_file)
print(f"✓ Raw data saved to {output_file}")
# Perform advanced analysis
print("\n📊 Performing advanced analysis...")
analysis = analyze_price_trends(data)
print("✓ Advanced analysis completed")
# Save analysis results
analysis_file = "../data/advanced_example_analysis.json"
with open(analysis_file, 'w') as f:
json.dump(analysis, f, indent=2)
print(f"✓ Analysis saved to {analysis_file}")
# Generate visualizations
print("\n📈 Generating visualization charts...")
charts_dir = "../data/charts"
generate_visualizations(data, analysis, charts_dir)
# Print detailed summary
print("\n📋 Detailed Summary:")
print(f" - Total products: {analysis.get('total_products', 0)}")
if 'price_statistics' in analysis:
price_stats = analysis['price_statistics']
print(f" - Average price: £{price_stats.get('mean', 0):.2f}")
print(f" - Median price: £{price_stats.get('median', 0):.2f}")
print(f" - Standard deviation: £{price_stats.get('std', 0):.2f}")
print(f" - Price range: £{price_stats.get('min', 0):.2f} - £{price_stats.get('max', 0):.2f}")
if 'price_distribution' in analysis:
print(" - Price distribution:")
for category, count in analysis['price_distribution'].items():
print(f" * {category}: {count} products")
if 'rating_statistics' in analysis:
rating_stats = analysis['rating_statistics']
print(f" - Average rating: {rating_stats.get('mean', 0):.2f}")
print(" - Rating distribution:")
for rating, count in rating_stats['distribution'].items():
print(f" * {rating} stars: {count} products")
print("\n✅ Advanced market trends analysis completed successfully!")
        return 0
    except Exception as e:
        print(f"✗ Error during scraping or analysis: {str(e)}")
        return 1
    finally:
        # Always release the browser and HTTP session
        scraper.close()


if __name__ == "__main__":
    sys.exit(main())

97
examples/basic_usage.py Normal file

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Basic Usage Example for Market Trends Scraper
This script demonstrates how to use the Market Trends Scraper
to collect and analyze pricing data from e-commerce websites.
"""
import sys
from pathlib import Path
# Add src directory to Python path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from config_manager import ConfigManager
from scraper import MarketTrendsScraper
from logger import setup_logger
def main():
"""Main function demonstrating basic scraper usage."""
# Setup logging
setup_logger(verbose=True)
# Initialize configuration manager
config_manager = ConfigManager("../config/sample_config.yaml")
# Load configuration
try:
config = config_manager.load_config()
print("✓ Configuration loaded successfully")
except Exception as e:
print(f"✗ Failed to load configuration: {str(e)}")
return 1
# Initialize scraper
try:
scraper = MarketTrendsScraper(config, headless=True)
print("✓ Scraper initialized successfully")
except Exception as e:
print(f"✗ Failed to initialize scraper: {str(e)}")
return 1
try:
# Scrape market trends data
print("\n🔍 Scraping market trends data...")
data = scraper.scrape_market_trends()
print(f"✓ Scraped {len(data)} product records")
if not data:
print("⚠ No data was scraped. Check your configuration and selectors.")
return 0
# Save scraped data
output_file = "../data/example_output.csv"
scraper.save_data(data, output_file)
print(f"✓ Data saved to {output_file}")
# Analyze trends
print("\n📊 Analyzing market trends...")
analysis = scraper.analyze_trends(data)
print("✓ Trend analysis completed")
# Save analysis results
analysis_file = "../data/example_analysis.json"
scraper.save_analysis(analysis, analysis_file)
print(f"✓ Analysis saved to {analysis_file}")
# Print summary
print("\n📋 Summary:")
print(f" - Total products: {analysis.get('total_products', 0)}")
if 'price_analysis' in analysis:
price_analysis = analysis['price_analysis']
print(f" - Average price: ${price_analysis.get('average_price', 0):.2f}")
print(f" - Price range: ${price_analysis.get('min_price', 0):.2f} - ${price_analysis.get('max_price', 0):.2f}")
if 'sources' in analysis:
print(" - Products by source:")
for source, count in analysis['sources'].items():
print(f" * {source}: {count} products")
print("\n✅ Market trends analysis completed successfully!")
return 0
except Exception as e:
print(f"✗ Error during scraping: {str(e)}")
return 1
finally:
# Close scraper
scraper.close()
if __name__ == "__main__":
sys.exit(main())

96
main.py Normal file

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
Market Trends Scraper - Main Entry Point
This script serves as the main entry point for the Market Trends Scraper application.
It initializes the scraper, processes configuration, and orchestrates the scraping
and analysis workflow.
"""
import sys
import argparse
from pathlib import Path
# Add src directory to Python path
sys.path.insert(0, str(Path(__file__).resolve().parent / "src"))
from scraper import MarketTrendsScraper
from config_manager import ConfigManager
from logger import setup_logger
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Web Scraper for Market Trends - Collect and analyze pricing and product trends"
)
parser.add_argument(
"--config", "-c",
type=str,
default="config/config.yaml",
help="Path to configuration file"
)
parser.add_argument(
"--output", "-o",
type=str,
default="data/output.csv",
help="Path to output file"
)
    parser.add_argument(
        "--no-headless",
        dest="headless",
        action="store_false",
        default=True,
        help="Run browser with a visible window instead of headless mode (useful for debugging)"
    )
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
return parser.parse_args()
def main():
"""Main application entry point."""
args = parse_arguments()
# Setup logging
logger = setup_logger(verbose=args.verbose)
logger.info("Starting Market Trends Scraper")
try:
# Load configuration
config_manager = ConfigManager(args.config)
config = config_manager.load_config()
logger.info("Configuration loaded successfully")
# Initialize and run scraper
scraper = MarketTrendsScraper(config, headless=args.headless)
logger.info("Scraper initialized")
# Run the scraping process
data = scraper.scrape_market_trends()
logger.info(f"Scraped {len(data)} product records")
# Save results
scraper.save_data(data, args.output)
logger.info(f"Data saved to {args.output}")
# Analyze trends
analysis = scraper.analyze_trends(data)
logger.info("Trend analysis completed")
# Save analysis results
analysis_output = args.output.replace('.csv', '_analysis.json')
scraper.save_analysis(analysis, analysis_output)
logger.info(f"Analysis saved to {analysis_output}")
logger.info("Market Trends Scraper completed successfully")
return 0
except Exception as e:
logger.error(f"Error occurred: {str(e)}")
return 1
if __name__ == "__main__":
sys.exit(main())

32
pytest.ini Normal file

@@ -0,0 +1,32 @@
[pytest]
# Python test discovery settings
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# Output formatting
addopts =
--verbose
--tb=short
--strict-markers
--disable-warnings
--cov=src
--cov-report=term-missing
--cov-report=html:htmlcov
--cov-fail-under=80
# Custom markers
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
web: marks tests that require web access
# Minimum pytest version
minversion = 6.0
# Filter warnings
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning

31
requirements.txt Normal file

@@ -0,0 +1,31 @@
# Web scraping
requests>=2.28.0
beautifulsoup4>=4.11.0
selenium>=4.8.0
lxml>=4.9.0
# Data analysis and manipulation
pandas>=1.5.0
numpy>=1.24.0
matplotlib>=3.6.0
seaborn>=0.12.0
# Data storage
sqlalchemy>=1.4.0
# sqlite3 is part of the Python standard library (no pip package required)
# Configuration management
python-dotenv>=0.21.0
pyyaml>=6.0
# Logging and monitoring
loguru>=0.6.0
# Testing
pytest>=7.2.0
pytest-cov>=4.0.0
responses>=0.22.0
# Utilities
tqdm>=4.64.0
schedule>=1.1.0

0
src/__init__.py Normal file

211
src/config_manager.py Normal file

@@ -0,0 +1,211 @@
"""
Configuration Manager Module
This module provides functionality to load, validate, and manage configuration
settings for the Market Trends Scraper application.
"""
import os
import yaml
from pathlib import Path
from typing import Dict, Any, Optional
from loguru import logger
class ConfigManager:
"""Manages application configuration loading and validation."""
def __init__(self, config_path: str = "config/config.yaml"):
"""
Initialize the ConfigManager.
Args:
config_path: Path to the configuration file
"""
self.config_path = Path(config_path)
self.config = {}
self.default_config = self._get_default_config()
def _get_default_config(self) -> Dict[str, Any]:
"""
Get the default configuration settings.
Returns:
Dictionary containing default configuration values
"""
return {
"scraper": {
"delay_between_requests": 1.0,
"timeout": 30,
"max_retries": 3,
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"headless": True,
"window_size": [1920, 1080]
},
"sources": [
{
"name": "example_ecommerce",
"url": "https://example-ecommerce.com/search",
"type": "ecommerce",
"enabled": True,
"selectors": {
"product": "div.product-item",
"name": "h2.product-title",
"price": "span.price",
"rating": "div.rating",
"availability": "div.stock-status"
},
"pagination": {
"next_page": "a.next-page",
"max_pages": 10
}
}
],
"output": {
"format": "csv",
"include_timestamp": True,
"filename": "market_trends_data"
},
"database": {
"url": "sqlite:///data/market_trends.db",
"echo": False
},
"analysis": {
"price_history_days": 30,
"trend_threshold": 0.05, # 5% change considered significant
"generate_charts": True
}
}
def load_config(self) -> Dict[str, Any]:
"""
Load configuration from file or create default if not exists.
Returns:
Dictionary containing configuration settings
Raises:
FileNotFoundError: If config file doesn't exist and cannot be created
yaml.YAMLError: If config file contains invalid YAML
"""
try:
# Create config directory if it doesn't exist
self.config_path.parent.mkdir(parents=True, exist_ok=True)
# Load config if file exists
if self.config_path.exists():
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
logger.info(f"Configuration loaded from {self.config_path}")
else:
# Create default config file
self.config = self.default_config
self.save_config()
logger.info(f"Default configuration created at {self.config_path}")
# Validate and merge with defaults
self.config = self._validate_and_merge_config()
return self.config
except Exception as e:
logger.error(f"Failed to load configuration: {str(e)}")
raise
def save_config(self) -> None:
"""
Save current configuration to file.
Raises:
IOError: If unable to write to config file
"""
try:
with open(self.config_path, 'w', encoding='utf-8') as f:
yaml.dump(self.config, f, default_flow_style=False, indent=2)
logger.info(f"Configuration saved to {self.config_path}")
except Exception as e:
logger.error(f"Failed to save configuration: {str(e)}")
raise
def _validate_and_merge_config(self) -> Dict[str, Any]:
"""
Validate loaded configuration and merge with defaults for missing values.
Returns:
Validated and merged configuration dictionary
"""
def deep_merge(source: Dict, destination: Dict) -> Dict:
"""Recursively merge source dictionary into destination."""
for key, value in source.items():
if key in destination:
if isinstance(value, dict) and isinstance(destination[key], dict):
deep_merge(value, destination[key])
else:
destination[key] = value
else:
destination[key] = value
return destination
# Start with default config and merge with loaded config
merged_config = deep_merge(self.default_config, self.config.copy())
# Validate required fields
required_sections = ["scraper", "sources", "output"]
for section in required_sections:
if section not in merged_config:
raise ValueError(f"Missing required configuration section: {section}")
# Validate sources
if not merged_config["sources"]:
raise ValueError("At least one data source must be configured")
return merged_config
def get(self, key: str, default: Any = None) -> Any:
"""
Get configuration value by key using dot notation.
Args:
key: Configuration key (supports dot notation, e.g., "scraper.timeout")
default: Default value if key not found
Returns:
Configuration value or default
"""
keys = key.split('.')
value = self.config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def set(self, key: str, value: Any) -> None:
"""
Set configuration value by key using dot notation.
Args:
key: Configuration key (supports dot notation, e.g., "scraper.timeout")
value: Value to set
"""
keys = key.split('.')
config_ref = self.config
# Navigate to parent of target key
for k in keys[:-1]:
if k not in config_ref:
config_ref[k] = {}
config_ref = config_ref[k]
# Set the value
config_ref[keys[-1]] = value
def reload(self) -> Dict[str, Any]:
"""
Reload configuration from file.
Returns:
Updated configuration dictionary
"""
return self.load_config()

119
src/logger.py Normal file

@@ -0,0 +1,119 @@
"""
Logger Module
This module provides centralized logging configuration and utilities for the
Market Trends Scraper application.
"""
import sys
from pathlib import Path
from loguru import logger
from typing import Optional
def setup_logger(
log_level: str = "INFO",
log_file: Optional[str] = None,
verbose: bool = False,
rotation: str = "10 MB",
retention: str = "7 days",
compression: str = "zip"
):
"""
Configure and initialize the application logger.
Args:
log_level: Minimum logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_file: Path to log file (if None, logs only to console)
verbose: Enable verbose logging (DEBUG level)
rotation: Log file rotation settings
retention: Log file retention settings
compression: Log file compression format
"""
# Remove default logger
logger.remove()
# Determine log level
if verbose:
log_level = "DEBUG"
# Console log format
console_format = (
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"<level>{message}</level>"
)
# File log format
file_format = (
"{time:YYYY-MM-DD HH:mm:ss} | "
"{level: <8} | "
"{name}:{function}:{line} | "
"{message}"
)
# Add console logger
logger.add(
sys.stderr,
format=console_format,
level=log_level,
colorize=True
)
# Add file logger if log_file is specified
if log_file:
# Create log directory if it doesn't exist
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
logger.add(
log_file,
format=file_format,
level=log_level,
rotation=rotation,
retention=retention,
compression=compression,
encoding="utf-8"
)
# Set up exception handler
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.opt(exception=(exc_type, exc_value, exc_traceback)).error(
"Uncaught exception occurred"
)
    sys.excepthook = handle_exception

    # Return the configured logger so callers can use the return value directly
    return logger
def get_logger(name: str = None):
"""
Get a logger instance with the specified name.
Args:
name: Logger name (typically __name__ of the calling module)
Returns:
Logger instance
"""
if name:
return logger.bind(name=name)
return logger
class LoggerMixin:
"""
Mixin class to add logging capabilities to other classes.
"""
@property
def logger(self):
"""Get logger instance for this class."""
return logger.bind(
name=self.__class__.__name__,
id=id(self)
)

618
src/scraper.py Normal file

@@ -0,0 +1,618 @@
"""
Web Scraper Module
This module contains the core functionality for scraping e-commerce websites
to collect product and pricing data for market trend analysis.
"""
import time
import json
import random
from datetime import datetime
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from logger import LoggerMixin
from config_manager import ConfigManager
class MarketTrendsScraper(LoggerMixin):
"""
Main scraper class for collecting market trends data from e-commerce websites.
"""
def __init__(self, config: Dict[str, Any], headless: bool = True):
"""
Initialize the scraper with configuration.
Args:
config: Configuration dictionary
headless: Whether to run browser in headless mode
"""
self.config = config
self.driver = None
self.session = requests.Session()
self.data = []
self._setup_browser(headless)
self._setup_session()
def _setup_browser(self, headless: bool = True) -> None:
"""
Set up the Selenium WebDriver with appropriate options.
Args:
headless: Whether to run browser in headless mode
"""
try:
chrome_options = Options()
# Set headless mode
if headless:
chrome_options.add_argument("--headless")
# Set window size
window_size = self.config.get("scraper.window_size", [1920, 1080])
chrome_options.add_argument(f"--window-size={window_size[0]},{window_size[1]}")
# Add other options for stability
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--disable-web-security")
chrome_options.add_argument("--disable-features=VizDisplayCompositor")
# Set user agent
user_agent = self.config.get("scraper.user_agent", "")
if user_agent:
chrome_options.add_argument(f"--user-agent={user_agent}")
# Initialize driver
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.set_page_load_timeout(self.config.get("scraper.timeout", 30))
self.logger.info("Browser setup completed")
except Exception as e:
self.logger.error(f"Failed to setup browser: {str(e)}")
raise
def _setup_session(self) -> None:
"""
Set up the requests session with appropriate headers.
"""
user_agent = self.config.get("scraper.user_agent", "")
if user_agent:
self.session.headers.update({"User-Agent": user_agent})
# Add other headers
self.session.headers.update({
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
})
self.logger.info("Session setup completed")
def scrape_market_trends(self) -> List[Dict[str, Any]]:
"""
Scrape market trends data from all configured sources.
Returns:
List of dictionaries containing scraped data
"""
all_data = []
sources = self.config.get("sources", [])
for source in sources:
if not source.get("enabled", True):
self.logger.info(f"Skipping disabled source: {source.get('name', 'Unknown')}")
continue
self.logger.info(f"Scraping source: {source.get('name', 'Unknown')}")
try:
source_data = self._scrape_source(source)
all_data.extend(source_data)
self.logger.info(f"Scraped {len(source_data)} records from {source.get('name', 'Unknown')}")
except Exception as e:
self.logger.error(f"Failed to scrape source {source.get('name', 'Unknown')}: {str(e)}")
continue
return all_data
def _scrape_source(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Scrape data from a specific source.
Args:
source: Source configuration dictionary
Returns:
List of dictionaries containing scraped data
"""
source_data = []
url = source.get("url", "")
selectors = source.get("selectors", {})
pagination = source.get("pagination", {})
if not url:
self.logger.warning(f"No URL configured for source: {source.get('name', 'Unknown')}")
return source_data
# Determine scraping method
use_selenium = source.get("use_selenium", False)
if use_selenium:
return self._scrape_with_selenium(source)
else:
return self._scrape_with_requests(source)
def _scrape_with_requests(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Scrape data using requests and BeautifulSoup.
Args:
source: Source configuration dictionary
Returns:
List of dictionaries containing scraped data
"""
source_data = []
url = source.get("url", "")
selectors = source.get("selectors", {})
pagination = source.get("pagination", {})
max_pages = pagination.get("max_pages", 1)
for page in range(1, max_pages + 1):
try:
# Add page parameter if needed
page_url = url
if page > 1:
page_url = f"{url}?page={page}"
self.logger.debug(f"Scraping page {page}: {page_url}")
# Make request with retry logic
response = self._make_request_with_retry(page_url)
if not response:
continue
# Parse HTML
soup = BeautifulSoup(response.text, 'html.parser')
# Extract product data
products = soup.select(selectors.get("product", ""))
for product in products:
product_data = self._extract_product_data(product, selectors)
if product_data:
product_data["source"] = source.get("name", "Unknown")
product_data["scraped_at"] = datetime.now().isoformat()
source_data.append(product_data)
# Check if there's a next page
if page < max_pages:
next_page = soup.select_one(pagination.get("next_page", ""))
if not next_page:
self.logger.debug(f"No more pages found after page {page}")
break
# Delay between requests
delay = self.config.get("scraper.delay_between_requests", 1.0)
time.sleep(delay + random.uniform(0, 1))
except Exception as e:
self.logger.error(f"Error scraping page {page} from {source.get('name', 'Unknown')}: {str(e)}")
continue
return source_data
def _scrape_with_selenium(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Scrape data using Selenium WebDriver.
Args:
source: Source configuration dictionary
Returns:
List of dictionaries containing scraped data
"""
source_data = []
url = source.get("url", "")
selectors = source.get("selectors", {})
pagination = source.get("pagination", {})
max_pages = pagination.get("max_pages", 1)
try:
self.driver.get(url)
for page in range(1, max_pages + 1):
self.logger.debug(f"Scraping page {page} with Selenium")
# Wait for products to load
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, selectors.get("product", "")))
)
except TimeoutException:
self.logger.warning(f"Timeout waiting for products to load on page {page}")
continue
# Extract product data
products = self.driver.find_elements(By.CSS_SELECTOR, selectors.get("product", ""))
for product in products:
product_data = self._extract_product_data_selenium(product, selectors)
if product_data:
product_data["source"] = source.get("name", "Unknown")
product_data["scraped_at"] = datetime.now().isoformat()
source_data.append(product_data)
# Navigate to next page if available
if page < max_pages:
try:
next_button = self.driver.find_element(By.CSS_SELECTOR, pagination.get("next_page", ""))
next_button.click()
# Wait for page to load
time.sleep(2)
except NoSuchElementException:
self.logger.debug(f"No next page button found after page {page}")
break
# Delay between requests
delay = self.config.get("scraper.delay_between_requests", 1.0)
time.sleep(delay + random.uniform(0, 1))
except Exception as e:
self.logger.error(f"Error scraping with Selenium from {source.get('name', 'Unknown')}: {str(e)}")
return source_data
def _make_request_with_retry(self, url: str) -> Optional[requests.Response]:
"""
Make HTTP request with retry logic.
Args:
url: URL to request
Returns:
Response object or None if failed
"""
max_retries = self.config.get("scraper.max_retries", 3)
timeout = self.config.get("scraper.timeout", 30)
for attempt in range(max_retries):
try:
response = self.session.get(url, timeout=timeout)
response.raise_for_status()
return response
except requests.RequestException as e:
self.logger.warning(f"Request attempt {attempt + 1} failed for {url}: {str(e)}")
if attempt < max_retries - 1:
# Exponential backoff
time.sleep((2 ** attempt) + random.uniform(0, 1))
else:
self.logger.error(f"Max retries exceeded for {url}")
return None
return None
def _extract_product_data(self, product: BeautifulSoup, selectors: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""
Extract product data from HTML element using BeautifulSoup.
Args:
product: BeautifulSoup element containing product data
selectors: Dictionary of CSS selectors
Returns:
Dictionary containing product data or None if extraction failed
"""
try:
data = {}
# Extract name
name_element = product.select_one(selectors.get("name", ""))
data["name"] = name_element.get_text(strip=True) if name_element else None
# Extract price
price_element = product.select_one(selectors.get("price", ""))
if price_element:
price_text = price_element.get_text(strip=True)
data["price"] = self._parse_price(price_text)
else:
data["price"] = None
# Extract rating
rating_element = product.select_one(selectors.get("rating", ""))
if rating_element:
rating_text = rating_element.get_text(strip=True)
data["rating"] = self._parse_rating(rating_text)
else:
data["rating"] = None
# Extract availability
availability_element = product.select_one(selectors.get("availability", ""))
data["availability"] = availability_element.get_text(strip=True) if availability_element else None
# Extract URL if available
link_element = product.select_one("a")
if link_element and link_element.get("href"):
data["url"] = link_element.get("href")
return data if data.get("name") else None
except Exception as e:
self.logger.error(f"Error extracting product data: {str(e)}")
return None
def _extract_product_data_selenium(self, product, selectors: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""
Extract product data from Selenium WebElement.
Args:
product: Selenium WebElement containing product data
selectors: Dictionary of CSS selectors
Returns:
Dictionary containing product data or None if extraction failed
"""
try:
data = {}
# Extract name
try:
name_element = product.find_element(By.CSS_SELECTOR, selectors.get("name", ""))
data["name"] = name_element.text.strip()
except NoSuchElementException:
data["name"] = None
# Extract price
try:
price_element = product.find_element(By.CSS_SELECTOR, selectors.get("price", ""))
price_text = price_element.text.strip()
data["price"] = self._parse_price(price_text)
except NoSuchElementException:
data["price"] = None
# Extract rating
try:
rating_element = product.find_element(By.CSS_SELECTOR, selectors.get("rating", ""))
rating_text = rating_element.text.strip()
data["rating"] = self._parse_rating(rating_text)
except NoSuchElementException:
data["rating"] = None
# Extract availability
try:
availability_element = product.find_element(By.CSS_SELECTOR, selectors.get("availability", ""))
data["availability"] = availability_element.text.strip()
except NoSuchElementException:
data["availability"] = None
# Extract URL if available
try:
link_element = product.find_element(By.CSS_SELECTOR, "a")
if link_element.get_attribute("href"):
data["url"] = link_element.get_attribute("href")
except NoSuchElementException:
pass
return data if data.get("name") else None
except Exception as e:
self.logger.error(f"Error extracting product data with Selenium: {str(e)}")
return None
def _parse_price(self, price_text: str) -> Optional[float]:
"""
Parse price text to extract numeric value.
Args:
price_text: Raw price text
Returns:
Parsed price as float or None if parsing failed
"""
try:
# Remove currency symbols and whitespace
            price_clean = price_text.replace('$', '').replace('€', '').replace('£', '').strip()
# Handle comma as decimal separator
price_clean = price_clean.replace(',', '.')
# Extract numeric part
import re
price_match = re.search(r'[\d.]+', price_clean)
if price_match:
return float(price_match.group())
return None
except Exception as e:
self.logger.error(f"Error parsing price '{price_text}': {str(e)}")
return None
def _parse_rating(self, rating_text: str) -> Optional[float]:
"""
Parse rating text to extract numeric value.
Args:
rating_text: Raw rating text
Returns:
Parsed rating as float or None if parsing failed
"""
try:
# Extract numeric part
import re
rating_match = re.search(r'[\d.]+', rating_text)
if rating_match:
return float(rating_match.group())
return None
except Exception as e:
self.logger.error(f"Error parsing rating '{rating_text}': {str(e)}")
return None
def save_data(self, data: List[Dict[str, Any]], output_path: str) -> None:
"""
Save scraped data to file.
Args:
data: List of dictionaries containing scraped data
output_path: Path to output file
"""
try:
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
# Convert to DataFrame
df = pd.DataFrame(data)
# Save based on file extension
if output_path.endswith('.csv'):
df.to_csv(output_path, index=False)
elif output_path.endswith('.json'):
df.to_json(output_path, orient='records', indent=2)
elif output_path.endswith('.xlsx'):
df.to_excel(output_path, index=False)
else:
# Default to CSV
output_path = output_path + '.csv'
df.to_csv(output_path, index=False)
self.logger.info(f"Data saved to {output_path}")
except Exception as e:
self.logger.error(f"Error saving data to {output_path}: {str(e)}")
raise
def analyze_trends(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Analyze market trends from scraped data.
Args:
data: List of dictionaries containing scraped data
Returns:
Dictionary containing trend analysis results
"""
try:
if not data:
return {"error": "No data available for analysis"}
# Convert to DataFrame
df = pd.DataFrame(data)
# Basic statistics
analysis = {
"total_products": len(df),
"sources": df["source"].value_counts().to_dict(),
"price_analysis": {},
"rating_analysis": {},
"availability_analysis": {}
}
# Price analysis
if "price" in df.columns:
price_data = df["price"].dropna()
if not price_data.empty:
analysis["price_analysis"] = {
"average_price": float(price_data.mean()),
"min_price": float(price_data.min()),
"max_price": float(price_data.max()),
"median_price": float(price_data.median()),
"price_distribution": price_data.describe().to_dict()
}
# Rating analysis
if "rating" in df.columns:
rating_data = df["rating"].dropna()
if not rating_data.empty:
analysis["rating_analysis"] = {
"average_rating": float(rating_data.mean()),
"min_rating": float(rating_data.min()),
"max_rating": float(rating_data.max()),
"rating_distribution": rating_data.value_counts().to_dict()
}
# Availability analysis
if "availability" in df.columns:
availability_data = df["availability"].dropna()
if not availability_data.empty:
analysis["availability_analysis"] = availability_data.value_counts().to_dict()
# Price trends by source
if "price" in df.columns and "source" in df.columns:
price_by_source = df.groupby("source")["price"].agg(["mean", "min", "max"]).to_dict()
analysis["price_by_source"] = price_by_source
self.logger.info("Trend analysis completed")
return analysis
except Exception as e:
self.logger.error(f"Error analyzing trends: {str(e)}")
return {"error": str(e)}
def save_analysis(self, analysis: Dict[str, Any], output_path: str) -> None:
"""
Save trend analysis results to file.
Args:
analysis: Dictionary containing analysis results
output_path: Path to output file
"""
try:
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(analysis, f, indent=2, ensure_ascii=False)
self.logger.info(f"Analysis saved to {output_path}")
except Exception as e:
self.logger.error(f"Error saving analysis to {output_path}: {str(e)}")
raise
def close(self) -> None:
"""
Close browser and session.
"""
try:
if self.driver:
self.driver.quit()
self.driver = None
if self.session:
self.session.close()
self.logger.info("Browser and session closed")
except Exception as e:
self.logger.error(f"Error closing browser/session: {str(e)}")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()

0
tests/__init__.py Normal file

165
tests/test_config_manager.py Normal file

@@ -0,0 +1,165 @@
"""
Unit tests for the ConfigManager module.
"""
import pytest
import tempfile
import os
import yaml
from pathlib import Path
from unittest.mock import patch, mock_open
from src.config_manager import ConfigManager
class TestConfigManager:
"""Test cases for ConfigManager class."""
def test_init_with_default_path(self):
"""Test ConfigManager initialization with default path."""
config_manager = ConfigManager()
assert config_manager.config_path == Path("config/config.yaml")
assert isinstance(config_manager.config, dict)
assert isinstance(config_manager.default_config, dict)
def test_init_with_custom_path(self):
"""Test ConfigManager initialization with custom path."""
custom_path = "custom/config.yaml"
config_manager = ConfigManager(custom_path)
assert config_manager.config_path == Path(custom_path)
def test_get_default_config(self):
"""Test default configuration structure."""
config_manager = ConfigManager()
default_config = config_manager._get_default_config()
# Check required sections
assert "scraper" in default_config
assert "sources" in default_config
assert "output" in default_config
assert "database" in default_config
assert "analysis" in default_config
# Check some default values
assert default_config["scraper"]["delay_between_requests"] == 1.0
assert default_config["scraper"]["timeout"] == 30
assert default_config["scraper"]["headless"] is True
assert isinstance(default_config["sources"], list)
assert len(default_config["sources"]) > 0
@patch('builtins.open', new_callable=mock_open, read_data="scraper:\n timeout: 60")
@patch('pathlib.Path.exists')
def test_load_config_existing_file(self, mock_exists, mock_file):
"""Test loading configuration from existing file."""
mock_exists.return_value = True
config_manager = ConfigManager()
config = config_manager.load_config()
mock_file.assert_called_once()
assert config["scraper"]["timeout"] == 60
@patch('builtins.open', new_callable=mock_open)
@patch('pathlib.Path.exists')
def test_load_config_create_default(self, mock_exists, mock_file):
"""Test creating default configuration when file doesn't exist."""
mock_exists.return_value = False
config_manager = ConfigManager()
config = config_manager.load_config()
# Verify file was created
mock_file.assert_called_once()
# Verify config is default
assert config == config_manager.default_config
@patch('builtins.open', new_callable=mock_open)
def test_save_config(self, mock_file):
"""Test saving configuration to file."""
config_manager = ConfigManager()
config_manager.config = {"test": "value"}
config_manager.save_config()
mock_file.assert_called_once()
# Verify yaml.dump was called with correct arguments
with patch('yaml.dump') as mock_dump:
config_manager.save_config()
mock_dump.assert_called_once()
def test_validate_and_merge_config(self):
"""Test configuration validation and merging."""
config_manager = ConfigManager()
# Test with partial config
partial_config = {
"scraper": {
"timeout": 60
}
}
config_manager.config = partial_config
merged = config_manager._validate_and_merge_config()
# Should have all sections
assert "sources" in merged
assert "output" in merged
# Should have updated value
assert merged["scraper"]["timeout"] == 60
# Should have default values for missing keys
assert merged["scraper"]["delay_between_requests"] == 1.0
def test_validate_and_merge_config_missing_required(self):
"""Test validation fails when required sections are missing."""
config_manager = ConfigManager()
config_manager.config = {"invalid": "config"}
with pytest.raises(ValueError, match="Missing required configuration section"):
config_manager._validate_and_merge_config()
def test_validate_and_merge_config_no_sources(self):
"""Test validation fails when no sources are configured."""
config_manager = ConfigManager()
config_manager.config = {
"scraper": {},
"sources": [],
"output": {}
}
with pytest.raises(ValueError, match="At least one data source must be configured"):
config_manager._validate_and_merge_config()
def test_get_with_dot_notation(self):
"""Test getting configuration values with dot notation."""
config_manager = ConfigManager()
config_manager.config = {
"scraper": {
"timeout": 60,
"nested": {
"value": "test"
}
}
}
assert config_manager.get("scraper.timeout") == 60
assert config_manager.get("scraper.nested.value") == "test"
assert config_manager.get("nonexistent", "default") == "default"
def test_set_with_dot_notation(self):
"""Test setting configuration values with dot notation."""
config_manager = ConfigManager()
config_manager.config = {"scraper": {}}
config_manager.set("scraper.timeout", 60)
config_manager.set("new.nested.value", "test")
assert config_manager.config["scraper"]["timeout"] == 60
assert config_manager.config["new"]["nested"]["value"] == "test"
@patch.object(ConfigManager, 'load_config')
def test_reload(self, mock_load):
"""Test reloading configuration."""
config_manager = ConfigManager()
config_manager.reload()
mock_load.assert_called_once()

355
tests/test_integration.py Normal file

@@ -0,0 +1,355 @@
"""
Integration tests for the Market Trends Scraper application.
These tests verify that all components work together correctly.
"""
import pytest
import tempfile
import os
import json
from pathlib import Path
from unittest.mock import patch, Mock
from src.config_manager import ConfigManager
from src.scraper import MarketTrendsScraper
from src.logger import setup_logger
import main
class TestIntegration:
"""Integration test cases for the entire application."""
@pytest.fixture
def temp_dir(self):
"""Create a temporary directory for test files."""
with tempfile.TemporaryDirectory() as temp_dir:
yield temp_dir
@pytest.fixture
def sample_config_file(self, temp_dir):
"""Create a sample configuration file for testing."""
config_path = Path(temp_dir) / "config.yaml"
config_content = {
"scraper": {
"delay_between_requests": 0.1, # Faster for testing
"timeout": 10,
"max_retries": 2,
"user_agent": "Mozilla/5.0 (Test)",
"headless": True,
"window_size": [1024, 768]
},
"sources": [
{
"name": "test_source",
"url": "https://httpbin.org/html",
"type": "ecommerce",
"enabled": True,
"selectors": {
"product": "p",
"name": "p",
"price": "p",
"rating": "p",
"availability": "p"
},
"pagination": {
"next_page": "a",
"max_pages": 1
}
}
],
"output": {
"format": "csv",
"include_timestamp": True,
"filename": "test_output"
},
"database": {
"url": f"sqlite:///{temp_dir}/test.db",
"echo": False
},
"analysis": {
"price_history_days": 30,
"trend_threshold": 0.05,
"generate_charts": True
}
}
with open(config_path, 'w') as f:
yaml.dump(config_content, f)
return str(config_path)
@patch('src.scraper.webdriver')
def test_full_workflow(self, mock_webdriver, sample_config_file, temp_dir):
"""Test the complete workflow from config loading to analysis."""
# Setup mock driver
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
# Setup logger
setup_logger()
# Load configuration
config_manager = ConfigManager(sample_config_file)
config = config_manager.load_config()
# Verify config was loaded
assert config["scraper"]["delay_between_requests"] == 0.1
assert len(config["sources"]) == 1
assert config["sources"][0]["name"] == "test_source"
# Initialize scraper
scraper = MarketTrendsScraper(config, headless=True)
# Mock the scraping process to return sample data
sample_data = [
{
"name": "Test Product 1",
"price": 19.99,
"rating": 4.5,
"availability": "In Stock",
"source": "test_source",
"scraped_at": "2023-01-01T00:00:00"
},
{
"name": "Test Product 2",
"price": 29.99,
"rating": 3.8,
"availability": "Out of Stock",
"source": "test_source",
"scraped_at": "2023-01-01T00:00:00"
}
]
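# Expected aggregates: average price (19.99 + 29.99) / 2 = 24.99, min 19.99, max 29.99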
with patch.object(scraper, '_scrape_source', return_value=sample_data):
# Scrape data
data = scraper.scrape_market_trends()
# Verify data was scraped
assert len(data) == 2
assert data[0]["name"] == "Test Product 1"
assert data[1]["price"] == 29.99
# Save data
output_path = Path(temp_dir) / "test_output.csv"
scraper.save_data(data, str(output_path))
# Verify file was created
assert output_path.exists()
# Analyze trends
analysis = scraper.analyze_trends(data)
# Verify analysis
assert analysis["total_products"] == 2
assert "price_analysis" in analysis
assert analysis["price_analysis"]["average_price"] == 24.99
assert analysis["price_analysis"]["min_price"] == 19.99
assert analysis["price_analysis"]["max_price"] == 29.99
# Save analysis
analysis_path = Path(temp_dir) / "test_analysis.json"
scraper.save_analysis(analysis, str(analysis_path))
# Verify analysis file was created
assert analysis_path.exists()
# Verify analysis content
with open(analysis_path, 'r') as f:
saved_analysis = json.load(f)
assert saved_analysis["total_products"] == 2
assert saved_analysis["price_analysis"]["average_price"] == 24.99
@patch('src.scraper.webdriver')
def test_multiple_sources(self, mock_webdriver, temp_dir):
"""Test scraping from multiple sources."""
# Setup mock driver
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
# Create config with multiple sources
config_content = {
"scraper": {
"delay_between_requests": 0.1,
"timeout": 10,
"headless": True
},
"sources": [
{
"name": "source_1",
"url": "https://example1.com",
"enabled": True,
"selectors": {
"product": "div.product",
"name": "h2",
"price": "span.price"
}
},
{
"name": "source_2",
"url": "https://example2.com",
"enabled": True,
"selectors": {
"product": "div.item",
"name": "h3",
"price": "div.cost"
}
},
{
"name": "source_3",
"url": "https://example3.com",
"enabled": False, # Disabled source
"selectors": {
"product": "div.product",
"name": "h2",
"price": "span.price"
}
}
]
}
config_path = Path(temp_dir) / "multi_source_config.yaml"
with open(config_path, 'w') as f:
yaml.dump(config_content, f)
# Initialize scraper
scraper = MarketTrendsScraper(config_content, headless=True)
# Mock different data for each source
def mock_scrape_source(source):
if source["name"] == "source_1":
return [{"name": "Product 1", "price": 10.00, "source": "source_1"}]
elif source["name"] == "source_2":
return [{"name": "Product 2", "price": 20.00, "source": "source_2"}]
else:
return []
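# Disabled source_3 should be skipped entirely; the fallback branch returns no data just in case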
with patch.object(scraper, '_scrape_source', side_effect=mock_scrape_source):
data = scraper.scrape_market_trends()
# Verify data from both enabled sources
assert len(data) == 2
sources = {item["source"] for item in data}
assert "source_1" in sources
assert "source_2" in sources
assert "source_3" not in sources # Disabled source should not appear
@patch('src.scraper.webdriver')
def test_error_handling(self, mock_webdriver, temp_dir):
"""Test error handling when scraping fails."""
# Setup mock driver
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
config = {
"scraper": {
"delay_between_requests": 0.1,
"timeout": 10,
"headless": True
},
"sources": [
{
"name": "working_source",
"url": "https://example.com",
"enabled": True,
"selectors": {
"product": "div.product",
"name": "h2",
"price": "span.price"
}
},
{
"name": "failing_source",
"url": "https://example.com",
"enabled": True,
"selectors": {
"product": "div.product",
"name": "h2",
"price": "span.price"
}
}
]
}
# Initialize scraper
scraper = MarketTrendsScraper(config, headless=True)
# Mock one source to succeed and one to fail
def mock_scrape_source(source):
if source["name"] == "working_source":
return [{"name": "Working Product", "price": 15.00, "source": "working_source"}]
else:
raise Exception("Scraping failed")
with patch.object(scraper, '_scrape_source', side_effect=mock_scrape_source):
data = scraper.scrape_market_trends()
# Should still get data from working source
assert len(data) == 1
assert data[0]["source"] == "working_source"
@patch('src.scraper.webdriver')
def test_main_entry_point(self, mock_webdriver, temp_dir):
"""Test the main entry point of the application."""
# Setup mock driver
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
# Create test config
config_path = Path(temp_dir) / "test_config.yaml"
config_content = {
"scraper": {
"delay_between_requests": 0.1,
"timeout": 10,
"headless": True
},
"sources": [
{
"name": "test_source",
"url": "https://example.com",
"enabled": True,
"selectors": {
"product": "div.product",
"name": "h2",
"price": "span.price"
}
}
]
}
with open(config_path, 'w') as f:
yaml.dump(config_content, f)
# Mock the scraper to return sample data
sample_data = [{"name": "Test Product", "price": 19.99, "source": "test_source"}]
with patch('main.ConfigManager') as mock_config_manager, \
patch('main.MarketTrendsScraper') as mock_scraper_class:
# Setup mocks
mock_config_instance = Mock()
mock_config_manager.return_value = mock_config_instance
mock_config_instance.load_config.return_value = config_content
mock_scraper_instance = Mock()
mock_scraper_class.return_value = mock_scraper_instance
mock_scraper_instance.scrape_market_trends.return_value = sample_data
mock_scraper_instance.analyze_trends.return_value = {"total_products": 1}
# Run main function
with patch('sys.argv', ['main.py', '--config', str(config_path), '--output', str(Path(temp_dir) / 'output.csv')]):
result = main.main()
# Verify main completed successfully
assert result == 0
# Verify scraper was called
mock_scraper_instance.scrape_market_trends.assert_called_once()
mock_scraper_instance.save_data.assert_called_once()
mock_scraper_instance.analyze_trends.assert_called_once()
mock_scraper_instance.save_analysis.assert_called_once()

165
tests/test_logger.py Normal file
View File

@@ -0,0 +1,165 @@
"""
Unit tests for the Logger module.
"""
import pytest
import sys
import tempfile
import os
from pathlib import Path
from unittest.mock import patch, MagicMock
from loguru import logger
from src.logger import setup_logger, get_logger, LoggerMixin
class TestLoggerSetup:
"""Test cases for logger setup functionality."""
def test_setup_logger_default(self):
"""Test logger setup with default parameters."""
with patch('loguru.logger.remove') as mock_remove, \
patch('loguru.logger.add') as mock_add:
setup_logger()
# Verify default logger was removed
mock_remove.assert_called_once()
# Verify console logger was added
assert mock_add.call_count == 1
console_call = mock_add.call_args_list[0]
assert console_call[0][0] == sys.stderr
assert "level" in console_call[1]
assert console_call[1]["colorize"] is True
def test_setup_logger_with_file(self):
"""Test logger setup with file output."""
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
log_file = tmp_file.name
try:
with patch('loguru.logger.remove') as mock_remove, \
patch('loguru.logger.add') as mock_add:
setup_logger(log_file=log_file)
# Verify both console and file loggers were added
assert mock_add.call_count == 2
# Check file logger call
file_call = mock_add.call_args_list[1]
assert file_call[0][0] == log_file
assert "rotation" in file_call[1]
assert "retention" in file_call[1]
assert "compression" in file_call[1]
finally:
# Clean up
if os.path.exists(log_file):
os.unlink(log_file)
def test_setup_logger_verbose(self):
"""Test logger setup with verbose mode."""
with patch('loguru.logger.remove') as mock_remove, \
patch('loguru.logger.add') as mock_add:
setup_logger(verbose=True)
# Verify DEBUG level was set
console_call = mock_add.call_args_list[0]
assert console_call[1]["level"] == "DEBUG"
def test_setup_logger_custom_level(self):
"""Test logger setup with custom log level."""
with patch('loguru.logger.remove') as mock_remove, \
patch('loguru.logger.add') as mock_add:
setup_logger(log_level="WARNING")
# Verify WARNING level was set
console_call = mock_add.call_args_list[0]
assert console_call[1]["level"] == "WARNING"
def test_setup_logger_custom_rotation(self):
"""Test logger setup with custom rotation settings."""
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
log_file = tmp_file.name
try:
with patch('loguru.logger.remove') as mock_remove, \
patch('loguru.logger.add') as mock_add:
setup_logger(log_file=log_file, rotation="100 MB")
# Verify custom rotation was set
file_call = mock_add.call_args_list[1]
assert file_call[1]["rotation"] == "100 MB"
finally:
# Clean up
if os.path.exists(log_file):
os.unlink(log_file)
def test_setup_logger_exception_handler(self):
"""Test that exception handler is set up."""
with patch('loguru.logger.remove') as mock_remove, \
patch('loguru.logger.add') as mock_add, \
patch('sys.excepthook') as mock_excepthook:
setup_logger()
# setup_logger should install its own handler in place of the patched hook
assert sys.excepthook is not mock_excepthook
assert callable(sys.excepthook)
class TestGetLogger:
"""Test cases for get_logger function."""
def test_get_logger_no_name(self):
"""Test getting logger without name."""
with patch('loguru.logger.bind') as mock_bind:
get_logger()
mock_bind.assert_called_once()
def test_get_logger_with_name(self):
"""Test getting logger with name."""
with patch('loguru.logger.bind') as mock_bind:
get_logger("test_module")
mock_bind.assert_called_once_with(name="test_module")
class TestLoggerMixin:
"""Test cases for LoggerMixin class."""
def test_logger_property(self):
"""Test logger property in mixin."""
class TestClass(LoggerMixin):
pass
with patch('loguru.logger.bind') as mock_bind:
test_obj = TestClass()
_ = test_obj.logger
# Verify logger was bound with class name and object id
mock_bind.assert_called_once()
args, kwargs = mock_bind.call_args
assert kwargs["name"] == "TestClass"
assert "id" in kwargs
def test_logger_mixin_inheritance(self):
"""Test that logger mixin works with inheritance."""
class ParentClass(LoggerMixin):
pass
class ChildClass(ParentClass):
pass
with patch('loguru.logger.bind') as mock_bind:
child_obj = ChildClass()
_ = child_obj.logger
# Verify logger was bound with child class name
args, kwargs = mock_bind.call_args
assert kwargs["name"] == "ChildClass"

371
tests/test_scraper.py Normal file
View File

@@ -0,0 +1,371 @@
"""
Unit tests for the Scraper module.
"""
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
from bs4 import BeautifulSoup
import pandas as pd
from src.scraper import MarketTrendsScraper
class TestMarketTrendsScraper:
"""Test cases for MarketTrendsScraper class."""
@pytest.fixture
def sample_config(self):
"""Sample configuration for testing."""
return {
"scraper": {
"delay_between_requests": 1.0,
"timeout": 30,
"max_retries": 3,
"user_agent": "Mozilla/5.0",
"headless": True,
"window_size": [1920, 1080]
},
"sources": [
{
"name": "test_source",
"url": "https://example.com/products",
"type": "ecommerce",
"enabled": True,
"selectors": {
"product": "div.product",
"name": "h2.title",
"price": "span.price",
"rating": "div.rating",
"availability": "div.stock"
},
"pagination": {
"next_page": "a.next",
"max_pages": 2
}
}
],
"output": {
"format": "csv",
"include_timestamp": True,
"filename": "market_trends_data"
}
}
@pytest.fixture
def sample_html(self):
"""Sample HTML for testing."""
return """
<html>
<body>
<div class="product">
<h2 class="title">Test Product 1</h2>
<span class="price">$19.99</span>
<div class="rating">4.5 stars</div>
<div class="stock">In Stock</div>
<a href="/product/1">View</a>
</div>
<div class="product">
<h2 class="title">Test Product 2</h2>
<span class="price">€29.99</span>
<div class="rating">3.8 stars</div>
<div class="stock">Out of Stock</div>
<a href="/product/2">View</a>
</div>
<a class="next" href="/page/2">Next</a>
</body>
</html>
"""
@patch('src.scraper.webdriver')
def test_init(self, mock_webdriver, sample_config):
"""Test scraper initialization."""
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
scraper = MarketTrendsScraper(sample_config)
assert scraper.config == sample_config
assert scraper.driver == mock_driver
assert scraper.session is not None
assert scraper.data == []
# Verify browser setup
mock_webdriver.Chrome.assert_called_once()
mock_driver.set_page_load_timeout.assert_called_with(30)
@patch('src.scraper.webdriver')
def test_setup_browser(self, mock_webdriver, sample_config):
"""Test browser setup."""
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
scraper = MarketTrendsScraper(sample_config)
# Verify options were set
call_args = mock_webdriver.Chrome.call_args
options = call_args[1]['options']
assert options.arguments is not None
assert any("--headless" in arg for arg in options.arguments)
assert any("--window-size=1920,1080" in arg for arg in options.arguments)
@patch('src.scraper.webdriver')
def test_setup_session(self, mock_webdriver, sample_config):
"""Test session setup."""
scraper = MarketTrendsScraper(sample_config)
# Verify headers were set
assert "User-Agent" in scraper.session.headers
assert scraper.session.headers["User-Agent"] == sample_config["scraper"]["user_agent"]
assert "Accept" in scraper.session.headers
@patch('src.scraper.webdriver')
def test_parse_price(self, mock_webdriver, sample_config):
"""Test price parsing."""
scraper = MarketTrendsScraper(sample_config)
# Test various price formats
assert scraper._parse_price("$19.99") == 19.99
assert scraper._parse_price("€29.99") == 29.99
assert scraper._parse_price("£39.99") == 39.99
assert scraper._parse_price("19,99") == 19.99
assert scraper._parse_price("Price: $49.99 USD") == 49.99
assert scraper._parse_price("Invalid price") is None
assert scraper._parse_price("") is None
@patch('src.scraper.webdriver')
def test_parse_rating(self, mock_webdriver, sample_config):
"""Test rating parsing."""
scraper = MarketTrendsScraper(sample_config)
# Test various rating formats
assert scraper._parse_rating("4.5 stars") == 4.5
assert scraper._parse_rating("Rating: 3.8/5") == 3.8
assert scraper._parse_rating("5 stars") == 5.0
assert scraper._parse_rating("Invalid rating") is None
assert scraper._parse_rating("") is None
@patch('src.scraper.webdriver')
def test_extract_product_data(self, mock_webdriver, sample_config, sample_html):
"""Test product data extraction from HTML."""
scraper = MarketTrendsScraper(sample_config)
soup = BeautifulSoup(sample_html, 'html.parser')
product = soup.find('div', class_='product')
selectors = sample_config["sources"][0]["selectors"]
data = scraper._extract_product_data(product, selectors)
assert data is not None
assert data["name"] == "Test Product 1"
assert data["price"] == 19.99
assert data["rating"] == 4.5
assert data["availability"] == "In Stock"
assert data["url"] == "/product/1"
@patch('src.scraper.webdriver')
def test_extract_product_data_no_name(self, mock_webdriver, sample_config):
"""Test product data extraction when name is missing."""
scraper = MarketTrendsScraper(sample_config)
html = """
<div class="product">
<span class="price">$19.99</span>
</div>
"""
soup = BeautifulSoup(html, 'html.parser')
product = soup.find('div', class_='product')
selectors = sample_config["sources"][0]["selectors"]
data = scraper._extract_product_data(product, selectors)
assert data is None # Should return None when name is missing
@patch('src.scraper.webdriver')
def test_make_request_with_retry_success(self, mock_webdriver, sample_config):
"""Test successful HTTP request with retry logic."""
scraper = MarketTrendsScraper(sample_config)
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "Success"
with patch.object(scraper.session, 'get', return_value=mock_response) as mock_get:
response = scraper._make_request_with_retry("https://example.com")
assert response == mock_response
mock_get.assert_called_once()
@patch('src.scraper.webdriver')
def test_make_request_with_retry_failure(self, mock_webdriver, sample_config):
"""Test HTTP request failure with retry logic."""
scraper = MarketTrendsScraper(sample_config)
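# Every attempt raises, so the retry helper should exhaust its retries and return None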
with patch.object(scraper.session, 'get', side_effect=Exception("Connection error")):
response = scraper._make_request_with_retry("https://example.com")
assert response is None
@patch('src.scraper.webdriver')
@patch('src.scraper.MarketTrendsScraper._scrape_source')
def test_scrape_market_trends(self, mock_scrape_source, mock_webdriver, sample_config):
"""Test scraping market trends from multiple sources."""
scraper = MarketTrendsScraper(sample_config)
# Mock source data
mock_scrape_source.side_effect = [
[{"name": "Product 1", "price": 19.99}],
[{"name": "Product 2", "price": 29.99}]
]
# Add second source
sample_config["sources"].append({
"name": "test_source_2",
"url": "https://example2.com/products",
"enabled": True,
"selectors": {
"product": "div.product",
"name": "h2.title",
"price": "span.price"
}
})
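# The source appended above shares the scraper's config dict, so both sources are iterated below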
data = scraper.scrape_market_trends()
assert len(data) == 2
assert data[0]["name"] == "Product 1"
assert data[1]["name"] == "Product 2"
assert mock_scrape_source.call_count == 2
@patch('src.scraper.webdriver')
@patch('src.scraper.MarketTrendsScraper._scrape_with_requests')
def test_scrape_source_with_requests(self, mock_scrape_requests, mock_webdriver, sample_config):
"""Test scraping a source using requests."""
scraper = MarketTrendsScraper(sample_config)
mock_scrape_requests.return_value = [{"name": "Test Product", "price": 19.99}]
source = sample_config["sources"][0]
data = scraper._scrape_source(source)
assert len(data) == 1
assert data[0]["name"] == "Test Product"
mock_scrape_requests.assert_called_once_with(source)
@patch('src.scraper.webdriver')
@patch('src.scraper.MarketTrendsScraper._scrape_with_selenium')
def test_scrape_source_with_selenium(self, mock_scrape_selenium, mock_webdriver, sample_config):
"""Test scraping a source using Selenium."""
scraper = MarketTrendsScraper(sample_config)
mock_scrape_selenium.return_value = [{"name": "Test Product", "price": 19.99}]
# Configure source to use Selenium
source = sample_config["sources"][0]
source["use_selenium"] = True
data = scraper._scrape_source(source)
assert len(data) == 1
assert data[0]["name"] == "Test Product"
mock_scrape_selenium.assert_called_once_with(source)
@patch('src.scraper.webdriver')
@patch('builtins.open', new_callable=Mock)
def test_save_data_csv(self, mock_open, mock_webdriver, sample_config):
"""Test saving data to CSV file."""
scraper = MarketTrendsScraper(sample_config)
data = [
{"name": "Product 1", "price": 19.99, "source": "Test"},
{"name": "Product 2", "price": 29.99, "source": "Test"}
]
with patch.object(pd.DataFrame, 'to_csv') as mock_to_csv:
scraper.save_data(data, "test_output.csv")
mock_to_csv.assert_called_once_with("test_output.csv", index=False)
@patch('src.scraper.webdriver')
@patch('builtins.open', new_callable=Mock)
def test_save_data_json(self, mock_open, mock_webdriver, sample_config):
"""Test saving data to JSON file."""
scraper = MarketTrendsScraper(sample_config)
data = [
{"name": "Product 1", "price": 19.99, "source": "Test"},
{"name": "Product 2", "price": 29.99, "source": "Test"}
]
with patch.object(pd.DataFrame, 'to_json') as mock_to_json:
scraper.save_data(data, "test_output.json")
mock_to_json.assert_called_once()
@patch('src.scraper.webdriver')
def test_analyze_trends(self, mock_webdriver, sample_config):
"""Test trend analysis."""
scraper = MarketTrendsScraper(sample_config)
data = [
{"name": "Product 1", "price": 19.99, "rating": 4.5, "source": "Source A"},
{"name": "Product 2", "price": 29.99, "rating": 3.8, "source": "Source A"},
{"name": "Product 3", "price": 39.99, "rating": 4.2, "source": "Source B"},
{"name": "Product 4", "price": 49.99, "rating": 4.7, "source": "Source B"}
]
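# Expected aggregates: average price (19.99 + 29.99 + 39.99 + 49.99) / 4 = 34.99; average rating (4.5 + 3.8 + 4.2 + 4.7) / 4 = 4.3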
analysis = scraper.analyze_trends(data)
assert analysis["total_products"] == 4
assert "price_analysis" in analysis
assert "rating_analysis" in analysis
assert "sources" in analysis
assert analysis["sources"]["Source A"] == 2
assert analysis["sources"]["Source B"] == 2
assert analysis["price_analysis"]["average_price"] == 34.99
assert analysis["price_analysis"]["min_price"] == 19.99
assert analysis["price_analysis"]["max_price"] == 49.99
assert analysis["rating_analysis"]["average_rating"] == 4.3
@patch('src.scraper.webdriver')
def test_analyze_trends_empty_data(self, mock_webdriver, sample_config):
"""Test trend analysis with empty data."""
scraper = MarketTrendsScraper(sample_config)
analysis = scraper.analyze_trends([])
assert "error" in analysis
assert analysis["error"] == "No data available for analysis"
@patch('src.scraper.webdriver')
@patch('builtins.open', new_callable=MagicMock)
def test_save_analysis(self, mock_open, mock_webdriver, sample_config):
"""Test saving analysis results."""
scraper = MarketTrendsScraper(sample_config)
analysis = {"total_products": 4, "average_price": 34.99}
with patch('json.dump') as mock_json_dump:
scraper.save_analysis(analysis, "test_analysis.json")
mock_json_dump.assert_called_once()
@patch('src.scraper.webdriver')
def test_close(self, mock_webdriver, sample_config):
"""Test closing browser and session."""
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
scraper = MarketTrendsScraper(sample_config)
scraper.close()
mock_driver.quit.assert_called_once()
@patch('src.scraper.webdriver')
def test_context_manager(self, mock_webdriver, sample_config):
"""Test using scraper as context manager."""
mock_driver = Mock()
mock_webdriver.Chrome.return_value = mock_driver
with MarketTrendsScraper(sample_config) as scraper:
assert scraper is not None
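# Exiting the with-block should call __exit__, which closes the driver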
mock_driver.quit.assert_called_once()