Multithreading in Python Scraping

Hello, I will describe how to implement threading in your Python scraping scripts.

In scraping, we use threading or async to fetch multiple pages simultaneously through proxies. Assume we have 100 proxies and we will use all of them: simply put, we can scrape roughly 100x faster. Let’s start coding.

I assume we have already crawled the site, collected all the links we will scrape data from, and stored them in a list called company_slugs_list.

import threading
import queue
import json
import csv
from requests.exceptions import ProxyError
from r_funcs import get_page, get_proxy
max_num_worker_threads = 100  # number of proxies
company_slugs_list = [ .. , .. , ...  ]

threads = []

Let’s create our normal functions and scraping functions. The parsed JSON response will be passed to the function as the `company` parameter, and we will fetch the details from it and store them in the output file.

def get_company_details(company: dict, link: str = None, proxy=None):
    '''Fetch company details from the JSON response'''
    id = company.get('id', None)
    company_name = company.get('name', None)
    company_logo = company.get('logo', None)
    year_founded = company.get('year_founded', None)
    # ...
    # ...
    description = company.get('description', None)
    slogan = company.get('headline', None)

    row = [id, company_name, company_logo, slogan, description, year_founded]

    with open('output.csv', mode='a+', newline='', encoding='utf-32') as company_file:
        company_writer = csv.writer(company_file, quotechar='"')
        company_writer.writerow(row)
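For illustration, here is a hypothetical payload with the fields the function reads (the field names match the .get() calls above; the exact shape of the real API response is an assumption here):

# Hypothetical example payload -- the real API response shape is assumed, not shown in this post
sample_company = {
    "id": 42,
    "name": "Acme PropTech",
    "logo": "https://example.com/logo.png",
    "headline": "Buildings, but smarter",
    "description": "A fictional company used only for illustration.",
    "year_founded": 2015,
}
get_company_details(sample_company)  # appends one row to output.csv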

Then we create another function that sends a request through the given proxy and gets a response. We parse the response as JSON and pass it to get_company_details().

def do_work_company(endpoint, appointed_proxy):
    '''Sends requests with given proxy and extracts details from the JSON response'''
    proxies = {
        "http": f"http://username:pass@{appointed_proxy}",
        "https": f"http://username:pass@{appointed_proxy}"
    }

    r = get_page(page_link=endpoint, proxies=proxies, parameters={})
    try:
        company = r.json()
    except json.decoder.JSONDecodeError as json_error:
        with open('links_error_log.csv', mode='a+', newline='') as links_file:
            link_writer = csv.writer(links_file, delimiter=',', quotechar='"')
            link_writer.writerow([endpoint])
        company = {}
        print("Error:", json_error, '/', r.status_code, '/', str(appointed_proxy))

    get_company_details(company, link=endpoint, proxy=proxies)
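The helpers get_page and get_proxy come from r_funcs, which is not shown in this post. Here is a minimal sketch of what they might look like, assuming get_page is a thin wrapper around requests.get and get_proxy simply indexes into a static proxy list:

# Hypothetical sketch of r_funcs.py -- the real module is not shown here
import requests

PROXY_LIST = ["1.2.3.4:8080", "5.6.7.8:8080"]  # placeholder host:port entries


def get_proxy(idx):
    '''Return the proxy appointed to this slot, cycling through the list.'''
    return PROXY_LIST[idx % len(PROXY_LIST)]


def get_page(page_link, proxies, parameters):
    '''Thin wrapper around requests.get that routes the request through the given proxy.'''
    return requests.get(page_link, proxies=proxies, params=parameters, timeout=30)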

Our functions are ready to work concurrently, so we can create the worker function now. It runs an infinite loop that gets an endpoint and its appointed proxy from the queue and passes them to the function above.

def worker_company():
    '''Worker function to scrape details of each company'''
    while True:
        item = q.get()
        if item is None:
            break
        ep = item[0]     # endpoint
        proxy = item[1]  # appointed proxy
        try:
            do_work_company(ep, proxy)  # send to the function
        except ProxyError as proxy_error:
            print("Proxy related problem:", proxy_error, ep, proxy)
        except Exception as exc:
            print("Can't scrape the link:", ep, proxy, exc)
        finally:
            q.task_done()  # always mark the item done so q.join() does not block forever

We will create a queue with maxsize equal to the number of proxies (100) and we will create 100 threads. Here we give the `worker_company` function as the target/task to each of the threads.

q = queue.Queue(maxsize=max_num_worker_threads)
for i in range(max_num_worker_threads):
    t = threading.Thread(target=worker_company)
    t.start()
    threads.append(t)

Once the threads finish their tasks, we give each of them another task with a proxy. Here I retrieve the slugs from the list batch by batch and put them into the (thread-safe) queue.

# Iterate over thousands of slugs batch by batch and scrape
batch_size = max_num_worker_threads
length = len(company_slugs_list)
for i in range(0, length, batch_size):
    companies_batch = company_slugs_list[i:i + batch_size]
    for idx, elem in enumerate(companies_batch):
        company_slug = elem
        proxy = get_proxy(idx)
        single_company_endpoint = f"https://api.unissu.com/vendors/{company_slug}/retrieve/"
        q.put((single_company_endpoint, proxy))
    q.join()

In the end, we put None sentinels into the queue to break the while loops inside the worker functions, and then join the threads.
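A sketch of that shutdown step, matching the None check in worker_company:

# Tell every worker to exit its while loop, then wait for all threads to finish
for _ in range(max_num_worker_threads):
    q.put(None)
for t in threads:
    t.join()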

Use a lock mechanism to store data

Now we come to the most cumbersome part of threading. While one thread is writing data to a file, the other threads shouldn’t write to the same file. Imagine you and a friend each take a pencil and try to write your own sentences on the same A4 page at the same time: the words and letters will get all mixed up. What you should do here is create a lock and acquire it while writing data to the file. That way only one thread can write to the file at a time, while the other threads wait for the lock to be released and then write one by one.

Here we will create a very simple lock and use it while opening the output file.

csv_writer_lock = threading.Lock()
...
def get_company_details(company: dict, link: str = None, proxy=None):
    id = company.get('id', 'NULL')
    company_name = company.get('name', 'NULL')
    company_logo = company.get('logo', 'NULL')
    slogan = company.get('headline', 'NULL')
    description = company.get('description', 'NULL')
    # ...
    # ... build the row exactly as before
    with csv_writer_lock:
        with open('output.csv', mode='a+', newline='', encoding='utf-32') as company_file:
            company_writer = csv.writer(company_file, quotechar='"')
            company_writer.writerow(row)

This is a very simple way of scraping with a multithreaded mechanism. To make your code run even faster, you can take advantage of the threading-related features of the library you use. For instance, with requests you can mount an HTTPAdapter with a larger pool_maxsize onto a shared Session, so that connections are reused across your threads.
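A minimal sketch of that idea, assuming get_page is adapted to reuse one shared session instead of calling requests.get directly:

import requests
from requests.adapters import HTTPAdapter

# One shared session whose connection pool matches the number of worker threads
session = requests.Session()
adapter = HTTPAdapter(pool_connections=max_num_worker_threads,
                      pool_maxsize=max_num_worker_threads)
session.mount('http://', adapter)
session.mount('https://', adapter)

# get_page would then call session.get(...) instead of requests.get(...)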