[GH-ISSUE #91] Architecture: Use multiple cores to run link archiving in parallel #1576

Open
opened 2026-03-01 17:51:51 +03:00 by kerem · 15 comments
Owner

Originally created by @pirate on GitHub (Aug 30, 2018).
Original GitHub issue: https://github.com/ArchiveBox/ArchiveBox/issues/91

Add a `--parallel=8` cli option to enable using multiprocessing to download a large number of links in parallel. Default to number of cores on machine, allow `--parallel=1` to override it to 1 core.
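In CLI terms, the proposed flag might be parsed like this (a sketch; only the `--parallel` option name comes from the issue, everything else is hypothetical):

```python
import argparse
import os

def parse_args(argv=None):
    parser = argparse.ArgumentParser(prog='archivebox')
    # default to the number of cores on the machine; --parallel=1 forces serial archiving
    parser.add_argument('--parallel', type=int, default=os.cpu_count() or 1,
                        help='number of links to archive concurrently')
    return parser.parse_args(argv)
```

So `archivebox --parallel=8 …` would give eight workers, and omitting the flag would use every available core.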


@pirate commented on GitHub (Aug 30, 2018):

Inspired by https://github.com/aurelg/linkbak I've had this on my mind for a while since it's super easy to implement, but @aurelg inspired me to actually make an issue for it.


@aurelg commented on GitHub (Aug 31, 2018):

The relevant code is in [this file](https://github.com/aurelg/linkbak/blob/master/src/linkbak/lnk2bak.py#L127-L136):

    nb_workers = args.j if args.j else os.cpu_count()
    get_logger().warning("Using %s workers", nb_workers)

    if nb_workers > 1:
        with contextlib.closing(multiprocessing.Pool(nb_workers)) as pool:
            pool.starmap(start_link_handler,
                         [(l, args) for l in get_links(args.file[0])])
    else:
        for link in get_links(args.file[0]):
            start_link_handler(link, args)

This is actually pretty easy. In your case, the only difficulty might be to handle the screen output/progress bar properly: if different workers are updating the screen at the same time, it may quickly become a bit messy.


@pirate commented on GitHub (Jan 23, 2019):

I think I can fix the parallel process stdout multiplexing problem with one of two solutions:

  • a program like [curtsies](https://github.com/bpython/curtsies) to show each stream in a grid, tail -f style
  • having each subprocess append (`>>`) to a shared logfile, but obtain a lock for the duration of each link archive, so the output remains in contiguous chunks instead of being mixed together line by line:

some simplified pseudocode:

import fcntl
import multiprocessing
import sys

def archive_link(link):
    link, stdout = run_archive_methods(link)

    # hold an exclusive lock while writing so each link's output stays contiguous
    with open('data/logs/archive.log', 'a') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        f.write(stdout)
        fcntl.flock(f, fcntl.LOCK_UN)

if parallel > 1:
    pool = multiprocessing.Pool(processes=parallel)
    pool_status = pool.map_async(archive_link, links)
    last_pos = 0
    while not pool_status.ready():
        # tail the shared logfile and echo any newly-written chunks to the terminal
        with open('data/logs/archive.log', 'r') as f:
            f.seek(last_pos)
            print(f.read())
            last_pos = f.tell()
        sys.stdout.flush()
        pool_status.wait(0.2)
else:
    for link in links:
        link, stdout = run_archive_methods(link)
        print(stdout)

@anarcat commented on GitHub (May 6, 2019):

parallel downloading is tricky - you need to be nice to remote hosts and not hit them too much. there's a per-host limit specified in RFCs which most web browsers override. most clients do 7 or 10 requests per domain, IIRC.

but since you're crawling different URLs, you have a more complicated problem to deal with and can fetch *more* than 7-10 simultaneous queries globally... you need to be careful because it requires coordination among the workers as well, or have a director that does the right thing. in my [feed reader](https://feed2exec.readthedocs.io/), I believe I just throw about that number of threads (with `multiprocessing.Pool` and `pool.apply_async`, see [the short design discussion](https://feed2exec.readthedocs.io/en/stable/design.html#concurrent-processing)) at all sites and hope for the best, but that's clearly inefficient.

I came here because I was looking at a bug report about running multiple `archivebox add` in parallel: I think this is broken right now (see #234 for an example failure) because the `index.json` gets corrupted or stolen between processes. it would be nice to have at least a lock file to prevent such problems from happening.
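The per-host politeness described here could be sketched as a per-domain semaphore shared by all workers (names and the cap of 3 are hypothetical, chosen in the spirit of the 7-10-requests-per-host browser convention):

```python
import threading
from collections import defaultdict
from urllib.parse import urlsplit

MAX_PER_DOMAIN = 3  # hypothetical per-host cap

# lazily create one semaphore per domain; the guard makes the dict access thread-safe
_domain_slots = defaultdict(lambda: threading.BoundedSemaphore(MAX_PER_DOMAIN))
_slots_guard = threading.Lock()

def domain_slot(url):
    """Return the shared semaphore limiting concurrent fetches to url's host."""
    host = urlsplit(url).netloc
    with _slots_guard:
        return _domain_slots[host]

def polite_fetch(url, fetch):
    # a worker blocks here whenever MAX_PER_DOMAIN requests to this host are already in flight
    with domain_slot(url):
        return fetch(url)
```

A global cap on total concurrency (the "more than 7-10 globally" part) would be a second, shared semaphore acquired alongside the per-domain one.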


@LaserWires commented on GitHub (Sep 21, 2019):

> parallel downloading is tricky - you need to be nice to remote hosts and not hit them too much. there's a per-host limit specified in RFCs which most web browsers override. most clients do 7 or 10 requests per domain, IIRC.
>
> but since you're crawling different URLs, you have a more complicated problem to deal with and can fetch *more* than 7-10 simultaneous queries globally... you need to be careful because it requires coordination among the workers as well, or have a director that does the right thing. in my [feed reader](https://feed2exec.readthedocs.io/), I believe I just throw about that number of threads (with `multiprocessing.Pool` and `pool.apply_async`, see [the short design discussion](https://feed2exec.readthedocs.io/en/stable/design.html#concurrent-processing)) at all sites and hope for the best, but that's clearly inefficient.
>
> I came here because I was looking at a bug report about running multiple `archivebox add` in parallel: I think this is broken right now (see #234 for an example failure) because the `index.json` gets corrupted or stolen between processes. it would be nice to have at least a lock file to prevent such problems from happening.

Most site mirroring apps incorporate download-by-proxy; it's a very common feature that might be implemented in archivebox, and with it a large `--parallel` value is not an issue. A list of proxies can enable an archive operation to use a very large `--parallel` value. Most webservers today are backed by high-bandwidth connections and very capable hardware, so archivebox users overloading them isn't likely an issue.


@pirate commented on GitHub (Sep 22, 2019):

It's not an issue of overloading archivebox, it's an issue of overloading / hitting rate-limits on the content servers, which piping through a proxy won't solve.


@karlicoss commented on GitHub (Oct 25, 2020):

Even more cores than 8 might make sense, because often things are blocked on IO with no throughput, e.g. pages that would timeout. Might need some careful scheduling, but would be very cool to have!
Another IMO useful thing is having some sort of "pipeline" concurrency, e.g. one executor only archives DOM and always runs in front. The other executors run behind and handle singlepage/screenshots/media/etc, i.e. slower, but not as essential bits. This might also make it easier to schedule the load depending on which archivers are IO/CPU bound.
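The pipeline idea above could be sketched with two stages and a queue: a fast stage that archives the DOM up front, feeding slower extractor workers that trail behind (all function names here are hypothetical):

```python
import queue
import threading

def pipeline(urls, save_dom, run_slow_extractors, workers=4):
    """Fast stage archives the DOM immediately; slow extractors trail behind."""
    slow_q = queue.Queue()

    def slow_worker():
        while True:
            snapshot = slow_q.get()
            if snapshot is None:       # sentinel: no more work
                break
            run_slow_extractors(snapshot)  # singlepage/screenshots/media/etc.

    threads = [threading.Thread(target=slow_worker) for _ in range(workers)]
    for t in threads:
        t.start()

    # the fast stage always runs in front, handing finished snapshots downstream
    for url in urls:
        snapshot = save_dom(url)
        slow_q.put(snapshot)

    for _ in threads:                  # one sentinel per worker
        slow_q.put(None)
    for t in threads:
        t.join()
```

Since the slow stage is mostly IO-bound, the worker count there could be tuned independently of the CPU-bound fast stage, as suggested above.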


@pirate commented on GitHub (Dec 10, 2020):

A quick update for everyone watching this, v0.5.0 is going to be released soon with improvements to how ArchiveResults are stored (we moved them into the SqliteDB). This was a necessary blocker to fix before we can get around to parallel archiving in the next version.

v0.5.0 will be faster, but it won't have built-in concurrent archiving support yet, that will be the primary focus for v0.6.0. The plan is to add a background task queue handler like dramatiq or more likely [huey](https://github.com/coleifer/huey) (because it has sqlite3 support so we don't need to run redis).

Once we have the background task worker system in place, we can implement a worker pool for Chrome/playwright and each of the other extractor methods. Then archiving can run in parallel by default, archiving like 5-10 sites at a time depending on the system resources available and how well the worker pool system performs for each extractor type. Huey and dramatiq both have built-in rate limiting systems that will allow us to cap the number of concurrent requests going to each site or being handled by each extractor. It's still quite a bit of work left, but we're getting closer!
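As a rough illustration of the kind of rate limiting described (not huey's or dramatiq's actual API), a sliding-window limiter can be sketched in a few lines:

```python
import threading
import time
from collections import deque

class SlidingWindowLimiter:
    """Allow at most `limit` acquisitions per `window` seconds, across threads."""

    def __init__(self, limit, window):
        self.limit, self.window = limit, window
        self.stamps = deque()          # monotonic timestamps of recent acquisitions
        self.lock = threading.Lock()

    def acquire(self):
        while True:
            with self.lock:
                now = time.monotonic()
                # drop timestamps that have slid out of the window
                while self.stamps and now - self.stamps[0] >= self.window:
                    self.stamps.popleft()
                if len(self.stamps) < self.limit:
                    self.stamps.append(now)
                    return
                wait = self.window - (now - self.stamps[0])
            time.sleep(wait)           # sleep outside the lock, then retry
```

One limiter instance per site (and per extractor type) would give the caps described above.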

Having a background task system will also enable us to do many other cool things, like building the scheduled import system into the UI #578, using a single shared chrome process instead of relaunching chrome for each link, and many other small improvements to performance.


@pirate commented on GitHub (Apr 12, 2021):

With v0.6 released now we've taken another step towards the goal of using a message-passing architecture to fully support parallel archiving. v0.6 moves that last bit of ArchiveResult state into the SQLite3 db where it can be managed with migrations and kept ACID compliant.

The next step of the process is to implement a worker queue for DB writes, and have all writes made to Snapshot/ArchiveResult models processed in a single thread, opening up other threads to be able to do things in parallel without locking the db anymore. Message passing is a big change though, so expect it to come in increments, with about 3~6 months of work to go depending on how much free time I have for ArchiveBox.
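The single-writer idea can be sketched with a plain queue and sqlite (in-memory db and schema are hypothetical): any number of threads enqueue writes, and only one thread ever touches the connection, so nothing else can lock the db:

```python
import queue
import sqlite3
import threading

write_q = queue.Queue()

def db_writer(db_path=':memory:', results=None):
    """Single thread owns the connection; all other threads just enqueue writes."""
    db = sqlite3.connect(db_path)
    db.execute('CREATE TABLE IF NOT EXISTS snapshot (url TEXT PRIMARY KEY)')
    while True:
        url = write_q.get()
        if url is None:                 # sentinel: shut down
            break
        db.execute('INSERT OR REPLACE INTO snapshot (url) VALUES (?)', (url,))
        db.commit()
    if results is not None:             # report back what was persisted
        results.extend(u for (u,) in db.execute('SELECT url FROM snapshot ORDER BY url'))
    db.close()

# producer side: enqueue without ever touching sqlite directly
saved = []
writer = threading.Thread(target=db_writer, kwargs={'results': saved})
writer.start()
for url in ('https://a.example', 'https://b.example'):
    write_q.put(url)
write_q.put(None)
writer.join()
```

In the real refactor the queued items would presumably be richer Snapshot/ArchiveResult operations rather than bare URLs.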

Side note: the UX of v0.6 is >10x faster in many other ways though (web UI, indexing, management tasks, etc.), only archiving itself remains to be sped up now. You can also still attempt to run `archivebox add` commands in parallel, it's safe and works to speed up archiving a lot already, but you may encounter occasional `database locked` warnings that mean you have to restart stuck additions manually.


@runkaiz commented on GitHub (May 3, 2021):

Sorry quick question, so I run archivebox in a docker container and currently would allocating it more than one CPU core or thread have any performance gains?


@pirate commented on GitHub (May 7, 2021):

@1105420698, allocating more than 1 cpu is definitely still advised, as django will use all available cores to handle incoming requests in parallel, and a few of the extractors already take advantage of multiple cores to render pages faster (e.g. chrome).

ArchiveBox is already fairly multicore-capable (e.g. you can run multiple add or update threads at the same time), it's just a few remaining edge cases and highly-parallel write scenarios that will be improved by the pending message queue refactoring work.


@pirate commented on GitHub (Jun 30, 2021):

Ok I'm pretty set on using Huey at this point for the job scheduler, it can use SQLite, it comes with a great django admin dashboard, and it supports nested tasks and mutexes.

https://github.com/boxine/django-huey-monitor/#screenshots

Here's the approach I'm thinking of to massage all critical operations into a message-passing / queue / worker arrangement in rough pseudocode:

  1. archivebox add --depth=1 'https://example.com/feed.xml' leads to these tasks being triggered and handled in this order ->
# global mutexes that workers will use to limit number of concurrent operations at a time, when they use a resource that is constrained (e.g. disk IO, network, CPU, etc.)

GLOBAL_FS_ATOMIC_MUTEX = Pool('fs:atomic', timeout=120, max_concurrent=3)                            # increase these on fast SSDs, decrease it on slow spinny drives
GLOBAL_FS_CHOWN_MUTEX = Pool('fs:chown', timeout=120, max_concurrent=3)

GLOBAL_DB_WRITE_MUTEX = Pool('db:write', timeout=120, max_concurrent=1)                              # raise this if your db can handle concurrent writes at all (i.e. not sqlite)

GLOBAL_DB_TABLE_SNAPSHOT_MUTEX = Pool('db_snapshot:write', timeout=360, max_concurrent=1)            # raise these if your db can handle concurrent writes to the same table
GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX = Pool('db_archiveresult:write', timeout=360, max_concurrent=1)

DB_ROW_SNAPSHOT_MUTEX = lambda url: Pool(f'db_snapshot:write:{url}', timeout=360, max_concurrent=1)  # raise these if your db can handle concurrent writes to the same row (probably a bad idea)
DB_ROW_ARCHIVERESULT_MUTEX = lambda url: Pool(f'db_archiveresult:write:{url}', timeout=360, max_concurrent=1)

GLOBAL_SNAPSHOT_PULL_MUTEX = Pool('snapshot:pull', timeout=800000, max_concurrent=4)                 # raise this if you want to snapshot more URLs in parallel

GLOBAL_EXTRACTOR_MUTEX = Pool('extractor:run', timeout=234234234, max_concurrent=12)                 # only allow 12 extractors to run at a time globally

GLOBAL_PER_DEPENDENCY_MUTEX = lambda dependency: Pool(f'extractor:run_dependency:{dependency}', timeout=234234234, max_concurrent=4)  # only allow 4 of each type of extractor dependency to run at a time

PER_DOMAIN_RATELIMIT_MUTEX = lambda domain: Pool(f'domain:pull:{domain}', timeout=3600, max_concurrent=3, rate_limit=sliding_window(...))  # raise this if the domain you're archiving can handle lots of concurrent requests

GLOBAL_SHELL_CMD_MUTEX = Pool('system:shell_cmd', timeout=234234234, max_concurrent=8)                 # maximum number of shell cmds/external binaries to execute at once


await CLI.ArchiveboxAdd(urls_text, depth=1) {
    await parsers.Parse(urls_text, depth) { ... }
    
    await models.Snapshot.bulk_update_snapshots(snapshots) {
    	await parallel([
            models.Snapshot.update_or_create(snapshot1, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot1.url)]),
            models.Snapshot.update_or_create(snapshot2, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot2.url)]),
            models.Snapshot.update_or_create(snapshot3, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot3.url)]),
        ])
    }
    
    await index.update_main_index(snapshots) {
        await parallel(max=4, [
            models.Snapshot.write(snapshot1) {
                await models.Snapshot.update_or_create(snapshot, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX])
                await models.Snapshot.update_filesystem(snapshot1) {
                    await parallel([
                        system.atomic_write('index.json', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
                        system.atomic_write('index.html', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
                    ])
                }
            },
            models.Snapshot.write(snapshot2) {
                ...
            },
            ...
        ])
    }

           
    await extractors.save_snapshots(snapshots) {
    
      await extractors.pull_snapshot(snapshot1, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) {
                
         await parallel(max_concurrency=2, [
            
			extractors.extract(snapshot1, 'wget', acquire=[PER_DOMAIN_RATELIMIT_MUTEX(snapshot1.domain), GLOBAL_EXTRACTOR_MUTEX]) {
                await extractors.run_dependency('wget', acquire=[GLOBAL_PER_DEPENDENCY_MUTEX('wget')]) {
                    system.run_shell_cmd('wget ...', acquire=[GLOBAL_SHELL_CMD_MUTEX])
                    system.chown(result.output_path, acquire=[GLOBAL_FS_CHOWN_MUTEX])
                    system.dir_size(result.output_path)
                }

                await models.ArchiveResult.write(snapshot, result) {
                    models.ArchiveResult.get_or_create(snapshot, result, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX])
                    models.Snapshot.write(snapshot, merge_results=results)
                        ...
                    index.add_index_texts(result=result) &
                }
            },
                    
                
            extractors.extract(snapshot1, extractor='git', acquire=[...]) {
                extractors.run_dependency('git')
                    system.run_shell_cmd('git ...')
                    system.chown(result.output_path)
                    system.dir_size(result.output_path)
                    
                models.ArchiveResult.write(snapshot, result)
                    ...
            },
                        
            extractors.extract(snapshot1, extractor='playwright_screenshot', acquire=[...]) {
                extractors.run_dependency('playwright')
                    context.assert_on_url(snapshot1.url)
                    context.assert_loaded()
                    context.run_js(snapshot1, 'screenshot.js')
                    system.chown(result.output_path)
                    system.dir_size(result.output_path)
                 
                models.ArchiveResult.write(snapshot, result)
                    ...
			
			},
			
	    ])
			
            
        extractors.pull_snapshot(snapshot2, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) &
        
            extractor_pool = POOL(max_concurrency=2, key='extractors.extract({snapshot.url})') 
        
            models.Snapshot.write(snapshot2)
                ...
            extractors.extract(snapshot2, extractor='wget')
                ...
            extractors.extract(snapshot1, extractor='git')
               ...
            ...
        ...
      

WIP ignore this, just toying around with different patterns / styles to find something with good ergonomics:

run({task: 'cli.archivebox_add', urls: 'https://example.com', depth: 1})

def add(urls: str='', depth=0, extractors='all', parent_task: str='main.add'):
    parsed_urls = await call({task: 'parsers.parse', urls: 'https://example.com', parent_task: parent_task})
    snapshots = []
    
    # create Snapshots and update/create the snapshot indexes <timestamp>/index.{json,html}
    for url in parsed_urls:
        snapshot, created = await call({task: 'Snapshot.get_or_create', url: 'https://example.com'})
        call({task: 'index.update_snapshot_index', snapshot: snapshot})
        snapshots.append(snapshot)
    
    # run the extractors on each snapshot
    for snapshot in snapshots:
        schedule({task: 'extractors.run_extractors', snapshot: snapshot, extractors: 'all', overwrite: false})
        

I'm worried that the heavy reliance on mutexes and locking will lead to difficult-to-debug deadlock scenarios where parents spawn children that eat up all the worker slots, then are unable to complete, causing the parent to time out and force-kill those workers prematurely.
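That starvation scenario is easy to reproduce in miniature: with a single-slot pool, a parent task that blocks on a child submitted to the same pool can only be rescued by its timeout (illustrative sketch, not ArchiveBox code):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def child():
    return 'done'

def parent(pool):
    # the child is queued behind the parent, which occupies the pool's only slot,
    # so this wait can never succeed and only the timeout saves us
    return pool.submit(child).result(timeout=0.5)

pool = ThreadPoolExecutor(max_workers=1)
try:
    pool.submit(parent, pool).result(timeout=2)
    outcome = 'completed'
except FutureTimeout:
    outcome = 'deadlocked'   # parent timed out waiting for its starved child
pool.shutdown(wait=False)
```

The usual mitigations are separate pools per task tier (parents never share slots with their children) or making parents enqueue children and exit instead of blocking on them.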


I also reached out to the folks who are building django-huey-monitor as it looks like a great fit for our job handling UI: https://github.com/boxine/django-huey-monitor/issues/40

<!-- gh-comment-id:871343428 --> @pirate commented on GitHub (Jun 30, 2021): Ok I'm pretty set on using Huey at this point for the job scheduler, it can use SQLite, it comes with a great django admin dashboard, and it supports nested tasks and mutexes. https://github.com/boxine/django-huey-monitor/#screenshots Here's the approach I'm thinking of to massage all critical operations into a message-passing / queue / worker arrangement in rough pseudocode: 1. `archivebox add --depth=1 'https://example.com/feed.xml'` leads to these tasks being triggered and handled in this order -> ```python # global mutexes that workers will use to limit number of concurrent operations at a time, when they use a resource that is constrained (e.g. disk IO, network, CPU, etc.) GLOBAL_FS_ATOMIC_MUTEX = Pool('fs:atomic', timeout=120, max_concurrent=3) # increase these on fast SSDs, decrease it on slow spinny drives GLOBAL_FS_CHOWN_MUTEX = Pool('fs:chown', timeout=120, max_concurrent=3) GLOBAL_DB_WRITE_MUTEX = Pool('db:write', timeout=120, max_concurrent=1) # raise this if your db can handle concurrent writes at all (i.e. 
not sqlite) GLOBAL_DB_TABLE_SNAPSHOT_MUTEX = Pool('db_snapshot:write', timeout=360, max_concurrent=1) # raise these if your db can handle concurrent writes to the same table GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX = Pool('db_archiveresult:write', timeout=360, max_concurrent=1) DB_ROW_SNAPSHOT_MUTEX = lambda url: Pool(f'db_snapshot:write:{url}', timeout=360, max_concurrent=1) # raise these if your db can handle concurrent writes to the same row (probably a bad idea) DB_ROW_ARCHIVERESULT_MUTEX = lambda url: Pool(f'db_archiveresult:write:{url}', timeout=360, max_concurrent=1) GLOBAL_SNAPSHOT_PULL_MUTEX = Pool('snapshot:pull', timeout=800000, max_concurrent=4) # raise this if you want to snapshot more URLs in parallel GLOBAL_EXTRACTOR_MUTEX = Pool('extractor:run', timeout=234234234, max_concurrent=12) # only allow 12 extractors to run at a time globally GLOBAL_PER_DEPENDENCY_MUTEX = lambda dependency: Pool('extractor:run_dependency:{dependency}', timeout=234234234, max_concurrent=4) # only allow 4 of each type of extractor dependency to run at a time PER_DOMAIN_RATELIMIT_MUTEX = lambda domain: Pool('domain:pull:{url}', timeout=3600, max_concurrent=3, rate_limit=sliding_window(...)) # raise this if the domain you're archiving can handle lots of concurrent requests GLOBAL_SHELL_CMD_MUTEX = Pool('system:shell_cmd', timeout=234234234, max_concurrent=8) # maximum number of shell cmds/external binaries to execute at once await CLI.AchiveboxAdd(urls_text, depth=1) { await parsers.Parse(urls_text, depth) { ... 
}

await models.Snapshot.bulk_update_snapshots(snapshots) {
    await parallel([
        models.Snapshot.update_or_create(snapshot1, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot1.url)]),
        models.Snapshot.update_or_create(snapshot2, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot2.url)]),
        models.Snapshot.update_or_create(snapshot3, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot3.url)]),
    ])
}

await index.update_main_index(snapshots) {
    await parallel(max=4, [
        models.Snapshot.write(snapshot1) {
            await models.Snapshot.update_or_create(snapshot, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX])
            await models.Snapshot.update_filesystem(snapshot1) {
                await parallel([
                    system.atomic_write('index.json', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
                    system.atomic_write('index.html', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
                ])
            }
        },
        models.Snapshot.write(snapshot2) { ... },
        ...
    ])
}

await extractors.save_snapshots(snapshots) {
    await extractors.pull_snapshot(snapshot1, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) {
        await parallel(max_concurrency=2, [
            extractors.extract(snapshot1, 'wget', acquire=[PER_DOMAIN_RATELIMIT_MUTEX(snapshot1.domain), GLOBAL_EXTRACTOR_MUTEX]) {
                await extractors.run_dependency('wget', acquire=[GLOBAL_PER_DEPENDENCY_MUTEX('wget')]) {
                    system.run_shell_cmd('wget ...', acquire=[GLOBAL_SHELL_CMD_MUTEX])
                    system.chown(result.output_path, acquire=[GLOBAL_FS_CHOWN_MUTEX])
                    system.dir_size(result.output_path)
                }
                await models.ArchiveResult.write(snapshot, result) {
                    models.ArchiveResult.get_or_create(snapshot, result, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX])
                    models.Snapshot.write(snapshot, merge_results=results)
                    ...
                    index.add_index_texts(result=result) &
                }
            },
            extractors.extract(snapshot1, extractor='git', acquire=[...]) {
                extractors.run_dependency('git')
                system.run_shell_cmd('git ...')
                system.chown(result.output_path)
                system.dir_size(result.output_path)
                models.ArchiveResult.write(snapshot, result)
                ...
            },
            extractors.extract(snapshot1, extractor='playwright_screenshot', acquire=[...]) {
                extractors.run_dependency('playwright')
                context.assert_on_url(snapshot1.url)
                context.assert_loaded()
                context.run_js(snapshot1, 'screenshot.js')
                system.chown(result.output_path)
                system.dir_size(result.output_path)
                models.ArchiveResult.write(snapshot, result)
                ...
            },
        ])
    }
    extractors.pull_snapshot(snapshot2, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) &
        extractor_pool = POOL(max_concurrency=2, key='extractors.extract({snapshot.url})')
        models.Snapshot.write(snapshot2) ...
        extractors.extract(snapshot2, extractor='wget') ...
        extractors.extract(snapshot1, extractor='git') ...
        ...
    ...
}
```

WIP, ignore this: just toying around with different patterns / styles to find something with good ergonomics:

```
run({task: 'cli.archivebox_add', urls: 'https://example.com', depth: 1})

def add(urls: str='', depth=0, extractors='all', parent_task: str='main.add'):
    parsed_urls = await call({task: 'parsers.parse', urls: 'https://example.com', parent_task: parent_task})

    snapshots = []
    # create Snapshots and update/create the snapshot indexes <timestamp>/index.{json,html}
    for url in parsed_urls:
        snapshot, created = await call({task: 'Snapshot.get_or_create', url='https://example.com'})
        call({task: 'index.update_snapshot_index', snapshot: snapshot})
        snapshots.append(snapshot)

    # run the extractors on each snapshot
    for snapshot in snapshots:
        schedule({task: 'extractors.run_extractors', snapshot: snapshot, extractors: 'all', overwrite: false})
```

I'm worried that the heavy reliance on mutexes and locking will lead to difficult-to-debug deadlock scenarios where parents spawn children that eat up all the worker slots, then are unable to complete, causing the parent to time out and force-kill those workers prematurely.

---

I also reached out to the folks who are building `django-huey-monitor` as it looks like a great fit for our job handling UI: https://github.com/boxine/django-huey-monitor/issues/40
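Editor's sketch (not ArchiveBox code): the `parallel(..., acquire=[NAMED_MUTEX, ...])` pattern in the pseudocode above can be approximated with plain `asyncio`. The helper names (`with_locks`, `parallel`) and the mutex names are hypothetical; the one real technique shown is acquiring every task's locks in a single global sorted order, a standard way to prevent two tasks with overlapping lock sets from deadlocking on each other.

```python
import asyncio

# One shared asyncio.Lock per named mutex (e.g. 'GLOBAL_DB_WRITE_MUTEX').
LOCKS: dict = {}

def lock_for(name):
    return LOCKS.setdefault(name, asyncio.Lock())

async def with_locks(names, coro_fn):
    # Acquire in one global (sorted) order so overlapping lock sets
    # can never be taken in conflicting orders by two tasks.
    ordered = [lock_for(n) for n in sorted(set(names))]
    for lock in ordered:
        await lock.acquire()
    try:
        return await coro_fn()
    finally:
        for lock in reversed(ordered):
            lock.release()

async def parallel(tasks, max_concurrency=4):
    # tasks: list of (lock_names, coroutine_function) pairs
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(names, fn):
        async with sem:
            return await with_locks(names, fn)

    return await asyncio.gather(*(bounded(n, f) for n, f in tasks))

async def demo():
    written = []

    def writer(url):
        async def write():
            await asyncio.sleep(0)  # stand-in for the real DB write
            written.append(url)
        return write

    await parallel([
        (['GLOBAL_DB_WRITE_MUTEX', f'DB_ROW_SNAPSHOT_MUTEX({url})'], writer(url))
        for url in ('https://a.example', 'https://b.example', 'https://c.example')
    ])
    return written

result = asyncio.run(demo())
print(result)
```

Note the ordered-acquisition rule only addresses lock-ordering deadlocks; it does nothing about the separate worker-slot exhaustion concern (blocked parents holding all the slots their children need).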
Author
Owner

@jgoerzen commented on GitHub (Jul 5, 2021):

Over in #781 it was stated that parallel adds don't work yet. Over at https://github.com/ArchiveBox/ArchiveBox/wiki/Usage#large-archives there is an example of doing this that should probably be removed until this is fixed.

<!-- gh-comment-id:874371850 -->
Author
Owner

@pirate commented on GitHub (Jul 6, 2021):

It works better in some cases (fast SSDs) than others, so it's still worth trying; it shouldn't be dangerous to data integrity, it'll just lock up if it's on a slow filesystem. I added a note to the Usage page.

<!-- gh-comment-id:874388380 -->
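Editor's aside (an assumption, not something the thread confirms): when parallel `archivebox add` runs hang on slow filesystems, a common culprit is SQLite's default rollback journal plus a zero busy timeout, which makes a second writer fail or block hard instead of waiting politely. The pragmas below are standard SQLite; the `index.sqlite3` path is a stand-in for whatever DB file is actually in use.

```python
import os
import sqlite3
import tempfile

# Stand-in path for illustration; not ArchiveBox's real DB location.
db_path = os.path.join(tempfile.mkdtemp(), 'index.sqlite3')

conn = sqlite3.connect(db_path)
# Wait up to 30s for a competing writer instead of failing
# immediately with "database is locked".
conn.execute('PRAGMA busy_timeout = 30000')
# WAL mode lets readers proceed while a single writer is active.
mode = conn.execute('PRAGMA journal_mode = WAL').fetchone()[0]
print(mode)  # → wal
conn.close()
```

WAL requires a filesystem with working file locking and mmap semantics, which is exactly what some network filesystems lack, so this mitigates rather than eliminates the slow-filesystem caveat above.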
Author
Owner

@pirate commented on GitHub (Apr 12, 2022):

Note I've added a new DB/filesystem troubleshooting area to the wiki that may help people arriving here from Google: https://github.com/ArchiveBox/ArchiveBox/wiki/Upgrading-or-Merging-Archives#database-troubleshooting

Contributions/suggestions welcome there.

<!-- gh-comment-id:1097265077 -->