Optimizing Data Processing for Product Comparison Script (Python)

I have a Python script that scrapes product data from multiple websites and performs various processing steps before saving the results as a comparison table (CSV and Excel). However, the data processing part is taking a long time, especially for large datasets. I’m looking for ways to optimize the script for faster execution.

Here’s a breakdown of the processing steps:

Cleaning product names: removing unnecessary Hebrew patterns with regular expressions and string manipulation.
Fuzzy matching: Product names are compared against a reference list using fuzzywuzzy to identify potential matches, especially for slightly different product descriptions.
Duplicate checking and merging: Duplicate product names are identified and merged, keeping the most complete information from each website.

The full code:
from bs4 import BeautifulSoup
import pandas as pd
import requests
import re
import time
import unicodedata
import json
import lxml
from datetime import datetime
from fuzzywuzzy import process

ronis_df = pd.read_csv("ronis_products.csv")
ronis_df.set_index("שם", inplace=True)
ronis_products_list = ronis_df.index.tolist()


# scrape from xxxxxx.co.il

paneco_base_url = "https://www.xxxxxx.co.il/"
categories = ["alcohol", "wine", "pack", "אביזרים", "בירה-2", "משקאות-אנרגיה"]


def scrape_category_page(category, page_number):
    url = f"{paneco_base_url}{category}?p={page_number}"
    response = requests.get(url)
    time.sleep(3)
    soup = BeautifulSoup(response.text, "html.parser")
    items = soup.find_all("li", class_="item product product-item")

    product_data = {}
    for item in items:
        name = item.find("a", class_="product-item-link").getText().strip()
        price = re.sub(r'\xa0', '', re.sub(r'\u200F', '', f"₪ "
        f"{item.find('span', class_='price-wrapper').find('span', class_='price').getText().strip()}"))

        if item.find('div', class_='value'):
            units = item.find('div', class_='value').getText()
            name = f"{name} {units}"

        # Check for an existing key before adding.
        if name not in product_data:
            product_data[name] = {}
        product_data[name][paneco_base_url] = price
    return product_data


master_product_data = {}
for category in categories:
    page_number = 1
    while True:
        category_data = scrape_category_page(category, page_number)
        if not category_data:
            break

        master_product_data.update(category_data)
        page_number += 1
print(f"finished scraping: {xxxxxx_base_url}")
# scrape from xxxxxx.co.il

wineroute_base_url = "https://www.xxxxxx.co.il/"


def scrape_wineroute_page(wineroute_page_number):
    url = f"{wineroute_base_url}search?keyword=&page={wineroute_page_number}"
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    time.sleep(2)
    wineroute_items = soup.find_all("div", class_="saleitem col-xs-6 col-sm-3")

    product_data = {}
    for item in wineroute_items:
        # to avoid crashing if an element is missing
        try:
            name = item.find("h3", class_="name").getText().strip()
        except AttributeError:
            continue
        try:
            price = item.find("div", class_="price red").getText().strip()
        except AttributeError:
            price = "N/A"

        if name not in product_data:
            product_data[name] = {}
        product_data[name][wineroute_base_url] = price
    return product_data


wineroute_page_number = 1
while True:
    page_data = scrape_wineroute_page(wineroute_page_number)
    if not page_data:
        break
    master_product_data.update(page_data)
    wineroute_page_number += 1

print(f"finished scraping: {xxxxxx_base_url}")
the_importer_base_url = "https://www.xxxxxx.co.il/"


def scrape_importer_page(importer_page_number):
    url = f"{the_importer_base_url}catalogsearch/result/?q='+'&p={importer_page_number}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
    except requests.exceptions.RequestException as e:
        print(f"Error encountered while scraping page {importer_page_number}: {e}")
        return {}, True

    time.sleep(2)
    importer_items = soup.find_all("div",
                        class_="product-info flex flex-col grow max-md:pl-[22px] pr-0 md:pr-4 md:pl-4 items-center")
    product_data = {}
    for item in importer_items:
        name = (item.find("a", class_="product-item-link leading-[1.35rem] max-md:pr-11 max-md:min-h-[41px]").
                getText().strip())
        price = re.sub(r'\xa0', '', re.sub(r'\u200F', '', item.find("span", class_="price").
                                           getText().strip()))
        if name not in product_data:
            product_data[name] = {}
        product_data[name][the_importer_base_url] = price
    return product_data, False


importer_page_number = 1
while True:
    page_data, page_connection_error = scrape_importer_page(importer_page_number)
    if page_connection_error:
        print("Connection error. Retrying...")
        time.sleep(10)
        continue
    if not page_data:
        break
    master_product_data.update(page_data)
    importer_page_number += 1

print(f"finished scraping: {the_xxxxxx_base_url}")
# scrape from https://api.xxxxxx.co.il/api/products?id by api
def scrape_from_haturki_api():
    haturki_url = "https://api.haturki-drinks.co.il/api/products?id"
    response = requests.get(haturki_url)

    if response.status_code != 200:
        print(f"Error: {response.status_code} {response.reason}")
        return {}

    data = json.loads(response.text)
    result = data["products"]

    product_data = {}
    for item in result:
        name = item['name']
        price = item['price']
        if name not in product_data:
            product_data[name] = {}
        product_data[name][haturki_url] = price
    return product_data


website_data = scrape_from_haturki_api()
master_product_data.update(website_data)
print(f"finished scraping: xxxxxxx")
# scrape mashkaot.co.il
mashkaot_base_url = "https://www.xxxxxxx.co.il/"


def scrape_mashkaot():
    user_agent = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0"
    headers = {"User-Agent": user_agent}
    url = f"{mashkaot_base_url}index.php?dir=site&page=catalog&op=search&q=''"
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
    except requests.exceptions.RequestException as e:
        print(f"Error encountered while scraping {mashkaot_base_url}: {e}")
        return {}, True
    items = soup.find_all("div", class_="product-box__desc product-box-info")
    product_data = {}
    for item in items:
        name = item.find("div", class_="product-box-info__title").getText().strip()
        price = item.find("span", class_="product-box-info__price-new").getText().strip()
        if name not in product_data:
            product_data[name] = {}
        product_data[name][mashkaot_base_url] = price
    return product_data, False


page_data, page_connection_error = scrape_mashkaot()
if page_connection_error:
    print(f"connection error, skipping {mashkaot_base_url} and continuing")
elif page_data:
    master_product_data.update(page_data)

print(f"finished scraping: {xxxxxxx_base_url}")
def parse_sitemap(sitemap_urls):
    """Extract product URLs from XML sitemaps."""
    final_urls = []
    for sitemap in sitemap_urls:
        response = requests.get(sitemap)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'xml')  # parse as XML (requires lxml)
        # collect the 'loc' tag of every 'url' entry
        for url in soup.find_all('url'):
            final_urls.append(url.findNext('loc').getText())
    return final_urls

# scrape from wine-direct


sitemap_urls = ['https://xxxxxxx.co.il/product-sitemap.xml', 'https://xxxxxxx.co.il/product-sitemap2.xml']
extracted_urls = parse_sitemap(sitemap_urls)


def scrape_wine_direct(extracted_urls):
    product_data = {}
    for url in extracted_urls:
        if 'product/' in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            name = soup.find("div", class_="elementor-jet-single-title jet-woo-builder").getText().strip()
            price = soup.find("span", class_="woocommerce-Price-amount amount").getText().strip()

            if name not in product_data:
                product_data[name] = {}
            product_data[name]["https://xxxxxxx.co.il/"] = price

    return product_data


page_data = scrape_wine_direct(extracted_urls)
master_product_data.update(page_data)
print(f"finished scraping: xxxxxxx")

xxxxxx_sitemap_urls = ['https://xxxxxxx.co.il/product-sitemap.xml',
                       'https://xxxxxx.co.il/product-sitemap2.xml']
xxxxxx_extracted_urls = parse_sitemap(xxxxxx_sitemap_urls)

# scrape from xxxxxxxx


def scrape_xxxxxx(xxxxxx_extracted_urls):
    product_data = {}
    for url in xxxxxx_extracted_urls:
        if 'product/%' in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.text, "html.parser")
            name = (soup.find("h1", class_="product_title entry-title elementor-heading-title elementor-size-default").
                    getText().strip())
            price = soup.find("p", class_="price").getText().strip()

            if name not in product_data:
                product_data[name] = {}
            product_data[name]["https://xxxxxxx.co.il/"] = price

    return product_data


page_data = scrape_xxxxxx(xxxxxx_extracted_urls)
master_product_data.update(page_data)
print(f"finished scraping: xxxxxxxx")


def process_data(master_product_data):
    """Processes the product data and creates a restructured DataFrame.

    Args:
        master_product_data (dict): The master product data dictionary.

    Returns:
        pd.DataFrame: A DataFrame with products as rows, websites as columns, and prices as values.
    """

    cleaned_data = []
    for product, prices in master_product_data.items():
        cleaned_name = clean_hebrew_name(product)
        print(f"inside process_data after the running of clean_hebrew_name, output: {cleaned_name}")
        row = {'Product Name': cleaned_name}
        for website, price in prices.items():
            row[website] = price
        cleaned_data.append(row)

    df = pd.DataFrame(cleaned_data)
    df = df.set_index('Product Name')

    return df


def clean_hebrew_name(name):
    """Cleans a Hebrew product name by removing specific patterns.

    Args:
        name (str): The Hebrew product name to clean.

    Returns:
        str: The cleaned Hebrew product name.
    """
    print("clean_hebrew_name started")

    # Remove specific patterns (adjust as needed)
    patterns = [
        "עד הבית ...", "*מהדורה מיוחדת*","(טן)", "זמין לקניה במרווחים של 6", "*מהדורה 23*", "*מהדורה 24*"
        "(ראיין ריינולדס)", "*מחיר מבצע מוגבל ל- 12 יחידות ללקוחות רשומים בלבד",
        "לקוקטיילים", "[יבוא מקביל]", "-", "(GINARTE)", "[ בקבוק קרמיקה לאספנים ]","בק' קרמיקה",
        "*מוגבל לזוג בק' ללקו..", "*מוגבל לקניה של בקבוק אחד ללקוח רשום","*מחיר מבצע מוגבל ל 6 יחידות ללקוחות רשומים בלבד" ,
        "+2 כוסות במארז מתנה", "*לא למכירה בבודדים*", "(משחקי הכס)"," +כוס מתנה",
        "*לא למכירה בבודד...", "*למכירה בארגזים ...", "מהדורת חורף מ...", " - למכירה בארגזים 24 בק ב...",
        " *מוגבל לקניה של בקבוק אחד ללקוח רשום", "!", "*נדיר*", "*מחיר מבצע מוגבל ל- 6 יחידות ללקוחות רשומים בלבד",
        " +כוס במארז מתנה", "*לאספנים*", "*מהדורה מוגבלת*", "*מהדורת אספנים*", "*לא למכירה ...", "*לא למכירה ב...",
         "* מחיר מבצע מוגבל ל 12 בקבוקים, לחברי מועדון בלבד. ", "*לא למכירה בבודדי..." , "*למכירה בארגזים בלבד...",
        "*מוגבל לזוג בק' ללקוח*", "מוגבל לבקבוק 1 ללקוח","* מחיר מבצע מוגבל ל- 12 בקבוקים, לחברי מועדון בלבד. ",
        " בארגזים 24 בק ב...",
    ]

    for pattern in patterns:
        if pattern in name:
            print(f"this is the pattern i am cleaning from: {pattern} from {name}")
            name = name.replace(pattern, "")
    # Replace multiple spaces with a single space
    name = re.sub(r"\s+", " ", name)
    name = re.sub(r'\xa0', '', re.sub(r'\u200F', '', name))
    print(f"this is the name after cleaning: {name}")
    return name


# Clean product names
df_cleaned_data = process_data(master_product_data)

for index in df_cleaned_data.index:
    print(f"{index} in df_cleaned_data.index")
    try:
        best_match, score = process.extractOne(index, ronis_products_list, scorer=process.fuzz.ratio)
        if index != best_match and score >= 90:
            df_cleaned_data.rename(index={index: best_match}, inplace=True)
    except TypeError:
        continue

# check for duplicates; the groupby below merges them, keeping the first non-null value per website
duplicates = df_cleaned_data.index.duplicated()
duplicate_index_names = df_cleaned_data[duplicates].index.tolist()

# making a new dataframe after the first fuzzy matching
merged_df = df_cleaned_data.groupby(df_cleaned_data.index).first().fillna('N/A')

# iterate over unchanged index that is not in roni's product list if applicable
iterated_index = []
for index in merged_df.index:

    iterated_index.append(index)

    try:
        if index not in ronis_products_list:

            filtered_index_list = [other_index for other_index in merged_df.index if other_index not in iterated_index]

            best_match, score = process.extractOne(index, filtered_index_list, scorer=process.fuzz.ratio)

            if score >= 90:
                merged_df.rename(index={index: best_match}, inplace=True)

    except TypeError:
        continue


# merge any duplicates created by the second fuzzy-matching pass
duplicates_of_merged = merged_df.index.duplicated()
duplicates_of_merged_index_names = merged_df[duplicates_of_merged].index.tolist()

# setting the final dataframe
final_df = merged_df.groupby(merged_df.index).first().fillna('N/A')

# saving the final dataframe to csv and excel
final_df.to_csv(f"{datetime.now().strftime('%d-%m-%Y')}.product_comparison.csv")
final_df.to_excel(f"{datetime.now().strftime('%d-%m-%Y')}.product_comparison.xlsx")

Total running time: approx. 4 hours.

To optimize your Python script for faster execution, here are several strategies you can implement:

1. Reduce requests and parsing overhead

  • Asynchronous requests: Use asynchronous requests with libraries like aiohttp or httpx to perform multiple HTTP requests concurrently, instead of sequentially.
  • Batch scraping: Instead of scraping individual pages one by one, consider batching them. For instance, scrape multiple pages in parallel if the website allows it.

Example with aiohttp:

import aiohttp
import asyncio

async def scrape_page(session, url):
    async with session.get(url) as response:
        return await response.text()

async def scrape_pages(base_url, categories):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for category in categories:
            for page in range(1, 6):  # assuming 5 pages per category; adjust to the real page count
                url = f"{base_url}{category}?p={page}"
                tasks.append(scrape_page(session, url))
        results = await asyncio.gather(*tasks)
        return results

# Run async scraping
results = asyncio.run(scrape_pages(paneco_base_url, categories))
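The gathered results are raw HTML strings, so the BeautifulSoup parsing still has to run afterwards. Below is a rough sketch of a hypothetical parse_category_html helper (my name, not part of your script) that reuses the selectors from your category scraper on each fetched page:

from bs4 import BeautifulSoup

def parse_category_html(html, base_url):
    # Pure-CPU parsing step, reusing the selectors from the category scraper above
    # (the units suffix handling is omitted here for brevity).
    soup = BeautifulSoup(html, "html.parser")
    product_data = {}
    for item in soup.find_all("li", class_="item product product-item"):
        name = item.find("a", class_="product-item-link").getText().strip()
        price = item.find("span", class_="price-wrapper").find("span", class_="price").getText().strip()
        product_data.setdefault(name, {})[base_url] = price
    return product_data

for html in results:
    master_product_data.update(parse_category_html(html, paneco_base_url))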

2. Optimize Fuzzy Matching

  • Leverage RapidFuzz: Replace fuzzywuzzy with RapidFuzz, which is much faster and optimized for string matching.
  • Limit comparison set size: For large datasets, reduce the number of comparisons by filtering candidates on the initial characters of product names (see the blocking sketch after the example below).

Example of switching to RapidFuzz:

from rapidfuzz import process, fuzz

# Example of faster fuzzy matching
best_match, score = process.extractOne(index, ronis_products_list, scorer=fuzz.ratio)
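To also act on the second bullet (limiting the comparison set), you can block candidates on a cheap key before fuzzy matching. This is only a sketch: match_product and the first-word blocking key are my own additions, and blocking on the first word will miss matches whose first word differs.

from collections import defaultdict
from rapidfuzz import process, fuzz

# Index the reference names by their first word (a cheap "blocking" key).
reference_by_first_word = defaultdict(list)
for ref_name in ronis_products_list:
    if not isinstance(ref_name, str):
        continue
    tokens = ref_name.split()
    reference_by_first_word[tokens[0] if tokens else ""].append(ref_name)

def match_product(name, score_cutoff=90):
    tokens = name.split()
    candidates = reference_by_first_word.get(tokens[0] if tokens else "", [])
    # extractOne returns None when no candidate reaches score_cutoff
    result = process.extractOne(name, candidates, scorer=fuzz.ratio, score_cutoff=score_cutoff)
    return result[0] if result else None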

3. DataFrame Optimizations

  • Deduplicate in a single pass: the script currently calls .groupby(...).first() inside loops over the duplicate index names and discards the result; one groupby over the whole DataFrame does the merging (see the sketch below).
  • Avoid multiple fillna() calls: Instead of chaining .fillna() repeatedly, fill all missing values in a single step.
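A minimal sketch of that single-pass merge, assuming df_cleaned_data is indexed by 'Product Name' as in your process_data:

# Group rows that share a product name, keep the first non-null price per
# website column, then fill every remaining gap in one fillna call.
merged_df = (
    df_cleaned_data
    .groupby(level=0, sort=False)
    .first()
    .fillna('N/A')
)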

4. Parallelize Data Processing

  • multiprocessing or concurrent.futures: Use Python’s multiprocessing library to parallelize heavy tasks like data cleaning and fuzzy matching.

Example with concurrent.futures:

import concurrent.futures

def clean_data_in_parallel(data_chunk):
    cleaned_data = []
    for product, prices in data_chunk.items():
        cleaned_name = clean_hebrew_name(product)
        row = {'Product Name': cleaned_name, **prices}
        cleaned_data.append(row)
    return cleaned_data

if __name__ == "__main__":  # guard needed so worker processes can import this module without re-running the scraping
    items = list(master_product_data.items())
    chunks = [dict(items[i:i + 1000]) for i in range(0, len(items), 1000)]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        chunk_results = executor.map(clean_data_in_parallel, chunks)
    # flatten the per-chunk results back into a single list of rows
    cleaned_rows = [row for chunk in chunk_results for row in chunk]

5. Caching Results

  • Cache previous requests: Use a cache to avoid re-scraping identical pages when rerunning the script. requests_cache is a useful library for this purpose.
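A minimal sketch with requests_cache; the cache name and six-hour expiry are arbitrary values, tune them to how often the prices actually change:

import requests_cache

# Transparently cache every requests.get() call the scrapers already make;
# re-running the script only re-downloads pages whose cached copy has expired.
requests_cache.install_cache("product_scrape_cache", expire_after=6 * 60 * 60)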

6. Optimize Regular Expressions

  • Regular expressions can be slow for large text operations. Consider using compiled regular expressions or simpler string manipulations where possible.
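For example, the nested re.sub calls in clean_hebrew_name can be compiled once at module level; normalize_name below is a hypothetical helper, not something already in your script:

import re

# Compile once instead of re-parsing the patterns on every product name.
INVISIBLE_CHARS = re.compile(r"[\u200F\xa0]")  # RTL mark and non-breaking space
MULTI_SPACE = re.compile(r"\s+")

def normalize_name(name):
    name = INVISIBLE_CHARS.sub("", name)   # one pass instead of two nested re.sub calls
    return MULTI_SPACE.sub(" ", name).strip()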

7. Profile the Script

  • Use cProfile or line_profiler to identify the most time-consuming parts of your code, and focus your optimization efforts there.
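A quick sketch (assuming Python 3.8+ for the Profile context manager) that profiles just the processing stage and prints the 20 most expensive calls by cumulative time:

import cProfile
import pstats

# Scraping time is dominated by network I/O, so profile the processing step only.
with cProfile.Profile() as profiler:
    df_cleaned_data = process_data(master_product_data)

pstats.Stats(profiler).sort_stats(pstats.SortKey.CUMULATIVE).print_stats(20)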