[GH-ISSUE #2463] Deadlock during concurrent writes with low cache space #1215

Closed
opened 2026-03-04 01:52:16 +03:00 by kerem · 1 comment

Originally created by @amarjayr on GitHub (Jun 3, 2024).
Original GitHub issue: https://github.com/s3fs-fuse/s3fs-fuse/issues/2463

Additional Information

Version of s3fs being used (s3fs --version)

Amazon Simple Storage Service File System V1.94 (commit:a4f694c) with OpenSSL
Copyright (C) 2010 Randy Rizun <rrizun@gmail.com>
License GPL2: GNU GPL version 2 <https://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.

Details about issue

There is an unrecoverable deadlock during concurrent writes due to lock order inversion involving fdent_lock and fd_manager_lock.

In the first path, FdEntity::Write acquires fdent_lock and then fdent_data_lock. Later on the same path, in WriteMultipart and NoCacheLoadAndPost, ChangeEntityToTempPath attempts to acquire fd_manager_lock. The locks are therefore acquired in this order: fdent_lock, fdent_data_lock, fd_manager_lock.

In the second path, FdManager::GetExistFdEntity (called from s3fs_flush, s3fs_fsync, s3fs_read, and s3fs_write) acquires fd_manager_lock and then, in FindPseudoFd, attempts to acquire fdent_lock for every fd entry. The locks are therefore acquired in this order: fd_manager_lock, fdent_lock. This is the reverse of the first path, so a thread on each path can end up holding one lock while waiting indefinitely for the other.
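The two acquisition orders described above can be modeled in a few lines of Python. This is only an illustrative sketch, not s3fs code: the two `threading.Lock` objects are stand-ins that borrow the names from the report, `fdent_data_lock` is left out for brevity, and `run_inverted_paths` is a hypothetical helper. A timeout on the second acquire is used so the example terminates instead of hanging.

```python
import threading

# Stand-ins for the s3fs locks named in the report (assumption: plain
# mutexes; the real locks live in the s3fs C++ sources).
fdent_lock = threading.Lock()
fd_manager_lock = threading.Lock()


def run_inverted_paths(timeout=0.5):
    """Run two threads that take the same two locks in opposite order.

    Returns a dict mapping each path name to True if that path managed to
    acquire its second lock, False if it timed out waiting for it.
    """
    barrier = threading.Barrier(2)
    results = {}

    def path(name, first, second):
        with first:
            # Wait until the other thread also holds its first lock, so the
            # inversion is hit on every run rather than by chance.
            barrier.wait()
            got_second = second.acquire(timeout=timeout)
            if got_second:
                second.release()
            results[name] = got_second
            # Keep holding the first lock until both attempts have finished.
            barrier.wait()

    threads = [
        # Models the write path: fdent_lock, then fd_manager_lock.
        threading.Thread(target=path, args=("write_path", fdent_lock, fd_manager_lock)),
        # Models the lookup path: fd_manager_lock, then fdent_lock.
        threading.Thread(target=path, args=("lookup_path", fd_manager_lock, fdent_lock)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_inverted_paths())
```

Neither thread obtains its second lock in this model. With blocking acquires instead of timeouts, both threads would wait forever, which corresponds to the unrecoverable hang described in the report.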

I have been able to reproduce this with the following Python script:

import hashlib
import os
import threading

concurrency = 10
file_size = 1024 * 1024 * 1024  # 1 GB
block_size = 50 * 1024 * 1024  # 50 MB

checksums = {}

os.mkdir("source")
os.mkdir("dest")

# Write a file of file_size bytes of random data and record its MD5 digest
def write_file(filename, folder):
    with open(folder + "/" + filename, 'wb') as f:
        file_hash = hashlib.md5()

        bytes_written = 0
        while bytes_written < file_size:
            remaining_bytes = file_size - bytes_written
            current_block_size = min(remaining_bytes, block_size)
            random_bytes = os.urandom(current_block_size)
            num_written = f.write(random_bytes)
            bytes_written += num_written
            file_hash.update(random_bytes[0:num_written])

        checksums[filename] = file_hash.digest()

def rename_file(filename, source_folder, dest_folder):
    os.rename(source_folder + "/" + filename, dest_folder + "/" + filename)

# Read a file back and validate its size and MD5 digest
def read_file(filename, folder):
    with open(folder + "/" + filename, 'rb') as f:
        file_hash = hashlib.md5()

        bytes_read = 0
        while True:
            block = f.read(block_size)
            if not block:
                break
            bytes_read += len(block)
            file_hash.update(block)

        if bytes_read == file_size and checksums[filename] == file_hash.digest():
            print(f"{filename} is valid", flush=True)
        else:
            print(f"{filename} is invalid", flush=True)
        assert bytes_read == file_size and checksums[filename] == file_hash.digest()

# List to hold threads
threads = []

# Create threads to write 10 1GB files concurrently
for i in range(concurrency):
    thread = threading.Thread(target=write_file, args=(f"file_{i}.bin", "source",))
    threads.append(thread)
    thread.start()

# Wait for all write threads to finish
for thread in threads:
    thread.join()

# Clear threads list for the rename phase
threads.clear()

# Create threads to rename concurrently
for i in range(concurrency):
    thread = threading.Thread(target=rename_file, args=(f"file_{i}.bin", "source", "dest",))
    threads.append(thread)
    thread.start()

    # write during rename
    write_thread = threading.Thread(target=write_file, args=(f"filenew_{i}.bin", "source"))
    threads.append(write_thread)
    write_thread.start()

# Wait for all rename and write threads to finish
for thread in threads:
    thread.join()

# Clear threads list for reading
threads.clear()

# Create threads to read and validate 10 1GB files concurrently
for i in range(concurrency):
    thread = threading.Thread(target=read_file, args=(f"file_{i}.bin", "dest",))
    threads.append(thread)
    thread.start()

    read_thread = threading.Thread(target=read_file, args=(f"filenew_{i}.bin", "source",))
    threads.append(read_thread)
    read_thread.start()

# Wait for all read threads to finish
for thread in threads:
    thread.join()

print("All files validated successfully")
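When the deadlock is hit, the script above would simply block in `thread.join()`. As a possible aid when reproducing, the joins could be bounded by a deadline so that stuck threads are reported by name. The helper below is a hypothetical addition, not part of the original script, and any deadline value is an assumption that depends on the environment.

```python
import threading
import time


def join_with_deadline(threads, deadline_seconds):
    """Join all threads against one shared deadline.

    Returns the list of threads that are still running once the deadline
    has passed (empty if every thread finished in time).
    """
    deadline = time.monotonic() + deadline_seconds
    for t in threads:
        # Give each join only the time left until the shared deadline.
        remaining = max(0.0, deadline - time.monotonic())
        t.join(timeout=remaining)
    return [t for t in threads if t.is_alive()]
```

For example, replacing a join loop with `stuck = join_with_deadline(threads, 600)` and printing `[t.name for t in stuck]` would show which writers or renames never returned. Python cannot cancel a thread that is blocked inside a filesystem call, so the helper only reports the stall and does not recover from it.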
kerem closed this issue 2026-03-04 01:52:16 +03:00

@amarjayr commented on GitHub (Jun 10, 2024):

Apologies, this is a duplicate of #2438. Closing
