Module arvados.diskcache

Expand source code
# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import threading
import mmap
import os
import traceback
import stat
import tempfile
import fcntl
import time
import errno
import logging
import weakref

_logger = logging.getLogger('arvados.keep')

cacheblock_suffix = ".keepcacheblock"

class DiskCacheSlot(object):
    """A single cache block backed by a memory-mapped file on disk.

    A block lives at ``<cachedir>/<locator[0:3]>/<locator>`` plus
    ``cacheblock_suffix``.  While a slot holds a block it keeps a shared
    ``flock`` on the file, which tells other processes sharing the cache
    directory that the block is in use and should not be deleted.
    """
    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")

    def __init__(self, locator, cachedir):
        self.locator = locator
        # Set once the slot has been given a value (see set() and
        # get_from_disk()); get() waits on it.
        self.ready = threading.Event()
        # None, b'' for an empty block, or a read-only mmap of the block file.
        self.content = None
        self.cachedir = cachedir
        # Open handle on the block file; this is what holds the shared lock.
        self.filehandle = None
        # After eviction: weak reference to the mapping that used to be
        # in `content`, so size() and gone() can tell if it is still alive.
        self.linger = None

    def get(self):
        """Wait until the slot is ready, then return its content.

        Returns None, ``b''`` or a read-only ``mmap`` object.
        """
        self.ready.wait()
        return self.content

    def set(self, value):
        """Store `value` in the slot, writing it to the disk cache.

        * ``None`` clears the content.
        * A zero-length value is kept as ``b''`` in memory, because an
          empty file cannot be memory-mapped.
        * If the slot already has content, nothing is changed.
        * Otherwise the value is written to a temporary file, renamed to
          its final cache path and memory-mapped read-only.

        The ``ready`` event is set in every case except when writing or
        mapping the file fails; those exceptions propagate to the caller.
        """
        tmpfile = None
        try:
            if value is None:
                self.content = None
                self.ready.set()
                return

            if len(value) == 0:
                # Can't mmap a 0 length file
                self.content = b''
                self.ready.set()
                return

            if self.content is not None:
                # Has been set already
                self.ready.set()
                return

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            os.makedirs(blockdir, mode=0o700, exist_ok=True)

            final = os.path.join(blockdir, self.locator) + cacheblock_suffix

            self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
            tmpfile = self.filehandle.name
            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(self.filehandle, fcntl.LOCK_SH)

            # The data has to be on the file before it is mapped.
            self.filehandle.write(value)
            self.filehandle.flush()
            os.rename(tmpfile, final)
            tmpfile = None

            self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            # only set the event when mmap is successful
            self.ready.set()
        finally:
            if tmpfile is not None:
                # If the tempfile hasn't been renamed on disk yet, try to delete it.
                try:
                    os.remove(tmpfile)
                except OSError:
                    pass

    def size(self):
        """Return the number of bytes this slot is still holding.

        An evicted slot whose old mapping is still referenced elsewhere
        (reachable through the ``linger`` weak reference) still counts.
        """
        if self.content is not None:
            return len(self.content)
        if self.linger is not None:
            # If it is still lingering (object is still accessible
            # through the weak reference) it is still taking up
            # space.
            content = self.linger()
            if content is not None:
                return len(content)
        return 0

    def evict(self):
        """Drop the block from this slot and try to delete its file.

        Returns True if the file was removed, False otherwise (nothing
        to evict, or the file is locked by another process or could not
        be removed).
        """
        if self.content is not None and len(self.content) > 0:
            # The mmap region might be in use when we decided to evict
            # it.  This can happen if the cache is too small.
            # If we call close() now, it'll throw an error if
            # something tries to access it.
            # However, we don't need to explicitly call mmap.close():
            # in mmapmodule.c both close and deallocate do the same
            # thing:
            # a) close the file descriptor
            # b) unmap the memory range
            # So we can forget it in the cache and delete the file on
            # disk, and it will tear it down after any other
            # lingering Python references to the mapped memory are
            # gone.

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
            try:
                fcntl.flock(self.filehandle, fcntl.LOCK_UN)

                # try to get an exclusive lock, this ensures other
                # processes are not using the block.  It is
                # nonblocking and will throw an exception if we
                # can't get it, which is fine because that means
                # we just won't try to delete it.
                # The file locking is not strictly necessary: we
                # could just remove the file and the kernel would
                # keep the underlying inode available as long as
                # other processes still have it open.  However, with
                # multiple processes sharing the cache and deleting
                # each other's files, you'd end up with ghost files
                # that don't show up in the file system but still
                # take up space, which isn't user friendly.  The
                # locking strategy ensures that cache blocks in use
                # remain visible.
                fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

                os.remove(final)
                return True
            except OSError:
                pass
            finally:
                # Forget the block whether or not the file was removed.
                self.filehandle = None
                self.linger = weakref.ref(self.content)
                self.content = None
        return False

    def gone(self):
        """Return True if the slot has no content and no lingering mapping."""
        # Test if an evicted object is lingering
        return self.content is None and (self.linger is None or self.linger() is None)

    @staticmethod
    def get_from_disk(locator, cachedir):
        """Map an existing cache file for `locator` into a new, ready slot.

        Returns the slot, or None if the file does not exist or cannot
        be locked/mapped (the traceback is printed in the latter case).
        """
        blockdir = os.path.join(cachedir, locator[0:3])
        final = os.path.join(blockdir, locator) + cacheblock_suffix

        filehandle = None
        try:
            filehandle = open(final, "rb")

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            dc = DiskCacheSlot(locator, cachedir)
            dc.filehandle = filehandle
            dc.content = content
            dc.ready.set()
            return dc
        except FileNotFoundError:
            pass
        except Exception:
            traceback.print_exc()
            # Don't leak the handle (and its lock) for a block we can't use.
            if filehandle is not None:
                filehandle.close()

        return None

    @staticmethod
    def cache_usage(cachedir):
        """Return the total size in bytes of all cache block files under `cachedir`."""
        usage = 0
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)
                usage += res.st_size
        return usage

    @staticmethod
    def init_cache(cachedir, maxslots):
        """Check that the disk cache is usable and load existing blocks.

        Returns a list of at most `maxslots` slots for the most recently
        accessed blocks found under `cachedir`; excess blocks are
        evicted.  Errors from the initial write probe propagate.
        """
        # First check the disk cache works at all by creating a 1 byte cache entry
        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds.set(b'a')
        if checkexists is None:
            # Don't keep the test entry around unless it existed beforehand.
            ds.evict()

        # map in all the files in the cache directory, up to max slots.
        # after max slots, try to delete the excess blocks.
        # this gives the calling process ownership of all the blocks

        blocks = []
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)

                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                    blocks.append((name[0:32], res.st_atime))
                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                    # found a temporary file more than 1 minute old,
                    # try to delete it.
                    try:
                        os.remove(blockpath)
                    except OSError:
                        pass

        # sort by access time (atime), going from most recently
        # accessed (highest timestamp) to least recently accessed
        # (lowest timestamp).
        blocks.sort(key=lambda x: x[1], reverse=True)

        # Map in all the files we found, up to maxslots, if we exceed
        # maxslots, start throwing things out.
        cachelist = []
        for b in blocks:
            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
            if got is None:
                continue
            if len(cachelist) < maxslots:
                cachelist.append(got)
            else:
                # we found more blocks than maxslots, try to
                # throw it out of the cache.
                got.evict()

        return cachelist


class DiskCacheSlot (locator, cachedir)
Expand source code
class DiskCacheSlot(object):
    """A single cache block backed by a memory-mapped file on disk.

    A block lives at ``<cachedir>/<locator[0:3]>/<locator>`` plus
    ``cacheblock_suffix``.  While a slot holds a block it keeps a shared
    ``flock`` on the file, which tells other processes sharing the cache
    directory that the block is in use and should not be deleted.
    """
    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")

    def __init__(self, locator, cachedir):
        self.locator = locator
        # Set once the slot has been given a value (see set() and
        # get_from_disk()); get() waits on it.
        self.ready = threading.Event()
        # None, b'' for an empty block, or a read-only mmap of the block file.
        self.content = None
        self.cachedir = cachedir
        # Open handle on the block file; this is what holds the shared lock.
        self.filehandle = None
        # After eviction: weak reference to the mapping that used to be
        # in `content`, so size() and gone() can tell if it is still alive.
        self.linger = None

    def get(self):
        """Wait until the slot is ready, then return its content.

        Returns None, ``b''`` or a read-only ``mmap`` object.
        """
        self.ready.wait()
        return self.content

    def set(self, value):
        """Store `value` in the slot, writing it to the disk cache.

        * ``None`` clears the content.
        * A zero-length value is kept as ``b''`` in memory, because an
          empty file cannot be memory-mapped.
        * If the slot already has content, nothing is changed.
        * Otherwise the value is written to a temporary file, renamed to
          its final cache path and memory-mapped read-only.

        The ``ready`` event is set in every case except when writing or
        mapping the file fails; those exceptions propagate to the caller.
        """
        tmpfile = None
        try:
            if value is None:
                self.content = None
                self.ready.set()
                return

            if len(value) == 0:
                # Can't mmap a 0 length file
                self.content = b''
                self.ready.set()
                return

            if self.content is not None:
                # Has been set already
                self.ready.set()
                return

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            os.makedirs(blockdir, mode=0o700, exist_ok=True)

            final = os.path.join(blockdir, self.locator) + cacheblock_suffix

            self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
            tmpfile = self.filehandle.name
            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(self.filehandle, fcntl.LOCK_SH)

            # The data has to be on the file before it is mapped.
            self.filehandle.write(value)
            self.filehandle.flush()
            os.rename(tmpfile, final)
            tmpfile = None

            self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            # only set the event when mmap is successful
            self.ready.set()
        finally:
            if tmpfile is not None:
                # If the tempfile hasn't been renamed on disk yet, try to delete it.
                try:
                    os.remove(tmpfile)
                except OSError:
                    pass

    def size(self):
        """Return the number of bytes this slot is still holding.

        An evicted slot whose old mapping is still referenced elsewhere
        (reachable through the ``linger`` weak reference) still counts.
        """
        if self.content is not None:
            return len(self.content)
        if self.linger is not None:
            # If it is still lingering (object is still accessible
            # through the weak reference) it is still taking up
            # space.
            content = self.linger()
            if content is not None:
                return len(content)
        return 0

    def evict(self):
        """Drop the block from this slot and try to delete its file.

        Returns True if the file was removed, False otherwise (nothing
        to evict, or the file is locked by another process or could not
        be removed).
        """
        if self.content is not None and len(self.content) > 0:
            # The mmap region might be in use when we decided to evict
            # it.  This can happen if the cache is too small.
            # If we call close() now, it'll throw an error if
            # something tries to access it.
            # However, we don't need to explicitly call mmap.close():
            # in mmapmodule.c both close and deallocate do the same
            # thing:
            # a) close the file descriptor
            # b) unmap the memory range
            # So we can forget it in the cache and delete the file on
            # disk, and it will tear it down after any other
            # lingering Python references to the mapped memory are
            # gone.

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
            try:
                fcntl.flock(self.filehandle, fcntl.LOCK_UN)

                # try to get an exclusive lock, this ensures other
                # processes are not using the block.  It is
                # nonblocking and will throw an exception if we
                # can't get it, which is fine because that means
                # we just won't try to delete it.
                # The file locking is not strictly necessary: we
                # could just remove the file and the kernel would
                # keep the underlying inode available as long as
                # other processes still have it open.  However, with
                # multiple processes sharing the cache and deleting
                # each other's files, you'd end up with ghost files
                # that don't show up in the file system but still
                # take up space, which isn't user friendly.  The
                # locking strategy ensures that cache blocks in use
                # remain visible.
                fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

                os.remove(final)
                return True
            except OSError:
                pass
            finally:
                # Forget the block whether or not the file was removed.
                self.filehandle = None
                self.linger = weakref.ref(self.content)
                self.content = None
        return False

    def gone(self):
        """Return True if the slot has no content and no lingering mapping."""
        # Test if an evicted object is lingering
        return self.content is None and (self.linger is None or self.linger() is None)

    @staticmethod
    def get_from_disk(locator, cachedir):
        """Map an existing cache file for `locator` into a new, ready slot.

        Returns the slot, or None if the file does not exist or cannot
        be locked/mapped (the traceback is printed in the latter case).
        """
        blockdir = os.path.join(cachedir, locator[0:3])
        final = os.path.join(blockdir, locator) + cacheblock_suffix

        filehandle = None
        try:
            filehandle = open(final, "rb")

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            dc = DiskCacheSlot(locator, cachedir)
            dc.filehandle = filehandle
            dc.content = content
            dc.ready.set()
            return dc
        except FileNotFoundError:
            pass
        except Exception:
            traceback.print_exc()
            # Don't leak the handle (and its lock) for a block we can't use.
            if filehandle is not None:
                filehandle.close()

        return None

    @staticmethod
    def cache_usage(cachedir):
        """Return the total size in bytes of all cache block files under `cachedir`."""
        usage = 0
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)
                usage += res.st_size
        return usage

    @staticmethod
    def init_cache(cachedir, maxslots):
        """Check that the disk cache is usable and load existing blocks.

        Returns a list of at most `maxslots` slots for the most recently
        accessed blocks found under `cachedir`; excess blocks are
        evicted.  Errors from the initial write probe propagate.
        """
        # First check the disk cache works at all by creating a 1 byte cache entry
        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds.set(b'a')
        if checkexists is None:
            # Don't keep the test entry around unless it existed beforehand.
            ds.evict()

        # map in all the files in the cache directory, up to max slots.
        # after max slots, try to delete the excess blocks.
        # this gives the calling process ownership of all the blocks

        blocks = []
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)

                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                    blocks.append((name[0:32], res.st_atime))
                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                    # found a temporary file more than 1 minute old,
                    # try to delete it.
                    try:
                        os.remove(blockpath)
                    except OSError:
                        pass

        # sort by access time (atime), going from most recently
        # accessed (highest timestamp) to least recently accessed
        # (lowest timestamp).
        blocks.sort(key=lambda x: x[1], reverse=True)

        # Map in all the files we found, up to maxslots, if we exceed
        # maxslots, start throwing things out.
        cachelist = []
        for b in blocks:
            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
            if got is None:
                continue
            if len(cachelist) < maxslots:
                cachelist.append(got)
            else:
                # we found more blocks than maxslots, try to
                # throw it out of the cache.
                got.evict()

        return cachelist

Static methods

def cache_usage(cachedir)
Expand source code
def cache_usage(cachedir):
    """Return the total size in bytes of the cache block files under `cachedir`.

    Walks the whole directory tree and adds up ``st_size`` for every
    file whose name ends with ``cacheblock_suffix``; other files are
    ignored.
    """
    usage = 0
    for root, _dirs, files in os.walk(cachedir):
        for name in files:
            if not name.endswith(cacheblock_suffix):
                # Not a cache block (or temporary cache block) file.
                continue

            blockpath = os.path.join(root, name)
            res = os.stat(blockpath)
            usage += res.st_size
    return usage
def get_from_disk(locator, cachedir)
Expand source code
def get_from_disk(locator, cachedir):
    """Map an existing cache file for `locator` into a new slot.

    Looks for ``<cachedir>/<locator[0:3]>/<locator>`` plus
    ``cacheblock_suffix``, takes a shared lock on it and memory-maps it
    read-only.

    Returns the populated DiskCacheSlot, or None if the file does not
    exist or cannot be locked/mapped (in the latter case the traceback
    is printed and the file handle is closed).
    """
    blockdir = os.path.join(cachedir, locator[0:3])
    final = os.path.join(blockdir, locator) + cacheblock_suffix

    filehandle = None
    try:
        filehandle = open(final, "rb")

        # acquire a shared lock, this tells other processes that
        # we're using this block and to please not delete it.
        fcntl.flock(filehandle, fcntl.LOCK_SH)

        content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
        dc = DiskCacheSlot(locator, cachedir)
        dc.filehandle = filehandle
        dc.content = content
        # The slot already has its content, so mark it ready.
        dc.ready.set()
        return dc
    except FileNotFoundError:
        # Simply not in the cache.
        pass
    except Exception:
        traceback.print_exc()
        # Don't leak the handle (and its lock) for a block we can't use.
        if filehandle is not None:
            filehandle.close()

    return None
def init_cache(cachedir, maxslots)
Expand source code
def init_cache(cachedir, maxslots):
    """Check that the disk cache is usable and load the blocks already in it.

    First writes a one-byte probe block to verify that the cache
    directory works; errors from that write propagate to the caller.
    Then scans `cachedir`, removes temporary files older than one
    minute, and maps in existing blocks from most to least recently
    accessed.

    Returns a list of at most `maxslots` DiskCacheSlot objects; blocks
    beyond that limit are evicted.
    """
    # First check the disk cache works at all by creating a 1 byte cache entry
    checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
    ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
    ds.set(b'a')
    if checkexists is None:
        # Don't keep the test entry around unless it existed beforehand.
        ds.evict()

    # map in all the files in the cache directory, up to max slots.
    # after max slots, try to delete the excess blocks.
    # this gives the calling process ownership of all the blocks

    blocks = []
    for root, _dirs, files in os.walk(cachedir):
        for name in files:
            if not name.endswith(cacheblock_suffix):
                continue

            blockpath = os.path.join(root, name)
            res = os.stat(blockpath)

            if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                blocks.append((name[0:32], res.st_atime))
            elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                # found a temporary file more than 1 minute old,
                # try to delete it.
                try:
                    os.remove(blockpath)
                except OSError:
                    pass

    # sort by access time (atime), going from most recently
    # accessed (highest timestamp) to least recently accessed
    # (lowest timestamp).
    blocks.sort(key=lambda x: x[1], reverse=True)

    # Map in all the files we found, up to maxslots, if we exceed
    # maxslots, start throwing things out.
    cachelist = []
    for b in blocks:
        got = DiskCacheSlot.get_from_disk(b[0], cachedir)
        if got is None:
            continue
        if len(cachelist) < maxslots:
            cachelist.append(got)
        else:
            # we found more blocks than maxslots, try to
            # throw it out of the cache.
            got.evict()

    return cachelist

Instance variables

var cachedir

Return an attribute of instance, which is of type owner.

var content

Return an attribute of instance, which is of type owner.

var filehandle

Return an attribute of instance, which is of type owner.

var linger

Return an attribute of instance, which is of type owner.

var locator

Return an attribute of instance, which is of type owner.

var ready

Return an attribute of instance, which is of type owner.


def evict(self)
Expand source code
def evict(self):
    """Drop the block from this slot and try to delete its file.

    Only acts when the slot holds non-empty content.  The slot always
    forgets the mapping (keeping only a weak reference in ``linger``);
    the file is removed only if an exclusive lock can be obtained.

    Returns True if the file was removed, False otherwise.
    """
    if self.content is not None and len(self.content) > 0:
        # The mmap region might be in use when we decided to evict
        # it.  This can happen if the cache is too small.
        # If we call close() now, it'll throw an error if
        # something tries to access it.
        # However, we don't need to explicitly call mmap.close():
        # in mmapmodule.c both close and deallocate do the same
        # thing:
        # a) close the file descriptor
        # b) unmap the memory range
        # So we can forget it in the cache and delete the file on
        # disk, and it will tear it down after any other
        # lingering Python references to the mapped memory are
        # gone.

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
        try:
            fcntl.flock(self.filehandle, fcntl.LOCK_UN)

            # try to get an exclusive lock, this ensures other
            # processes are not using the block.  It is
            # nonblocking and will throw an exception if we
            # can't get it, which is fine because that means
            # we just won't try to delete it.
            # The file locking is not strictly necessary: we could
            # just remove the file and the kernel would keep the
            # underlying inode available as long as other
            # processes still have it open.  However, with
            # multiple processes sharing the cache and deleting
            # each other's files, you'd end up with ghost files
            # that don't show up in the file system but still take
            # up space, which isn't user friendly.  The locking
            # strategy ensures that cache blocks in use remain
            # visible.
            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

            os.remove(final)
            return True
        except OSError:
            # Couldn't get the lock or remove the file; leave it on disk.
            pass
        finally:
            # Forget the block whether or not the file was removed.
            self.filehandle = None
            self.linger = weakref.ref(self.content)
            self.content = None
    return False
def get(self)
Expand source code
def get(self):
    """Return whatever is currently stored in the slot's ``content`` attribute."""
    current = self.content
    return current
def gone(self)
Expand source code
def gone(self):
    """Return True when the slot holds nothing and nothing is lingering.

    A slot is "gone" if it has no content and either has no ``linger``
    weak reference or that reference no longer resolves to an object.
    """
    if self.content is not None:
        return False
    ref = self.linger
    if ref is None:
        return True
    # An evicted object is still lingering while the weak reference resolves.
    return ref() is None
def set(self, value)
Expand source code
def set(self, value):
    """Store `value` in the slot, writing it to the disk cache.

    * ``None`` clears the content.
    * A zero-length value is kept as ``b''`` in memory, because an
      empty file cannot be memory-mapped.
    * If the slot already has content, nothing is changed.
    * Otherwise the value is written to a temporary file in the block
      directory, renamed to its final cache path and memory-mapped
      read-only.

    The ``ready`` event is set in every case except when writing or
    mapping the file fails; those exceptions propagate to the caller
    after any not-yet-renamed temporary file has been cleaned up.
    """
    tmpfile = None
    try:
        if value is None:
            self.content = None
            self.ready.set()
            return

        if len(value) == 0:
            # Can't mmap a 0 length file
            self.content = b''
            self.ready.set()
            return

        if self.content is not None:
            # Has been set already
            self.ready.set()
            return

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        os.makedirs(blockdir, mode=0o700, exist_ok=True)

        final = os.path.join(blockdir, self.locator) + cacheblock_suffix

        self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
        tmpfile = self.filehandle.name
        os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

        # acquire a shared lock, this tells other processes that
        # we're using this block and to please not delete it.
        fcntl.flock(self.filehandle, fcntl.LOCK_SH)

        # The data has to be on the file before it is mapped.
        self.filehandle.write(value)
        self.filehandle.flush()
        os.rename(tmpfile, final)
        tmpfile = None

        self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
        # only set the event when mmap is successful
        self.ready.set()
    finally:
        if tmpfile is not None:
            # If the tempfile hasn't been renamed on disk yet, try to delete it.
            try:
                os.remove(tmpfile)
            except OSError:
                pass
def size(self)
Expand source code
def size(self):
    """Return the number of bytes this slot is still holding.

    If the slot has content, that is its length.  Otherwise, if an
    evicted object is still reachable through the ``linger`` weak
    reference, its length is reported; if not, 0.
    """
    if self.content is not None:
        return len(self.content)
    if self.linger is not None:
        # If it is still lingering (object is still accessible
        # through the weak reference) it is still taking up
        # space.
        content = self.linger()
        if content is not None:
            return len(content)
    return 0