I have a Python script that scrapes product data from multiple websites and performs various processing steps before saving the results as a comparison table (CSV and Excel). However, the data processing part is taking a long time, especially for large datasets. I’m looking for ways to optimize the script for faster execution.
Here’s a breakdown of the processing steps:
Cleaning product names: This involves removing unnecessary patterns from Hebrew product names using regular expressions and string manipulation.
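For reference, the cleaning step boils down to this (a simplified sketch of the clean_hebrew_name function from the full code below; the real pattern list has around forty entries, and the helper name clean_name_sketch is only for illustration):

import re

def clean_name_sketch(name, patterns):
    # Strip each known "noise" substring, then normalize whitespace
    # and remove RTL marks / non-breaking spaces.
    for pattern in patterns:
        name = name.replace(pattern, "")
    name = re.sub(r"\s+", " ", name)
    return re.sub(r"[\u200F\xa0]", "", name)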
Fuzzy matching: Product names are compared against a reference list using fuzzywuzzy to identify potential matches, especially for slightly different product descriptions.
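The matching step is essentially one extractOne call per scraped product, using fuzz.ratio and a score threshold of 90 (a simplified sketch of the matching loop in the full code below; match_name_sketch is only an illustrative name):

from fuzzywuzzy import fuzz, process

def match_name_sketch(name, reference_names, threshold=90):
    # One call scans the entire reference list, so the cost grows with
    # len(reference_names) for every scraped product.
    best_match, score = process.extractOne(name, reference_names, scorer=fuzz.ratio)
    return best_match if score >= threshold else name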
Duplicate checking and merging: Duplicate product names are identified and merged, keeping the most complete information from each website.
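The duplicate merge is essentially a single groupby on the cleaned names (taken from the full code below); first() keeps the first non-null price per website column:

merged_df = df_cleaned_data.groupby(df_cleaned_data.index).first().fillna('N/A')

The full code: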
from bs4 import BeautifulSoup
import pandas as pd
import requests
import re
import time
import unicodedata
import json
import lxml
from datetime import datetime
from fuzzywuzzy import process
ronis_df = pd.read_csv("ronis_products.csv")
ronis_df.set_index("שם", inplace=True)
ronis_products_list = ronis_df.index.tolist()
# scrape from xxxxxx.co.il
paneco_base_url = "https://www.xxxxxx.co.il/"
categories = ["alcohol", "wine", "pack", "אביזרים", "בירה-2", "משקאות-אנרגיה"]
def scrape_catagory_page(category, page_number):
    url = f"{paneco_base_url}{category}?p={page_number}"
    response = requests.get(url)
    time.sleep(3)
    soup = BeautifulSoup(response.text, "html.parser")
    items = soup.find_all("li", class_="item product product-item")
    product_data = {}
    for item in items:
        name = item.find("a", class_="product-item-link").getText().strip()
        price_text = item.find('span', class_='price-wrapper').find('span', class_='price').getText().strip()
        price = re.sub(r"[\u200F\xa0]", "", f"₪ {price_text}")
        if item.find('div', class_='value'):
            units = item.find('div', class_='value').getText()
            name = f"{name} {units}"
        # Check for an existing key before adding.
        if name not in product_data:
            product_data[name] = {}
        product_data[name][paneco_base_url] = price
    return product_data
master_product_data = {}
for category in categories:
    page_number = 1
    while True:
        category_data = scrape_catagory_page(category, page_number)
        if not category_data:
            break
        master_product_data.update(category_data)
        page_number += 1
print(f"finished scraping: {paneco_base_url}")
# scrape from xxxxxx.co.il
wineroute_base_url = "https://www.xxxxxx.co.il/"
def scrape_wineroute_page(wineroute_page_number):
    url = f"{wineroute_base_url}search?keyword=&page={wineroute_page_number}"
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    time.sleep(2)
    wineroute_items = soup.find_all("div", class_="saleitem col-xs-6 col-sm-3")
    product_data = {}
    for item in wineroute_items:
        try:
            name = item.find("h3", class_="name").getText().strip()
            price = item.find("div", class_="price red").getText().strip()
        # Avoid crashing if the price element is not there.
        except AttributeError:
            price = "N/A"
        if name not in product_data:
            product_data[name] = {}
        product_data[name][wineroute_base_url] = price
    return product_data
wineroute_page_number = 1
while True:
    page_data = scrape_wineroute_page(wineroute_page_number)
    if not page_data:
        break
    master_product_data.update(page_data)
    wineroute_page_number += 1
print(f"finished scraping: {wineroute_base_url}")
the_importer_base_url = "https://www.xxxxxx.co.il/"
def scrape_importer_page(importer_page_number):
    url = f"{the_importer_base_url}catalogsearch/result/?q='+'&p={importer_page_number}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
    except requests.exceptions.RequestException as e:
        print(f"Error encountered while scraping page {importer_page_number}: {e}")
        return {}, True
    time.sleep(2)
    importer_items = soup.find_all(
        "div",
        class_="product-info flex flex-col grow max-md:pl-[22px] pr-0 md:pr-4 md:pl-4 items-center")
    product_data = {}
    for item in importer_items:
        name = item.find(
            "a",
            class_="product-item-link leading-[1.35rem] max-md:pr-11 max-md:min-h-[41px]").getText().strip()
        price = re.sub(r"[\u200F\xa0]", "", item.find("span", class_="price").getText().strip())
        if name not in product_data:
            product_data[name] = {}
        product_data[name][the_importer_base_url] = price
    return product_data, False
importer_page_number = 1
while True:
    page_data, page_connection_error = scrape_importer_page(importer_page_number)
    if page_connection_error:
        print("Connection error. Retrying...")
        time.sleep(10)
        continue
    if not page_data:
        break
    master_product_data.update(page_data)
    importer_page_number += 1
print(f"finished scraping: {the_importer_base_url}")
# scrape from https://api.xxxxxx.co.il/api/products?id via the API
def scrape_from_haturki_api():
    haturki_url = "https://api.haturki-drinks.co.il/api/products?id"
    response = requests.get(haturki_url)
    if response.status_code != 200:
        print(f"Error: {response.status_code} {response.reason}")
        return {}  # avoid parsing a failed response
    data = json.loads(response.text)
    result = data["products"]
    product_data = {}
    for item in result:
        name = item['name']
        price = item['price']
        if name not in product_data:
            product_data[name] = {}
        product_data[name][haturki_url] = price
    return product_data
website_data = scrape_from_haturki_api()
master_product_data.update(website_data)
print("finished scraping: xxxxxxx")
# scrape mashkaot.co.il
mashkaot_base_url = "https://www.xxxxxxx.co.il/"
def scrape_mashkaot():
    user_agent = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0"
    headers = {"User-Agent": user_agent}
    url = f"{mashkaot_base_url}index.php?dir=site&page=catalog&op=search&q=''"
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
    except requests.exceptions.RequestException as e:
        print(f"Error encountered while scraping {mashkaot_base_url}: {e}")
        return {}, True
    items = soup.find_all("div", class_="product-box__desc product-box-info")
    product_data = {}
    for item in items:
        name = item.find("div", class_="product-box-info__title").getText().strip()
        price = item.find("span", class_="product-box-info__price-new").getText().strip()
        if name not in product_data:
            product_data[name] = {}
        product_data[name][mashkaot_base_url] = price
    return product_data, False
page_data, page_connection_error = scrape_mashkaot()
if page_connection_error:
    print(f"Connection error, skipping {mashkaot_base_url} and continuing.")
elif page_data:
    master_product_data.update(page_data)
print(f"finished scraping: {mashkaot_base_url}")
def parse_sitemap(sitemap_urls):
    """Extract product URLs from the sitemap index pages."""
    final_urls = []
    for sitemap in sitemap_urls:
        response = requests.get(sitemap)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'xml')  # Parse as XML
        # Find all 'loc' tags within 'url' tags
        for url in soup.find_all('url'):
            final_urls.append(url.findNext('loc').getText())
    return final_urls
# scrape from wine-direct
sitemap_urls = ['https://xxxxxxx.co.il/product-sitemap.xml', 'https://xxxxxxx.co.il/product-sitemap2.xml']
extracted_urls = parse_sitemap(sitemap_urls)
def scrape_wine_direct(extracted_urls):
    product_data = {}
    for url in extracted_urls:
        if 'product/' in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            name = soup.find("div", class_="elementor-jet-single-title jet-woo-builder").getText().strip()
            price = soup.find("span", class_="woocommerce-Price-amount amount").getText().strip()
            if name not in product_data:
                product_data[name] = {}
            product_data[name]["https://xxxxxxx.co.il/"] = price
    return product_data
page_data = scrape_wine_direct(extracted_urls)
master_product_data.update(page_data)
print("finished scraping: xxxxxxx")
xxxxxx_sitemap_urls = ['https://xxxxxxx.co.il/product-sitemap.xml',
                       'https://xxxxxx.co.il/product-sitemap2.xml']
xxxxxx_extracted_urls = parse_sitemap(xxxxxx_sitemap_urls)
# scrape from xxxxxxxx
def scrape_xxxxxx(xxxxxx_extracted_urls):
    product_data = {}
    for url in xxxxxx_extracted_urls:
        if 'product/%' in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.text, "html.parser")
            name = soup.find("h1", class_="product_title entry-title elementor-heading-title elementor-size-default").getText().strip()
            price = soup.find("p", class_="price").getText().strip()
            if name not in product_data:
                product_data[name] = {}
            product_data[name]["https://xxxxxxx.co.il/"] = price
    return product_data
page_data = scrape_xxxxxx(xxxxxx_extracted_urls)
master_product_data.update(page_data)
print("finished scraping: xxxxxxxx")
def process_data(master_product_data):
    """Processes the product data and creates a restructured DataFrame.

    Args:
        master_product_data (dict): The master product data dictionary.

    Returns:
        pd.DataFrame: A DataFrame with products as rows, websites as columns, and prices as values.
    """
    cleaned_data = []
    for product, prices in master_product_data.items():
        cleaned_name = clean_hebrew_name(product)
        print(f"inside process_data after the running of clean_hebrew_name, output: {cleaned_name}")
        row = {'Product Name': cleaned_name}
        for website, price in prices.items():
            row[website] = price
        cleaned_data.append(row)
    df = pd.DataFrame(cleaned_data)
    df = df.set_index('Product Name')
    return df
def clean_hebrew_name(name):
    """Cleans a Hebrew product name by removing specific patterns.

    Args:
        name (str): The Hebrew product name to clean.

    Returns:
        str: The cleaned Hebrew product name.
    """
    print("clean_hebrew_name started")
    # Remove specific patterns (adjust as needed)
    patterns = [
    "עד הבית ...", "*מהדורה מיוחדת*", "(טן)", "זמין לקניה במרווחים של 6", "*מהדורה 23*", "*מהדורה 24*",
    "(ראיין ריינולדס)", "*מחיר מבצע מוגבל ל- 12 יחידות ללקוחות רשומים בלבד",
"לקוקטיילים", "[יבוא מקביל]", "-", "(GINARTE)", "[ בקבוק קרמיקה לאספנים ]","בק' קרמיקה",
"*מוגבל לזוג בק' ללקו..", "*מוגבל לקניה של בקבוק אחד ללקוח רשום","*מחיר מבצע מוגבל ל 6 יחידות ללקוחות רשומים בלבד" ,
"+2 כוסות במארז מתנה", "*לא למכירה בבודדים*", "(משחקי הכס)"," +כוס מתנה",
"*לא למכירה בבודד...", "*למכירה בארגזים ...", "מהדורת חורף מ...", " - למכירה בארגזים 24 בק ב...",
" *מוגבל לקניה של בקבוק אחד ללקוח רשום", "!", "*נדיר*", "*מחיר מבצע מוגבל ל- 6 יחידות ללקוחות רשומים בלבד",
" +כוס במארז מתנה", "*לאספנים*", "*מהדורה מוגבלת*", "*מהדורת אספנים*", "*לא למכירה ...", "*לא למכירה ב...",
"* מחיר מבצע מוגבל ל 12 בקבוקים, לחברי מועדון בלבד. ", "*לא למכירה בבודדי..." , "*למכירה בארגזים בלבד...",
"*מוגבל לזוג בק' ללקוח*", "מוגבל לבקבוק 1 ללקוח","* מחיר מבצע מוגבל ל- 12 בקבוקים, לחברי מועדון בלבד. ",
" בארגזים 24 בק ב...",
    ]
    for pattern in patterns:
        if pattern in name:
            print(f"this is the pattern i am cleaning from: {pattern} from {name}")
            name = name.replace(pattern, "")
    # Replace multiple spaces with a single space
    name = re.sub(r"\s+", " ", name)
    # Strip RTL marks and non-breaking spaces (the result must be reassigned to take effect)
    name = re.sub(r"[\u200F\xa0]", "", name)
    print(f"this is the name after cleaning: {name}")
    return name
# Clean product names
df_cleaned_data = process_data(master_product_data)
# Fuzzy-match each cleaned name against the reference list and rename close matches.
for index in df_cleaned_data.index:
    print(f"{index} in df_cleaned_data.index")
    try:
        best_match, score = process.extractOne(index, ronis_products_list, scorer=process.fuzz.ratio)
        if score >= 90:
            df_cleaned_data.rename(index={index: best_match}, inplace=True)
    except TypeError:
        continue
# Merge duplicates after the first fuzzy matching pass: the groupby keeps the first
# non-null value per column, so no separate per-duplicate loop is needed.
merged_df = df_cleaned_data.groupby(df_cleaned_data.index).first().fillna('N/A')
# Iterate over index names that are not in roni's product list and merge near-duplicates across sites.
iterated_index = []
for index in merged_df.index:
    iterated_index.append(index)
    try:
        if index not in ronis_products_list:
            filtered_index_list = [other_index for other_index in merged_df.index if other_index not in iterated_index]
            best_match, score = process.extractOne(index, filtered_index_list, scorer=process.fuzz.ratio)
            if score >= 90:
                merged_df.rename(index={index: best_match}, inplace=True)
    except TypeError:
        continue
# Merge duplicates again after the second fuzzy matching pass to set the final dataframe.
final_df = merged_df.groupby(merged_df.index).first().fillna('N/A')
# saving the final dataframe to csv and excel
final_df.to_csv(f"{datetime.now().strftime('%d-%m-%Y')}.product_comparison.csv")
final_df.to_excel(f"{datetime.now().strftime('%d-%m-%Y')}.product_comparison.xlsx")
Total running time: approximately 4 hours.