
How to Scrape Amazon Best Sellers in Electronics - Aug 2023

If you are interested in e-commerce, then Amazon is the jackpot of e-commerce data. As with most websites, Amazon does not provide API access to its data, which means you're left to figure out the best way to scrape the data you need. Today I wanted the best-selling items in the Electronics category for August 2023. I'm happy to report it wasn't that hard to scrape!

Unlike with some heavily protected websites, I didn't need to analyze extensive network request data, I didn't need Selenium, and I didn't need a proxy to get the original data. I did, however, need to rate limit my requests; Amazon is big on request throttling.
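
Rate limiting in the script below is handled by aiolimiter. As a minimal, standalone sketch of the idea (the two-requests-per-minute figure is arbitrary and only for illustration), AsyncLimiter grants a fixed budget of acquisitions per time window and suspends any coroutine that exceeds it:

# rate_limit_sketch.py - illustrative only
import asyncio
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(2, 60)  # at most 2 acquisitions every 60 seconds

async def fetch(i):
    async with limiter:  # suspends here once the window's budget is spent
        print(f"request {i} allowed")

async def main():
    await asyncio.gather(*(fetch(i) for i in range(5)))

asyncio.run(main())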

Below is the code I used to scrape the site. Let me know what you think.

Code Snippet

# amazon.py
from pythonjsonlogger import jsonlogger
from aiolimiter import AsyncLimiter
from urllib.parse import urlparse
import asyncio
import aiohttp
import logging
import time
import random
import aiofiles
import typer
from typing_extensions import Annotated
from pathlib import Path
import os
from bs4 import BeautifulSoup

# Configures a json style logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)

async def HTTPClientDownloader(url, settings):
    max_tcp_connections = settings['max_tcp_connections']

    # uses the rate limiter
    async with settings["rate"]:

        # open a session to make the requests
        connector = aiohttp.TCPConnector(limit=max_tcp_connections)
        async with aiohttp.ClientSession(connector=connector) as session:
            start_time = time.perf_counter()  # Start timer

            proxy = None
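            # note: the "proxies" list defined in settings below is never used here,
            # so these requests go out directly with proxy left as None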
            html = None

            # makes a GET request to the target website
            async with session.get(url, proxy=proxy, headers=settings['headers']) as response:
                html = await response.text()
                end_time = time.perf_counter()  # Stop timer
                elapsed_time = end_time - start_time  # Calculate time taken to get response
                status = response.status

                logger.info(
                    msg="Request complete.",
                    extra={
                        "status": status,
                        "url": url,
                        "elapsed_time": f"{elapsed_time:4f}",
                    }
                )

                # save the html in a cache folder. We want this here so that if we replay
                # the code we can fetch from the local cache instead of hitting the
                # server every time.
                loc = os.path.join(settings['cache_dir'], settings["output_path"])
                async with aiofiles.open(loc, mode="w") as fd:
                    await fd.write(html)

async def dispatch(url, settings):
    await HTTPClientDownloader(url, settings)

# the location of where our async tasks are created and invoked
async def main(start_urls, settings):
    tasks = []
    for url in start_urls:
        task = asyncio.create_task(dispatch(url, settings))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    print(f"total requests", len(results))


# a cli interface to make the program user friendly
cli_app = typer.Typer()
@cli_app.command("amazon")
def amazon(
    url: Annotated[str, typer.Option("--url", "-u", help="url")],
    out: Annotated[str, typer.Option("--out", "-o", help="output path and file name")],
    use_cache: Annotated[bool, typer.Option(help="Read from the cached version of the page")] = False,
    max_tcp_connections: Annotated[int, typer.Option("--max-tcp-conn", help="max tcp connections")] = 1,
    rate: Annotated[int, typer.Option(help="number of requests per minute")] = 1,
):
    def read_from_cache(file_path):
        html_content = None
        with open(file_path, "r") as file:
            html_content = file.read()
            print("Fetching from cache")
        return html_content    


    # cache procedures
    host = urlparse(url).hostname
    directory = "cache"
    current_directory = Path.cwd()
    cache_dir = current_directory / directory / host
    cached_file = Path(cache_dir / out)


    user_agents = [
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36' # works!
    ]
    user_agent = random.choice(user_agents)
    settings = {
        "max_tcp_connections": max_tcp_connections,
        "proxies": [
            "http://localhost:8765",
        ],

        "headers": {
            'user-agent': user_agent,
            'accept-language': 'en',
            'accept-encoding': 'gzip, deflate',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
        },
        "cache_dir": cache_dir,
        "output_path": out,
        "rate": AsyncLimiter(rate, 60), # 10 reqs/min
    }

    # make sure the cache directory exists
    if not cache_dir.exists():
        cache_dir.mkdir(parents=True)

    # get the resulting HTML from the cache or make a GET request
    html = None
    if use_cache:
        html = read_from_cache(cached_file)
    else:
        # use the asyncio runtime to make a request
        asyncio.run(main([url], settings))
        # read the results from the cache folder
        html = read_from_cache(cached_file)

    # once you have the HTML you can parse the document to your liking
    # from here you can parse for the data you want
    soup = BeautifulSoup(html, 'html.parser')
    # the best seller items currently use this ID, though it could change in the future
    items = soup.find_all("div", attrs={"id":'gridItemRoot'})
    print(items)


if __name__ == '__main__':
    cli_app()
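
print(items) dumps the raw gridItemRoot divs. If you want to drill into each block, a sketch like the one below could pull out the first product link and its visible text from the cached HTML (the path assumes the output file name used in the command below). The selectors are assumptions on my part; Amazon changes its markup often, so inspect the saved page before relying on them.

# parse_sketch.py - illustrative only; run after the cache file exists
from bs4 import BeautifulSoup

with open("cache/www.amazon.com/best-sellers-electronics.html") as fd:
    soup = BeautifulSoup(fd.read(), "html.parser")

for item in soup.find_all("div", attrs={"id": "gridItemRoot"}):
    link = item.find("a", href=True)  # first anchor in the card (assumption)
    if link:
        print(link["href"])
        print(link.get_text(strip=True))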

Use the following command to run the CLI script and fetch the Amazon page.

python amazon.py -u https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics/ref=zg_bs_nav_0 -o best-sellers-electronics.html
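
Once that first run has populated the cache folder, the --use-cache flag (Typer generates it from the use_cache option in the code above) re-runs the parsing step against the saved HTML without sending another request to Amazon:

python amazon.py -u https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics/ref=zg_bs_nav_0 -o best-sellers-electronics.html --use-cache

If you do need to fetch more pages per run, the --rate option raises the requests-per-minute budget; keep it modest, since throttling is the one thing Amazon actively enforces.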

About Me

I'm Steven Natera, a software engineer in love with all things web scraping and distributed systems. I worked at Twitter as an SRE, migrating 500,000 bare-metal servers from Aurora Mesos to Kubernetes. Now I spend my time writing web scrapers to scrape data at scale.

Contact me on Twitter @stevennatera if you want to say hello.

Web Scraping For Beginners Course

The art of web scraping takes years to master. But if you need data today, practicing for years is out of the question. The good news is I've created a short course to help you learn the essentials of web scraping so you can get your data fast without spending a lot of money or wasting your precious time.

In the course you'll learn:

  • how to analyze a website to determine the best way to scrape data
  • how to use proxies to bypass anti-bot protection (e.g., Cloudflare)
  • how to scrape websites with JavaScript
  • where to store your data
  • ... and more

For the complete table of contents, click here to learn more.